You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/23 21:36:51 UTC
svn commit: r1174971 - in /incubator/airavata/trunk/modules/ws-messenger:
commons/src/main/java/org/apache/airavata/wsmg/commons/storage/
messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/
messagebroker/src/main/java/org/apache/ai...
Author: patanachai
Date: Fri Sep 23 19:36:50 2011
New Revision: 1174971
URL: http://svn.apache.org/viewvc?rev=1174971&view=rev
Log:
AIRAVATA-101 clean up on database connection
Added:
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
Removed:
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/KeyValueWrapper.java
Modified:
incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java
incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java
Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java Fri Sep 23 19:36:50 2011
@@ -25,6 +25,7 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,9 +74,7 @@ public class JdbcStorage {
public void commitAndFree(Connection conn) {
commit(conn);
- if (conn != null) {
- closeConnection(conn);
- }
+ closeConnection(conn);
}
public void rollback(Connection conn) {
@@ -90,9 +89,7 @@ public class JdbcStorage {
public void rollbackAndFree(Connection conn) {
rollback(conn);
- if (conn != null) {
- closeConnection(conn);
- }
+ closeConnection(conn);
}
public Connection connect() {
@@ -107,32 +104,6 @@ public class JdbcStorage {
return conn;
}
- public int update(String query) throws SQLException {
- int result = 0;
- Connection conn = null;
- PreparedStatement stmt = null;
- try {
- conn = connectionPool.getConnection();
- stmt = conn.prepareStatement(query);
- result = stmt.executeUpdate();
- commit(conn);
- } catch (SQLException sql) {
- rollback(conn);
- throw sql;
- } finally {
- try {
- if (stmt != null && !stmt.isClosed()) {
- stmt.close();
- }
- } finally {
- if (conn != null) {
- closeConnection(conn);
- }
- }
- }
- return result;
- }
-
/**
* This method is provided so that you can have better control over the
* statement. For example: You can use stmt.setString to convert quotation
@@ -174,18 +145,33 @@ public class JdbcStorage {
stmt.close();
}
} finally {
- if (conn != null) {
- closeConnection(conn);
- }
+ closeConnection(conn);
}
}
return count;
}
+ public void quietlyClose(Connection conn, Statement... stmts) {
+ if (stmts != null) {
+ for (Statement stmt : stmts) {
+ try {
+ if (stmt != null && !stmt.isClosed()) {
+ stmt.close();
+ }
+ } catch (SQLException sql) {
+ log.error(sql.getMessage(), sql);
+ }
+ }
+ }
+ closeConnection(conn);
+ }
+
public void closeConnection(Connection conn) {
- connectionPool.free(conn);
- }
-
+ if (conn != null) {
+ connectionPool.free(conn);
+ }
+ }
+
public void closeAllConnections() {
if (connectionPool != null)
connectionPool.dispose();
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java Fri Sep 23 19:36:50 2011
@@ -43,32 +43,35 @@ import org.slf4j.LoggerFactory;
public class DatabaseStorageImpl implements MsgBoxStorage {
private static final Logger logger = LoggerFactory.getLogger(DatabaseStorageImpl.class);
-
+
private static final String TABLE_NAME_TO_CHECK = "msgbox";
-
- private JdbcStorage db;
+
+ private JdbcStorage db;
public DatabaseStorageImpl(String jdbcUrl, String jdbcDriver, long timeOfOldMessage) {
+ db = new JdbcStorage(10, 50, jdbcUrl, jdbcDriver, true);
+
+ Connection conn = null;
try {
- db = new JdbcStorage(10, 50, jdbcUrl, jdbcDriver, true);
-
+
/*
* Check database
*/
- Connection conn = db.connect();
+ conn = db.connect();
if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
DatabaseCreator.createMsgBoxDatabase(conn);
logger.info("New Database created for Message Box");
} else {
logger.info("Database already created for Message Box!");
}
- db.closeConnection(conn);
-
+
MessageBoxDB.initialize(db, timeOfOldMessage);
-
+
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException("Database failure");
+ } finally {
+ db.closeConnection(conn);
}
}
@@ -111,8 +114,8 @@ public class DatabaseStorageImpl impleme
}
public void dispose() {
- if(db != null){
- db.closeAllConnections();
+ if (db != null) {
+ db.closeAllConnections();
}
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java Fri Sep 23 19:36:50 2011
@@ -190,6 +190,7 @@ public class MessageBoxDB {
Connection connection = null;
PreparedStatement stmt = null;
+ PreparedStatement stmt2 = null;
try {
connection = db.connect();
stmt = connection.prepareStatement(SQL_SELECT_MSGBOX_STATEMENT);
@@ -202,15 +203,13 @@ public class MessageBoxDB {
logger.debug(xmlString);
list.add(xmlString);
}
- resultSet.close();
- stmt.close();
/*
* Delete all retrieved messages
*/
- stmt = connection.prepareStatement(SQL_DELETE_MSGBOX_STATEMENT);
- stmt.setString(1, msgBoxId);
- db.executeUpdateAndClose(stmt);
+ stmt2 = connection.prepareStatement(SQL_DELETE_MSGBOX_STATEMENT);
+ stmt2.setString(1, msgBoxId);
+ stmt2.executeUpdate();
// commit
db.commit(connection);
@@ -223,17 +222,7 @@ public class MessageBoxDB {
* If there is error during query, close everything and throw
* error
*/
- try {
- if (stmt != null && !stmt.isClosed()) {
- stmt.close();
- }
- } catch (SQLException sql) {
- throw sql;
- } finally {
- if (connection != null) {
- db.closeConnection(connection);
- }
- }
+ db.quietlyClose(connection, stmt, stmt2);
}
}
return list;
@@ -251,7 +240,6 @@ public class MessageBoxDB {
db.rollbackAndFree(connection);
logger.error("Caught exception while removing old entries from msgbox db table", sql);
}
-
}
private static void setMsgBoxidList(JdbcStorage db) throws SQLException {
@@ -267,16 +255,11 @@ public class MessageBoxDB {
msgBoxids.add(resultSet.getString("msgboxid"));
}
db.commit(connection);
+ } catch(SQLException e){
+ db.rollback(connection);
+ throw e;
} finally {
- try {
- if (stmt != null) {
- stmt.close();
- }
- } catch (SQLException sql) {
- throw sql;
- } finally {
- db.commitAndFree(connection);
- }
+ db.quietlyClose(connection, stmt);
}
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java Fri Sep 23 19:36:50 2011
@@ -64,12 +64,8 @@ public class BrokerServiceLifeCycle impl
if (inited == null || inited == false) {
log.info("starting broker");
Axis2Utils.overrideAddressingPhaseHander(configContext, new PublishedMessageHandler());
- initConfigurations(configContext, axisService);
-
- WsmgConfigurationContext brokerConext = (WsmgConfigurationContext) configContext
- .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
-
- initQueue(brokerConext.getStorage());
+ WsmgConfigurationContext brokerConext = initConfigurations(configContext, axisService);
+ initQueue(brokerConext);
initDeliveryMethod(brokerConext.getConfigurationManager());
inited = true;
@@ -79,7 +75,7 @@ public class BrokerServiceLifeCycle impl
}
}
- private void initConfigurations(ConfigurationContext configContext, AxisService axisService) {
+ private WsmgConfigurationContext initConfigurations(ConfigurationContext configContext, AxisService axisService) {
WsmgConfigurationContext wsmgConfig = new WsmgConfigurationContext();
configContext.setProperty(WsmgCommonConstants.BROKER_WSMGCONFIG, wsmgConfig);
@@ -112,14 +108,15 @@ public class BrokerServiceLifeCycle impl
NotificationProcessor notificatonProcessor = new NotificationProcessor(wsmgConfig);
wsmgConfig.setNotificationProcessor(notificatonProcessor);
-
+
+ return wsmgConfig;
}
- private void initQueue(WsmgStorage storage) {
+ private void initQueue(WsmgConfigurationContext context) {
log.info("setting up queue");
- WSMGParameter.OUT_GOING_QUEUE = storage;
+ WSMGParameter.OUT_GOING_QUEUE = context.getQueue();
if (WSMGParameter.cleanQueueonStartUp) {
log.debug("cleaning up persistant queue");
@@ -141,15 +138,12 @@ public class BrokerServiceLifeCycle impl
WsmgCommonConstants.STORAGE_TYPE_PERSISTANT).equalsIgnoreCase(
WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY)) {
- // user has asked to use in memory queue
- // but with out starting the delivery thread.
- // this will accumulate message in memory.
-
- log.error("conflicting configuration ditected, " + "using in memory queue with out starting delivery "
- + "thread will result memory growth.");
+ /*
+ * user has asked to use in memory queue but without starting the delivery thread. this will accumulate message in memory.
+ */
+ log.error("conflicting configuration detected, using in memory queue without starting delivery thread will result memory growth.");
}
-
return;
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java Fri Sep 23 19:36:50 2011
@@ -30,8 +30,8 @@ import javax.xml.stream.XMLStreamExcepti
import org.apache.airavata.wsmg.broker.context.ContextParameters;
import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.commons.OutGoingMessage;
import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
@@ -52,14 +52,14 @@ public class NotificationProcessor {
private static final Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);
- WsmgConfigurationContext wsmgConfigContext = null;
+ private WsmgConfigurationContext wsmgConfigContext;
protected long messageCounter = 0;
protected long messageId = 0;
OMFactory factory = OMAbstractFactory.getOMFactory();
- OutGoingQueue outgoingQueue = null;
+ private OutGoingQueue outgoingQueue;
public NotificationProcessor(WsmgConfigurationContext config) {
init(config);
@@ -68,14 +68,6 @@ public class NotificationProcessor {
private void init(WsmgConfigurationContext config) {
this.wsmgConfigContext = config;
outgoingQueue = config.getOutgoingQueue();
-
- // Ramith : JMS support is removed.
- assert (!WSMGParameter.useIncomingQueue);
- /*
- * PublisherThread publisherThread = new PublisherThread(wsmgConfigContext); new
- * Thread(publisherThread).start();
- */
-
}
private synchronized long getNextTrackId() {
@@ -272,9 +264,6 @@ public class NotificationProcessor {
private void matchAndSave(String notificationMessage, String topicLocalString,
AdditionalMessageContent additionalMessageContent) {
- // Ramith: Jms support is not implemented now.
- assert (!WSMGParameter.useIncomingQueue);
-
List<ConsumerInfo> matchedConsumers = new LinkedList<ConsumerInfo>();
// not use incoming queue
@@ -300,9 +289,6 @@ public class NotificationProcessor {
if (consumerInfoList.size() == 0) // No subscription
return;
- assert (WSMGParameter.useOutGoingQueue); // we will only use out going
- // queue for the moment
-
RunTimeStatistics.addNewNotificationMessageSize(message.length());
OutGoingMessage outGoingMessage = new OutGoingMessage();
outGoingMessage.setTextMessage(message);
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java Fri Sep 23 19:36:50 2011
@@ -31,7 +31,7 @@ import java.util.concurrent.LinkedBlocki
import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-public class WsmgInMemoryStorage implements WsmgStorage {
+public class WsmgInMemoryStorage implements WsmgStorage, WsmgQueue {
private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
@@ -39,8 +39,18 @@ public class WsmgInMemoryStorage impleme
private Map<String, SubscriptionState> unexpirableSubscriptions = new ConcurrentHashMap<String, SubscriptionState>();
+ public int insert(SubscriptionState subscription) {
+ if (subscription.isNeverExpire()) {
+ unexpirableSubscriptions.put(subscription.getId(), subscription);
+ } else {
+ expirableSubscriptions.put(subscription.getId(), subscription);
+ }
+ return 0;
+ }
+
public int delete(String subscriptionId) {
-
+ expirableSubscriptions.remove(subscriptionId);
+ unexpirableSubscriptions.remove(subscriptionId);
return 0;
}
@@ -52,7 +62,6 @@ public class WsmgInMemoryStorage impleme
Collection<SubscriptionState> entries = expirableSubscriptions.values();
for (SubscriptionState s : entries) {
-
SubscriptionEntry se = new SubscriptionEntry();
se.setSubscribeXml(s.getSubscribeXml());
se.setSubscriptionId(s.getId());
@@ -73,9 +82,7 @@ public class WsmgInMemoryStorage impleme
Object obj = null;
try {
-
obj = queue.take();
-
} catch (InterruptedException ie) {
throw new RuntimeException("interruped exception occured", ie);
}
@@ -85,31 +92,9 @@ public class WsmgInMemoryStorage impleme
public void cleanup() {
queue.clear();
-
}
public void enqueue(Object object, String trackId) {
queue.offer(object);
-
- }
-
- public void flush() {
- // nothing to do.
- }
-
- public int size() {
- return queue.size();
}
-
- public int insert(SubscriptionState subscription) {
-
- if (subscription.isNeverExpire()) {
- unexpirableSubscriptions.put(subscription.getId(), subscription);
- } else {
- expirableSubscriptions.put(subscription.getId(), subscription);
- }
-
- return 0;
- }
-
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java Fri Sep 23 19:36:50 2011
@@ -35,7 +35,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
-import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -54,27 +53,20 @@ import org.apache.axiom.om.OMElement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class WsmgPersistantStorage implements WsmgStorage {
+public class WsmgPersistantStorage implements WsmgStorage, WsmgQueue {
private static final Logger logger = LoggerFactory.getLogger(WsmgPersistantStorage.class);
/*
* Table name
*/
- public static final String TABLE_NAME_EXPIRABLE_SUBCRIPTIONS = "subscription";
- public static final String TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS = "specialSubscription";
-
- private static final String TABLE_NAME_TO_CHECK = TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
+ private static final String TABLE_NAME_TO_CHECK = SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
private Counter storeToDBCounter = new Counter();
- private JdbcStorage db = null;
-
- private String dbName = null;
+ private JdbcStorage db;
public WsmgPersistantStorage(String jdbcUrl, String jdbcDriver) {
- this.dbName = TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
-
db = new JdbcStorage(jdbcUrl, jdbcDriver);
Connection conn = null;
@@ -90,13 +82,6 @@ public class WsmgPersistantStorage imple
logger.info("Database already created for Message Broker!");
}
- // inject dbname to sql statement.
- SubscriptionConstants.ORDINARY_SUBSCRIPTION_INSERT_QUERY = String.format(
- SubscriptionConstants.INSERT_SQL_QUERY, TABLE_NAME_EXPIRABLE_SUBCRIPTIONS);
-
- SubscriptionConstants.SPECIAL_SUBSCRIPTION_INSERT_QUERY = String.format(
- SubscriptionConstants.INSERT_SQL_QUERY, TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS);
-
if (WSMGParameter.measureMessageRate) {
TimerThread timerThread = new TimerThread(storeToDBCounter, " StoreSubScriptionToDBCounter");
new Thread(timerThread).start();
@@ -107,22 +92,6 @@ public class WsmgPersistantStorage imple
logger.error(e.getMessage(), e);
throw new RuntimeException("Database failure");
} finally {
- if (conn != null) {
- db.closeConnection(conn);
- }
- }
- }
-
- private void quietlyClose(Statement stmt, Connection conn) {
- try {
- if (stmt != null && !stmt.isClosed()) {
- stmt.close();
- }
- } catch (SQLException sql) {
- logger.error(sql.getMessage(), sql);
- }
-
- if (conn != null) {
db.closeConnection(conn);
}
}
@@ -137,17 +106,16 @@ public class WsmgPersistantStorage imple
ArrayList<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>();
- String query = "SELECT * FROM " + dbName;
Connection conn = null;
PreparedStatement stmt = null;
try {
// get number of row first and increase the arrayList size for
// better performance
- int size = db.countRow(dbName, "*");
+ int size = db.countRow(SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS, "*");
conn = db.connect();
- stmt = conn.prepareStatement(query);
+ stmt = conn.prepareStatement(SubscriptionConstants.EXP_SELECT_QUERY);
ResultSet rs = stmt.executeQuery();
ret.ensureCapacity(size);
@@ -162,7 +130,7 @@ public class WsmgPersistantStorage imple
} catch (SQLException ex) {
logger.error("sql exception occured", ex);
} finally {
- quietlyClose(stmt, conn);
+ db.quietlyClose(conn, stmt);
}
return ret;
}
@@ -196,11 +164,7 @@ public class WsmgPersistantStorage imple
policyValue = WsmgCommonConstants.WSRM_POLICY_TRUE;
}
- Date today = new Date();
- Timestamp now = new Timestamp(today.getTime());
-
- String sql = subscription.isNeverExpire() ? SubscriptionConstants.SPECIAL_SUBSCRIPTION_INSERT_QUERY
- : SubscriptionConstants.ORDINARY_SUBSCRIPTION_INSERT_QUERY;
+ Timestamp now = new Timestamp(System.currentTimeMillis());
int result = 0;
Connection connection = null;
@@ -208,7 +172,7 @@ public class WsmgPersistantStorage imple
try {
connection = db.connect();
- stmt = connection.prepareStatement(sql);
+ stmt = connection.prepareStatement(SubscriptionConstants.EXP_INSERT_SQL_QUERY);
stmt.setString(1, subscription.getId());
stmt.setBinaryStream(2, new ByteArrayInputStream(subscription.getSubscribeXml().getBytes()), subscription
@@ -221,8 +185,10 @@ public class WsmgPersistantStorage imple
consumerReferenceParameters.getBytes().length);
stmt.setTimestamp(8, now);
result = db.executeUpdateAndClose(stmt);
- storeToDBCounter.addCounter();
db.commitAndFree(connection);
+
+ storeToDBCounter.addCounter();
+
} catch (SQLException ex) {
logger.error("sql exception occured", ex);
db.rollbackAndFree(connection);
@@ -238,34 +204,19 @@ public class WsmgPersistantStorage imple
* .lang.String)
*/
public int delete(String subscriptionId) {
- String query;
- query = "DELETE FROM " + dbName + " WHERE SubscriptionId='" + subscriptionId + "'";
+ int result = 0;
+ Connection connection = null;
try {
- return db.update(query);
- } catch (SQLException ex) {
- logger.error("sql exception occured", ex);
- }
- return 0;
- }
-
- // QUEUE related
-
- public Object blockingDequeue() {
- while (true) {
- try {
- // FIXME::: WHY RETURN KeyValueWrapper Object??????
- // FIXME::: Can it cast to OutGoingMessage????
- KeyValueWrapper wrapper = retrive();
- done(wrapper.getKey());
- return wrapper.getValue();
- } catch (SQLException e) {
- logger.error(e.getMessage(), e);
- e.printStackTrace();
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- e.printStackTrace();
- }
+ connection = db.connect();
+ PreparedStatement stmt = connection.prepareStatement(SubscriptionConstants.EXP_DELETE_SQL_QUERY);
+ stmt.setString(1, subscriptionId);
+ result = db.executeUpdateAndClose(stmt);
+ db.commitAndFree(connection);
+ } catch (SQLException sql) {
+ db.rollbackAndFree(connection);
+ logger.error("sql exception occured", sql);
}
+ return result;
}
public void cleanup() {
@@ -285,8 +236,24 @@ public class WsmgPersistantStorage imple
logger.error(e.getMessage(), e);
}
}
+ db.quietlyClose(conn, stmt);
+ }
+ }
- quietlyClose(stmt, conn);
+ public Object blockingDequeue() {
+ while (true) {
+ try {
+ return retrive();
+ } catch (SQLException e) {
+ logger.error(e.getMessage(), e);
+ e.printStackTrace();
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ e.printStackTrace();
+ }
}
}
@@ -295,6 +262,8 @@ public class WsmgPersistantStorage imple
// Get the Max ID cache and update and unlock the table
Connection connection = null;
PreparedStatement stmt = null;
+ PreparedStatement stmt2 = null;
+ PreparedStatement stmt3 = null;
try {
int nextkey;
@@ -308,11 +277,9 @@ public class WsmgPersistantStorage imple
if (result.next()) {
nextkey = result.getInt(1);
- result.close();
- stmt.close();
- stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INCREMENT + (nextkey));
- db.executeUpdateAndClose(stmt);
+ stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INCREMENT + (nextkey));
+ stmt2.executeUpdate();
} else {
throw new RuntimeException("MAX_ID Table is not init, redeploy the service !!!");
}
@@ -320,19 +287,18 @@ public class WsmgPersistantStorage imple
/*
* After update MAX_ID put data into queue table
*/
- stmt = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
- stmt.setInt(1, nextkey);
- stmt.setString(2, trackId);
+ stmt3 = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
+ stmt3.setInt(1, nextkey);
+ stmt3.setString(2, trackId);
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(output);
out.writeObject(object);
byte[] buffer = output.toByteArray();
ByteArrayInputStream in = new ByteArrayInputStream(buffer);
- stmt.setBinaryStream(3, in, buffer.length);
- db.executeUpdateAndClose(stmt);
+ stmt3.setBinaryStream(3, in, buffer.length);
+ stmt3.executeUpdate();
db.commit(connection);
-
} catch (SQLException sqlEx) {
db.rollback(connection);
logger.error("unable to enque the message in persistant storage", sqlEx);
@@ -346,14 +312,10 @@ public class WsmgPersistantStorage imple
logger.error("Cannot Unlock Table", sql);
}
- quietlyClose(stmt, connection);
+ db.quietlyClose(connection, stmt, stmt2, stmt3);
}
}
- public void flush() {
- // nothing to do.
- }
-
public int size() {
throw new UnsupportedOperationException();
}
@@ -361,6 +323,9 @@ public class WsmgPersistantStorage imple
private void initMessageQueueStorage() throws SQLException {
Connection connection = null;
PreparedStatement stmt = null;
+ PreparedStatement stmt2 = null;
+ PreparedStatement stmt3 = null;
+ PreparedStatement stmt4 = null;
try {
connection = db.connect();
@@ -372,20 +337,18 @@ public class WsmgPersistantStorage imple
stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
ResultSet result = stmt.executeQuery();
if (!result.next()) {
- stmt.close();
- stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
- db.executeUpdateAndClose(stmt);
+ stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
+ stmt2.executeUpdate();
}
/*
* Get Min ID
*/
- stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
- result = stmt.executeQuery();
+ stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
+ result = stmt3.executeQuery();
if (!result.next()) {
- stmt.close();
- stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
- db.executeUpdateAndClose(stmt);
+ stmt4 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
+ stmt4.executeUpdate();
}
db.commit(connection);
} catch (SQLException sqle) {
@@ -398,19 +361,19 @@ public class WsmgPersistantStorage imple
logger.error("Cannot Unlock Table", sql);
}
- quietlyClose(stmt, connection);
+ db.quietlyClose(connection, stmt, stmt2, stmt3, stmt4);
}
}
- private KeyValueWrapper retrive() throws SQLException, IOException {
- Object obj = null;
+ private Object retrive() throws SQLException, IOException, InterruptedException {
+ long wait = 1000;
int nextkey = -1;
int maxid = -2;
Connection connection = null;
PreparedStatement stmt = null;
+ PreparedStatement stmt2 = null;
+ PreparedStatement stmt3 = null;
ResultSet result = null;
- long wait = 1000;
-
while (true) {
try {
connection = db.connect();
@@ -424,7 +387,6 @@ public class WsmgPersistantStorage imple
result = stmt.executeQuery();
if (result.next()) {
nextkey = result.getInt(1);
- stmt.close();
} else {
throw new RuntimeException("Queue init has failed earlier");
}
@@ -432,11 +394,10 @@ public class WsmgPersistantStorage imple
/*
* Get Max ID
*/
- stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
- result = stmt.executeQuery();
+ stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+ result = stmt2.executeQuery();
if (result.next()) {
maxid = result.getInt(1);
- stmt.close();
} else {
throw new RuntimeException("Queue init has failed earlier");
}
@@ -445,12 +406,13 @@ public class WsmgPersistantStorage imple
* Update value and exit the loop
*/
if (maxid > nextkey) {
- stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
- db.executeUpdateAndClose(stmt);
+ stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
+ stmt3.executeUpdate();
logger.debug("Update MIN ID by one");
db.commit(connection);
break;
}
+
db.commit(connection);
} catch (SQLException sql) {
db.rollback(connection);
@@ -463,7 +425,7 @@ public class WsmgPersistantStorage imple
logger.error("Cannot Unlock Table", sql);
}
- quietlyClose(stmt, connection);
+ db.quietlyClose(connection, stmt, stmt2, stmt3);
}
/*
@@ -475,55 +437,46 @@ public class WsmgPersistantStorage imple
Thread.sleep(wait);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
- break;
+ throw e;
}
}
/*
- * Create Subscription Object from MIN_ID
+ * Create Subscription Object from MIN_ID and delete data in table
*/
+ Object resultObj = null;
+ int key = -1;
try {
connection = db.connect();
stmt = connection.prepareStatement(QueueContants.SQL_SELECT_STATEMENT + nextkey);
result = stmt.executeQuery();
if (result.next()) {
- int id = result.getInt(1);
+ key = result.getInt(1);
InputStream in = result.getAsciiStream(2);
ObjectInputStream s = new ObjectInputStream(in);
try {
- obj = s.readObject();
+ resultObj = s.readObject();
} catch (ClassNotFoundException e) {
- logger.error(e.getMessage(), e);
+ logger.error("Cannot Deserialize Object from Database, ClassNotFound. ", e);
}
- return new KeyValueWrapper(id, obj);
} else {
throw new RuntimeException(
"MAX_ID and MIN_ID are inconsistent with subscription table, need to reset all data value");
}
- } finally {
- quietlyClose(stmt, connection);
- }
- }
- /**
- * Delete data in subscription table since it is read
- *
- * @param key
- * @throws SQLException
- */
- private void done(int key) throws SQLException {
- String query = null;
- Connection connection = null;
- try {
- connection = db.connect();
- query = QueueContants.SQL_DELETE_STATEMENT + key;
- PreparedStatement stmt = connection.prepareStatement(query);
- db.executeUpdateAndClose(stmt);
- db.commitAndFree(connection);
- } catch (SQLException sqle) {
- db.rollbackAndFree(connection);
- throw sqle;
+ try {
+ String query = QueueContants.SQL_DELETE_STATEMENT + key;
+ stmt2 = connection.prepareStatement(query);
+ stmt2.executeUpdate();
+ db.commit(connection);
+ } catch (SQLException sqle) {
+ db.rollback(connection);
+ throw sqle;
+ }
+ } finally {
+ db.quietlyClose(connection, stmt, stmt2);
}
+ return resultObj;
}
private void batchCleanDB(Statement stmt, Connection con) throws SQLException {
@@ -717,12 +670,27 @@ public class WsmgPersistantStorage imple
private static class SubscriptionConstants {
- public static String INSERT_SQL_QUERY = "INSERT INTO %s(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
+ public static final String TABLE_NAME_EXPIRABLE_SUBCRIPTIONS = "subscription";
+
+ public static final String TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS = "specialSubscription";
+
+ public static final String EXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
+ + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
+ + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
+
+ public static final String EXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
+ + " WHERE SubscriptionId= ?";
+
+ public static final String EXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
+
+ public static final String NONEXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
+ + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
+ "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
- public static String ORDINARY_SUBSCRIPTION_INSERT_QUERY = null;
+ public static final String NONEXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
+ + " WHERE SubscriptionId= ?";
- public static String SPECIAL_SUBSCRIPTION_INSERT_QUERY = null;
+ public static final String NONEXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS;
}
private static class QueueContants {
Added: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java?rev=1174971&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java (added)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java Fri Sep 23 19:36:50 2011
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.storage;
+
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+
+public interface WsmgQueue {
+
+ List<SubscriptionEntry> getAllSubscription();
+
+ int insert(SubscriptionState subscription);
+
+ int delete(String subscriptionId);
+
+ void cleanup();
+
+ void enqueue(Object object, String trackId);
+
+ Object blockingDequeue();
+
+}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java Fri Sep 23 19:36:50 2011
@@ -28,20 +28,10 @@ import org.apache.airavata.wsmg.broker.s
public interface WsmgStorage {
- public abstract List<SubscriptionEntry> getAllSubscription();
+ List<SubscriptionEntry> getAllSubscription();
- public abstract int insert(SubscriptionState subscription);
-
- public abstract int delete(String subscriptionId);
-
- public void enqueue(Object object, String trackId);
-
- public Object blockingDequeue();
-
- public int size();
-
- public void flush();
-
- public void cleanup();
+ int insert(SubscriptionState subscription);
+ int delete(String subscriptionId);
+
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java Fri Sep 23 19:36:50 2011
@@ -21,21 +21,17 @@
package org.apache.airavata.wsmg.config;
-import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
+import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
public class WSMGParameter {
/**
* Global variable for the Out Going queue (contains message to send to subscribers)
*/
- public static WsmgStorage OUT_GOING_QUEUE = null; // default=null
+ public static WsmgQueue OUT_GOING_QUEUE = null; // default=null
public static final boolean testOutGoingQueueMaxiumLength = false; // default=false
- public static final boolean useIncomingQueue = false; // default=false
-
- public static final boolean useOutGoingQueue = true; // default=true
-
// enable or disable the TimerThread that displays the message rate
public static final boolean measureMessageRate = false; // default=false
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java Fri Sep 23 19:36:50 2011
@@ -21,15 +21,14 @@
package org.apache.airavata.wsmg.config;
-import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.airavata.wsmg.broker.NotificationProcessor;
import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
import org.apache.airavata.wsmg.matching.XPath.YFilterMessageMatcher;
@@ -41,8 +40,6 @@ public class WsmgConfigurationContext {
private List<AbstractMessageMatcher> messageMatchers = new LinkedList<AbstractMessageMatcher>();
- private Map<Object, Object> publisherRegistrationDB = new HashMap<Object, Object>();
-
private ReentrantReadWriteLock messegeMatchersLock = new ReentrantReadWriteLock();
private ConfigurationManager configurationManager;
@@ -52,37 +49,24 @@ public class WsmgConfigurationContext {
private NotificationProcessor notificationProcessor;
private WsmgStorage storage;
-
- /**
- * @return Returns the publisherRegistrationDB.
- */
- public Map getPublisherRegistrationDB() {
- return publisherRegistrationDB;
- }
-
- /**
- * @return Returns the wsntAdapter.
- */
-
- public List<AbstractMessageMatcher> getMessageMatchers() {
- return messageMatchers;
- }
+
+ private WsmgQueue queue;
public WsmgConfigurationContext() {
-
outgoingQueue = new OutGoingQueue();
-
setDirectFilter();
}
private void setDirectFilter() {
-
- messageMatchers.add(new YFilterMessageMatcher(publisherRegistrationDB));
+ messageMatchers.add(new YFilterMessageMatcher());
// messageMatchers.add(new DirectWsntMessageMatcher(subscriptions,
// publisherRegistrationDB));
-
}
+ public List<AbstractMessageMatcher> getMessageMatchers() {
+ return messageMatchers;
+ }
+
public OutGoingQueue getOutgoingQueue() {
return outgoingQueue;
}
@@ -118,6 +102,14 @@ public class WsmgConfigurationContext {
public void setStorage(WsmgStorage s) {
storage = s;
}
+
+ public WsmgQueue getQueue() {
+ return queue;
+ }
+
+ public void setQueue(WsmgQueue s) {
+ queue = s;
+ }
public ReentrantReadWriteLock getMessegeMatcherLock() {
return messegeMatchersLock;
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java Fri Sep 23 19:36:50 2011
@@ -37,16 +37,11 @@ public abstract class AbstractMessageMat
protected Map<String, String> currentMessageCache;
- protected Map<Object, Object> publisherRegistrationDB;
-
private ReentrantReadWriteLock consumerListLock = new ReentrantReadWriteLock();
// infer types of
// key and value
-
- public AbstractMessageMatcher(Map<Object, Object> publisherRegistrationDB) {
-
- this.publisherRegistrationDB = publisherRegistrationDB;
+ public AbstractMessageMatcher() {
this.currentMessageCache = new HashMap<String, String>();
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java Fri Sep 23 19:36:50 2011
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
-import java.util.Map;
import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
@@ -45,22 +44,18 @@ public class YFilterMessageMatcher exten
private OutGoingQueue outGoingQueue = null;
- // private HashMap subIdToQuery=new HashMap();
- // private HashMap yFilterIdToXPath=new HashMap();
private HashMap<String, YFilterInfo> topicToYFilterInfo = new HashMap<String, YFilterInfo>();
private HashMap<String, String> subIdToTopic = new HashMap<String, String>();
- // private Map xpath2ConsumerListMap = new HashMap();
// used for topic only subscription, so that we don't have to create a
// YFilter object
private ConsumerListManager consumerListmanager = new ConsumerListManager();
- public YFilterMessageMatcher(Map<Object, Object> publisherRegistrationDB) {
- super(publisherRegistrationDB);
+ public YFilterMessageMatcher() {
+ super();
}
public void start(String carrierLocation) {
-
currentMessageCache = new Hashtable<String, String>();
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java Fri Sep 23 19:36:50 2011
@@ -24,7 +24,6 @@ package org.apache.airavata.wsmg.matchin
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
-import java.util.Map;
import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
@@ -45,19 +44,11 @@ public class DirectWsntMessageMatcher ex
private OutGoingQueue outGoingQueue = null;
- public DirectWsntMessageMatcher(
-
- Map<Object, Object> publisherRegistrationDB) {
- super(publisherRegistrationDB);
- }
-
- public DirectWsntMessageMatcher(Map<String, SubscriptionState> subscriptionDB,
- Map<Object, Object> publisherRegistrationDB, String carrier) {
- super(publisherRegistrationDB);
+ public DirectWsntMessageMatcher(){
+ super();
}
public void start(String carrierLocation) {
-
currentMessageCache = new Hashtable<String, String>();
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java Fri Sep 23 19:36:50 2011
@@ -35,13 +35,11 @@ public class OutGoingQueue {
private Counter storeToOutQueueCounter;
public OutGoingQueue() {
-
if (WSMGParameter.measureMessageRate) {
storeToOutQueueCounter = new Counter();
TimerThread timerThread = new TimerThread(storeToOutQueueCounter, " StoreToOutQueueCounter");
new Thread(timerThread).start();
}
-
}
// need synchronized???
@@ -50,7 +48,7 @@ public class OutGoingQueue {
boolean loop = false;
do {
- // this outgoing Que is created inside the messenger which is
+ // this outgoing Queue is created inside the messenger which is
// intended to send the notification message to the consumer.
WSMGParameter.OUT_GOING_QUEUE.enqueue(outGoingMessage, outGoingMessage.getAdditionalMessageContent()
.getTrackId());
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java Fri Sep 23 19:36:50 2011
@@ -154,13 +154,4 @@ public class RunTimeStatistics {
htmlString += "</p>\n";
return htmlString;
}
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- // setStartUpTime();
- }
-
}
Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java Fri Sep 23 19:36:50 2011
@@ -30,10 +30,6 @@ public class TimerThread implements Runn
String comment = "";
- public TimerThread(Counter counter) {
- this.counter = counter;
- }
-
public TimerThread(Counter counter, String comment) {
this.counter = counter;
this.comment = comment;