You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by pa...@apache.org on 2011/09/23 21:36:51 UTC

svn commit: r1174971 - in /incubator/airavata/trunk/modules/ws-messenger: commons/src/main/java/org/apache/airavata/wsmg/commons/storage/ messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/ messagebroker/src/main/java/org/apache/ai...

Author: patanachai
Date: Fri Sep 23 19:36:50 2011
New Revision: 1174971

URL: http://svn.apache.org/viewvc?rev=1174971&view=rev
Log:
AIRAVATA-101 clean up on database connection

Added:
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
Removed:
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/KeyValueWrapper.java
Modified:
    incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
    incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
    incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java

Modified: incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/commons/src/main/java/org/apache/airavata/wsmg/commons/storage/JdbcStorage.java Fri Sep 23 19:36:50 2011
@@ -25,6 +25,7 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,9 +74,7 @@ public class JdbcStorage {
 
     public void commitAndFree(Connection conn) {
         commit(conn);
-        if (conn != null) {
-            closeConnection(conn);
-        }
+        closeConnection(conn);
     }
 
     public void rollback(Connection conn) {
@@ -90,9 +89,7 @@ public class JdbcStorage {
 
     public void rollbackAndFree(Connection conn) {
         rollback(conn);
-        if (conn != null) {
-            closeConnection(conn);
-        }
+        closeConnection(conn);
     }
 
     public Connection connect() {
@@ -107,32 +104,6 @@ public class JdbcStorage {
         return conn;
     }
 
-    public int update(String query) throws SQLException {
-        int result = 0;
-        Connection conn = null;
-        PreparedStatement stmt = null;
-        try {
-            conn = connectionPool.getConnection();
-            stmt = conn.prepareStatement(query);
-            result = stmt.executeUpdate();
-            commit(conn);
-        } catch (SQLException sql) {
-            rollback(conn);
-            throw sql;
-        } finally {
-            try {
-                if (stmt != null && !stmt.isClosed()) {
-                    stmt.close();
-                }
-            } finally {
-                if (conn != null) {
-                    closeConnection(conn);
-                }
-            }
-        }
-        return result;
-    }
-
     /**
      * This method is provided so that you can have better control over the
      * statement. For example: You can use stmt.setString to convert quotation
@@ -174,18 +145,33 @@ public class JdbcStorage {
                     stmt.close();
                 }
             } finally {
-                if (conn != null) {
-                    closeConnection(conn);
-                }
+                closeConnection(conn);
             }
         }
         return count;
     }
 
+    public void quietlyClose(Connection conn, Statement... stmts) {
+        if (stmts != null) {
+            for (Statement stmt : stmts) {
+                try {
+                    if (stmt != null && !stmt.isClosed()) {
+                        stmt.close();
+                    }
+                } catch (SQLException sql) {
+                    log.error(sql.getMessage(), sql);
+                }
+            }
+        }
+        closeConnection(conn);
+    }
+
     public void closeConnection(Connection conn) {
-        connectionPool.free(conn);
-    }    
-    
+        if (conn != null) {
+            connectionPool.free(conn);
+        }
+    }
+
     public void closeAllConnections() {
         if (connectionPool != null)
             connectionPool.dispose();

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java Fri Sep 23 19:36:50 2011
@@ -43,32 +43,35 @@ import org.slf4j.LoggerFactory;
 public class DatabaseStorageImpl implements MsgBoxStorage {
 
     private static final Logger logger = LoggerFactory.getLogger(DatabaseStorageImpl.class);
-    
+
     private static final String TABLE_NAME_TO_CHECK = "msgbox";
-    
-    private JdbcStorage db;   
+
+    private JdbcStorage db;
 
     public DatabaseStorageImpl(String jdbcUrl, String jdbcDriver, long timeOfOldMessage) {
+        db = new JdbcStorage(10, 50, jdbcUrl, jdbcDriver, true);
+        
+        Connection conn = null;
         try {
-            db = new JdbcStorage(10, 50, jdbcUrl, jdbcDriver, true);
-            
+
             /*
              * Check database
              */
-            Connection conn = db.connect();
+            conn = db.connect();
             if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
                 DatabaseCreator.createMsgBoxDatabase(conn);
                 logger.info("New Database created for Message Box");
             } else {
                 logger.info("Database already created for Message Box!");
             }
-            db.closeConnection(conn);
-                       
+
             MessageBoxDB.initialize(db, timeOfOldMessage);
-            
+
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             throw new RuntimeException("Database failure");
+        } finally {
+            db.closeConnection(conn);
         }
     }
 
@@ -111,8 +114,8 @@ public class DatabaseStorageImpl impleme
     }
 
     public void dispose() {
-        if(db != null){
-            db.closeAllConnections();   
+        if (db != null) {
+            db.closeAllConnections();
         }
     }
 

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java Fri Sep 23 19:36:50 2011
@@ -190,6 +190,7 @@ public class MessageBoxDB {
 
             Connection connection = null;
             PreparedStatement stmt = null;
+            PreparedStatement stmt2 = null;
             try {
                 connection = db.connect();
                 stmt = connection.prepareStatement(SQL_SELECT_MSGBOX_STATEMENT);
@@ -202,15 +203,13 @@ public class MessageBoxDB {
                     logger.debug(xmlString);
                     list.add(xmlString);
                 }
-                resultSet.close();
-                stmt.close();
 
                 /*
                  * Delete all retrieved messages
                  */
-                stmt = connection.prepareStatement(SQL_DELETE_MSGBOX_STATEMENT);
-                stmt.setString(1, msgBoxId);
-                db.executeUpdateAndClose(stmt);
+                stmt2 = connection.prepareStatement(SQL_DELETE_MSGBOX_STATEMENT);
+                stmt2.setString(1, msgBoxId);
+                stmt2.executeUpdate();
 
                 // commit
                 db.commit(connection);
@@ -223,17 +222,7 @@ public class MessageBoxDB {
                  * If there is error during query, close everything and throw
                  * error
                  */
-                try {
-                    if (stmt != null && !stmt.isClosed()) {
-                        stmt.close();
-                    }
-                } catch (SQLException sql) {
-                    throw sql;
-                } finally {
-                    if (connection != null) {
-                        db.closeConnection(connection);
-                    }
-                }
+                db.quietlyClose(connection, stmt, stmt2);
             }
         }
         return list;
@@ -251,7 +240,6 @@ public class MessageBoxDB {
             db.rollbackAndFree(connection);
             logger.error("Caught exception while removing old entries from msgbox db table", sql);
         }
-
     }
 
     private static void setMsgBoxidList(JdbcStorage db) throws SQLException {
@@ -267,16 +255,11 @@ public class MessageBoxDB {
                 msgBoxids.add(resultSet.getString("msgboxid"));
             }
             db.commit(connection);
+        } catch(SQLException e){
+            db.rollback(connection);
+            throw e;
         } finally {
-            try {
-                if (stmt != null) {
-                    stmt.close();
-                }
-            } catch (SQLException sql) {
-                throw sql;
-            } finally {
-                db.commitAndFree(connection);
-            }
+            db.quietlyClose(connection, stmt);            
         }
     }
 

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/BrokerServiceLifeCycle.java Fri Sep 23 19:36:50 2011
@@ -64,12 +64,8 @@ public class BrokerServiceLifeCycle impl
         if (inited == null || inited == false) {
             log.info("starting broker");
             Axis2Utils.overrideAddressingPhaseHander(configContext, new PublishedMessageHandler());
-            initConfigurations(configContext, axisService);
-
-            WsmgConfigurationContext brokerConext = (WsmgConfigurationContext) configContext
-                    .getProperty(WsmgCommonConstants.BROKER_WSMGCONFIG);
-
-            initQueue(brokerConext.getStorage());
+            WsmgConfigurationContext brokerConext = initConfigurations(configContext, axisService);
+            initQueue(brokerConext);
             initDeliveryMethod(brokerConext.getConfigurationManager());
 
             inited = true;
@@ -79,7 +75,7 @@ public class BrokerServiceLifeCycle impl
         }
     }
 
-    private void initConfigurations(ConfigurationContext configContext, AxisService axisService) {
+    private WsmgConfigurationContext initConfigurations(ConfigurationContext configContext, AxisService axisService) {
 
         WsmgConfigurationContext wsmgConfig = new WsmgConfigurationContext();
         configContext.setProperty(WsmgCommonConstants.BROKER_WSMGCONFIG, wsmgConfig);
@@ -112,14 +108,15 @@ public class BrokerServiceLifeCycle impl
 
         NotificationProcessor notificatonProcessor = new NotificationProcessor(wsmgConfig);
         wsmgConfig.setNotificationProcessor(notificatonProcessor);
-
+        
+        return wsmgConfig;
     }
 
-    private void initQueue(WsmgStorage storage) {
+    private void initQueue(WsmgConfigurationContext context) {
 
         log.info("setting up queue");
 
-        WSMGParameter.OUT_GOING_QUEUE = storage;
+        WSMGParameter.OUT_GOING_QUEUE = context.getQueue();
 
         if (WSMGParameter.cleanQueueonStartUp) {
             log.debug("cleaning up persistant queue");
@@ -141,15 +138,12 @@ public class BrokerServiceLifeCycle impl
                     WsmgCommonConstants.STORAGE_TYPE_PERSISTANT).equalsIgnoreCase(
                     WsmgCommonConstants.STORAGE_TYPE_IN_MEMORY)) {
 
-                // user has asked to use in memory queue
-                // but with out starting the delivery thread.
-                // this will accumulate message in memory.
-
-                log.error("conflicting configuration ditected, " + "using in memory queue with out starting delivery "
-                        + "thread will result memory growth.");
+                /*
+                 *  user has asked to use in memory queue but without starting the delivery thread. this will accumulate message in memory.
+                 */
+                log.error("conflicting configuration detected, using in memory queue without starting delivery thread will result memory growth.");
 
             }
-
             return;
         }
 

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java Fri Sep 23 19:36:50 2011
@@ -30,8 +30,8 @@ import javax.xml.stream.XMLStreamExcepti
 
 import org.apache.airavata.wsmg.broker.context.ContextParameters;
 import org.apache.airavata.wsmg.broker.context.ProcessingContext;
-import org.apache.airavata.wsmg.commons.OutGoingMessage;
 import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
 import org.apache.airavata.wsmg.config.WSMGParameter;
 import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
 import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
@@ -52,14 +52,14 @@ public class NotificationProcessor {
 
     private static final Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);
 
-    WsmgConfigurationContext wsmgConfigContext = null;
+    private WsmgConfigurationContext wsmgConfigContext;
 
     protected long messageCounter = 0;
     protected long messageId = 0;
 
     OMFactory factory = OMAbstractFactory.getOMFactory();
 
-    OutGoingQueue outgoingQueue = null;
+    private OutGoingQueue outgoingQueue;
 
     public NotificationProcessor(WsmgConfigurationContext config) {
         init(config);
@@ -68,14 +68,6 @@ public class NotificationProcessor {
     private void init(WsmgConfigurationContext config) {
         this.wsmgConfigContext = config;
         outgoingQueue = config.getOutgoingQueue();
-
-        // Ramith : JMS support is removed.
-        assert (!WSMGParameter.useIncomingQueue);
-        /*
-         * PublisherThread publisherThread = new PublisherThread(wsmgConfigContext); new
-         * Thread(publisherThread).start();
-         */
-
     }
 
     private synchronized long getNextTrackId() {
@@ -272,9 +264,6 @@ public class NotificationProcessor {
     private void matchAndSave(String notificationMessage, String topicLocalString,
             AdditionalMessageContent additionalMessageContent) {
 
-        // Ramith: Jms support is not implemented now.
-        assert (!WSMGParameter.useIncomingQueue);
-
         List<ConsumerInfo> matchedConsumers = new LinkedList<ConsumerInfo>();
 
         // not use incoming queue
@@ -300,9 +289,6 @@ public class NotificationProcessor {
         if (consumerInfoList.size() == 0) // No subscription
             return;
 
-        assert (WSMGParameter.useOutGoingQueue); // we will only use out going
-        // queue for the moment
-
         RunTimeStatistics.addNewNotificationMessageSize(message.length());
         OutGoingMessage outGoingMessage = new OutGoingMessage();
         outGoingMessage.setTextMessage(message);

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgInMemoryStorage.java Fri Sep 23 19:36:50 2011
@@ -31,7 +31,7 @@ import java.util.concurrent.LinkedBlocki
 import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
 import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
 
-public class WsmgInMemoryStorage implements WsmgStorage {
+public class WsmgInMemoryStorage implements WsmgStorage, WsmgQueue {
 
     private LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
 
@@ -39,8 +39,18 @@ public class WsmgInMemoryStorage impleme
 
     private Map<String, SubscriptionState> unexpirableSubscriptions = new ConcurrentHashMap<String, SubscriptionState>();
 
+    public int insert(SubscriptionState subscription) {
+        if (subscription.isNeverExpire()) {
+            unexpirableSubscriptions.put(subscription.getId(), subscription);
+        } else {
+            expirableSubscriptions.put(subscription.getId(), subscription);
+        }
+        return 0;
+    }
+    
     public int delete(String subscriptionId) {
-
+        expirableSubscriptions.remove(subscriptionId);
+        unexpirableSubscriptions.remove(subscriptionId);
         return 0;
     }
 
@@ -52,7 +62,6 @@ public class WsmgInMemoryStorage impleme
         Collection<SubscriptionState> entries = expirableSubscriptions.values();
 
         for (SubscriptionState s : entries) {
-
             SubscriptionEntry se = new SubscriptionEntry();
             se.setSubscribeXml(s.getSubscribeXml());
             se.setSubscriptionId(s.getId());
@@ -73,9 +82,7 @@ public class WsmgInMemoryStorage impleme
         Object obj = null;
 
         try {
-
             obj = queue.take();
-
         } catch (InterruptedException ie) {
             throw new RuntimeException("interruped exception occured", ie);
         }
@@ -85,31 +92,9 @@ public class WsmgInMemoryStorage impleme
 
     public void cleanup() {
         queue.clear();
-
     }
 
     public void enqueue(Object object, String trackId) {
         queue.offer(object);
-
-    }
-
-    public void flush() {
-        // nothing to do.
-    }
-
-    public int size() {
-        return queue.size();
     }
-
-    public int insert(SubscriptionState subscription) {
-
-        if (subscription.isNeverExpire()) {
-            unexpirableSubscriptions.put(subscription.getId(), subscription);
-        } else {
-            expirableSubscriptions.put(subscription.getId(), subscription);
-        }
-
-        return 0;
-    }
-
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java Fri Sep 23 19:36:50 2011
@@ -35,7 +35,6 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -54,27 +53,20 @@ import org.apache.axiom.om.OMElement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class WsmgPersistantStorage implements WsmgStorage {
+public class WsmgPersistantStorage implements WsmgStorage, WsmgQueue {
     private static final Logger logger = LoggerFactory.getLogger(WsmgPersistantStorage.class);
 
     /*
      * Table name
      */
-    public static final String TABLE_NAME_EXPIRABLE_SUBCRIPTIONS = "subscription";
-    public static final String TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS = "specialSubscription";
-    
-    private static final String TABLE_NAME_TO_CHECK = TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
+    private static final String TABLE_NAME_TO_CHECK = SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
 
     private Counter storeToDBCounter = new Counter();
 
-    private JdbcStorage db = null;
-
-    private String dbName = null;
+    private JdbcStorage db;
 
     public WsmgPersistantStorage(String jdbcUrl, String jdbcDriver) {
 
-        this.dbName = TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
-
         db = new JdbcStorage(jdbcUrl, jdbcDriver);
 
         Connection conn = null;
@@ -90,13 +82,6 @@ public class WsmgPersistantStorage imple
                 logger.info("Database already created for Message Broker!");
             }
 
-            // inject dbname to sql statement.
-            SubscriptionConstants.ORDINARY_SUBSCRIPTION_INSERT_QUERY = String.format(
-                    SubscriptionConstants.INSERT_SQL_QUERY, TABLE_NAME_EXPIRABLE_SUBCRIPTIONS);
-
-            SubscriptionConstants.SPECIAL_SUBSCRIPTION_INSERT_QUERY = String.format(
-                    SubscriptionConstants.INSERT_SQL_QUERY, TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS);
-
             if (WSMGParameter.measureMessageRate) {
                 TimerThread timerThread = new TimerThread(storeToDBCounter, " StoreSubScriptionToDBCounter");
                 new Thread(timerThread).start();
@@ -107,22 +92,6 @@ public class WsmgPersistantStorage imple
             logger.error(e.getMessage(), e);
             throw new RuntimeException("Database failure");
         } finally {
-            if (conn != null) {
-                db.closeConnection(conn);
-            }
-        }
-    }
-
-    private void quietlyClose(Statement stmt, Connection conn) {
-        try {
-            if (stmt != null && !stmt.isClosed()) {
-                stmt.close();
-            }
-        } catch (SQLException sql) {
-            logger.error(sql.getMessage(), sql);
-        }
-
-        if (conn != null) {
             db.closeConnection(conn);
         }
     }
@@ -137,17 +106,16 @@ public class WsmgPersistantStorage imple
 
         ArrayList<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>();
 
-        String query = "SELECT * FROM " + dbName;
         Connection conn = null;
         PreparedStatement stmt = null;
         try {
 
             // get number of row first and increase the arrayList size for
             // better performance
-            int size = db.countRow(dbName, "*");
+            int size = db.countRow(SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS, "*");
 
             conn = db.connect();
-            stmt = conn.prepareStatement(query);
+            stmt = conn.prepareStatement(SubscriptionConstants.EXP_SELECT_QUERY);
             ResultSet rs = stmt.executeQuery();
             ret.ensureCapacity(size);
 
@@ -162,7 +130,7 @@ public class WsmgPersistantStorage imple
         } catch (SQLException ex) {
             logger.error("sql exception occured", ex);
         } finally {
-            quietlyClose(stmt, conn);
+            db.quietlyClose(conn, stmt);
         }
         return ret;
     }
@@ -196,11 +164,7 @@ public class WsmgPersistantStorage imple
             policyValue = WsmgCommonConstants.WSRM_POLICY_TRUE;
         }
 
-        Date today = new Date();
-        Timestamp now = new Timestamp(today.getTime());
-
-        String sql = subscription.isNeverExpire() ? SubscriptionConstants.SPECIAL_SUBSCRIPTION_INSERT_QUERY
-                : SubscriptionConstants.ORDINARY_SUBSCRIPTION_INSERT_QUERY;
+        Timestamp now = new Timestamp(System.currentTimeMillis());
 
         int result = 0;
         Connection connection = null;
@@ -208,7 +172,7 @@ public class WsmgPersistantStorage imple
         try {
 
             connection = db.connect();
-            stmt = connection.prepareStatement(sql);
+            stmt = connection.prepareStatement(SubscriptionConstants.EXP_INSERT_SQL_QUERY);
 
             stmt.setString(1, subscription.getId());
             stmt.setBinaryStream(2, new ByteArrayInputStream(subscription.getSubscribeXml().getBytes()), subscription
@@ -221,8 +185,10 @@ public class WsmgPersistantStorage imple
                     consumerReferenceParameters.getBytes().length);
             stmt.setTimestamp(8, now);
             result = db.executeUpdateAndClose(stmt);
-            storeToDBCounter.addCounter();
             db.commitAndFree(connection);
+
+            storeToDBCounter.addCounter();
+
         } catch (SQLException ex) {
             logger.error("sql exception occured", ex);
             db.rollbackAndFree(connection);
@@ -238,34 +204,19 @@ public class WsmgPersistantStorage imple
      * .lang.String)
      */
     public int delete(String subscriptionId) {
-        String query;
-        query = "DELETE FROM " + dbName + " WHERE SubscriptionId='" + subscriptionId + "'";
+        int result = 0;
+        Connection connection = null;
         try {
-            return db.update(query);
-        } catch (SQLException ex) {
-            logger.error("sql exception occured", ex);
-        }
-        return 0;
-    }
-
-    // QUEUE related
-
-    public Object blockingDequeue() {
-        while (true) {
-            try {
-                // FIXME::: WHY RETURN KeyValueWrapper Object??????
-                // FIXME::: Can it cast to OutGoingMessage????
-                KeyValueWrapper wrapper = retrive();
-                done(wrapper.getKey());
-                return wrapper.getValue();
-            } catch (SQLException e) {
-                logger.error(e.getMessage(), e);
-                e.printStackTrace();
-            } catch (IOException e) {
-                logger.error(e.getMessage(), e);
-                e.printStackTrace();
-            }
+            connection = db.connect();
+            PreparedStatement stmt = connection.prepareStatement(SubscriptionConstants.EXP_DELETE_SQL_QUERY);
+            stmt.setString(1, subscriptionId);
+            result = db.executeUpdateAndClose(stmt);
+            db.commitAndFree(connection);
+        } catch (SQLException sql) {
+            db.rollbackAndFree(connection);
+            logger.error("sql exception occured", sql);
         }
+        return result;
     }
 
     public void cleanup() {
@@ -285,8 +236,24 @@ public class WsmgPersistantStorage imple
                     logger.error(e.getMessage(), e);
                 }
             }
+            db.quietlyClose(conn, stmt);
+        }
+    }
 
-            quietlyClose(stmt, conn);
+    public Object blockingDequeue() {
+        while (true) {
+            try {
+                return retrive();
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+                e.printStackTrace();
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                logger.error(e.getMessage(), e);
+                e.printStackTrace();
+            }
         }
     }
 
@@ -295,6 +262,8 @@ public class WsmgPersistantStorage imple
         // Get the Max ID cache and update and unlock the table
         Connection connection = null;
         PreparedStatement stmt = null;
+        PreparedStatement stmt2 = null;
+        PreparedStatement stmt3 = null;
         try {
             int nextkey;
 
@@ -308,11 +277,9 @@ public class WsmgPersistantStorage imple
 
             if (result.next()) {
                 nextkey = result.getInt(1);
-                result.close();
-                stmt.close();
 
-                stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INCREMENT + (nextkey));
-                db.executeUpdateAndClose(stmt);
+                stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INCREMENT + (nextkey));
+                stmt2.executeUpdate();
             } else {
                 throw new RuntimeException("MAX_ID Table is not init, redeploy the service !!!");
             }
@@ -320,19 +287,18 @@ public class WsmgPersistantStorage imple
             /*
              * After update MAX_ID put data into queue table
              */
-            stmt = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
-            stmt.setInt(1, nextkey);
-            stmt.setString(2, trackId);
+            stmt3 = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
+            stmt3.setInt(1, nextkey);
+            stmt3.setString(2, trackId);
 
             ByteArrayOutputStream output = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(output);
             out.writeObject(object);
             byte[] buffer = output.toByteArray();
             ByteArrayInputStream in = new ByteArrayInputStream(buffer);
-            stmt.setBinaryStream(3, in, buffer.length);
-            db.executeUpdateAndClose(stmt);
+            stmt3.setBinaryStream(3, in, buffer.length);
+            stmt3.executeUpdate();
             db.commit(connection);
-
         } catch (SQLException sqlEx) {
             db.rollback(connection);
             logger.error("unable to enque the message in persistant storage", sqlEx);
@@ -346,14 +312,10 @@ public class WsmgPersistantStorage imple
                 logger.error("Cannot Unlock Table", sql);
             }
 
-            quietlyClose(stmt, connection);
+            db.quietlyClose(connection, stmt, stmt2, stmt3);
         }
     }
 
-    public void flush() {
-        // nothing to do.
-    }
-
     public int size() {
         throw new UnsupportedOperationException();
     }
@@ -361,6 +323,9 @@ public class WsmgPersistantStorage imple
     private void initMessageQueueStorage() throws SQLException {
         Connection connection = null;
         PreparedStatement stmt = null;
+        PreparedStatement stmt2 = null;
+        PreparedStatement stmt3 = null;
+        PreparedStatement stmt4 = null;
         try {
             connection = db.connect();
 
@@ -372,20 +337,18 @@ public class WsmgPersistantStorage imple
             stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
             ResultSet result = stmt.executeQuery();
             if (!result.next()) {
-                stmt.close();
-                stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
-                db.executeUpdateAndClose(stmt);
+                stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
+                stmt2.executeUpdate();
             }
 
             /*
              * Get Min ID
              */
-            stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
-            result = stmt.executeQuery();
+            stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
+            result = stmt3.executeQuery();
             if (!result.next()) {
-                stmt.close();
-                stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
-                db.executeUpdateAndClose(stmt);
+                stmt4 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
+                stmt4.executeUpdate();
             }
             db.commit(connection);
         } catch (SQLException sqle) {
@@ -398,19 +361,19 @@ public class WsmgPersistantStorage imple
                 logger.error("Cannot Unlock Table", sql);
             }
 
-            quietlyClose(stmt, connection);
+            db.quietlyClose(connection, stmt, stmt2, stmt3, stmt4);
         }
     }
 
-    private KeyValueWrapper retrive() throws SQLException, IOException {
-        Object obj = null;
+    private Object retrive() throws SQLException, IOException, InterruptedException {
+        long wait = 1000;
         int nextkey = -1;
         int maxid = -2;
         Connection connection = null;
         PreparedStatement stmt = null;
+        PreparedStatement stmt2 = null;
+        PreparedStatement stmt3 = null;
         ResultSet result = null;
-        long wait = 1000;
-
         while (true) {
             try {
                 connection = db.connect();
@@ -424,7 +387,6 @@ public class WsmgPersistantStorage imple
                 result = stmt.executeQuery();
                 if (result.next()) {
                     nextkey = result.getInt(1);
-                    stmt.close();
                 } else {
                     throw new RuntimeException("Queue init has failed earlier");
                 }
@@ -432,11 +394,10 @@ public class WsmgPersistantStorage imple
                 /*
                  * Get Max ID
                  */
-                stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
-                result = stmt.executeQuery();
+                stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+                result = stmt2.executeQuery();
                 if (result.next()) {
                     maxid = result.getInt(1);
-                    stmt.close();
                 } else {
                     throw new RuntimeException("Queue init has failed earlier");
                 }
@@ -445,12 +406,13 @@ public class WsmgPersistantStorage imple
                  * Update value and exit the loop
                  */
                 if (maxid > nextkey) {
-                    stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
-                    db.executeUpdateAndClose(stmt);
+                    stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
+                    stmt3.executeUpdate();
                     logger.debug("Update MIN ID by one");
                     db.commit(connection);
                     break;
                 }
+
                 db.commit(connection);
             } catch (SQLException sql) {
                 db.rollback(connection);
@@ -463,7 +425,7 @@ public class WsmgPersistantStorage imple
                     logger.error("Cannot Unlock Table", sql);
                 }
 
-                quietlyClose(stmt, connection);
+                db.quietlyClose(connection, stmt, stmt2, stmt3);
             }
 
             /*
@@ -475,55 +437,46 @@ public class WsmgPersistantStorage imple
                 Thread.sleep(wait);
             } catch (InterruptedException e) {
                 logger.error(e.getMessage(), e);
-                break;
+                throw e;
             }
         }
 
         /*
-         * Create Subscription Object from MIN_ID
+         * Create Subscription Object from MIN_ID and delete data in table
          */
+        Object resultObj = null;
+        int key = -1;
         try {
             connection = db.connect();
             stmt = connection.prepareStatement(QueueContants.SQL_SELECT_STATEMENT + nextkey);
             result = stmt.executeQuery();
             if (result.next()) {
-                int id = result.getInt(1);
+                key = result.getInt(1);
                 InputStream in = result.getAsciiStream(2);
                 ObjectInputStream s = new ObjectInputStream(in);
                 try {
-                    obj = s.readObject();
+                    resultObj = s.readObject();
                 } catch (ClassNotFoundException e) {
-                    logger.error(e.getMessage(), e);
+                    logger.error("Cannot Deserialize Object from Database, ClassNotFound. ", e);
                 }
-                return new KeyValueWrapper(id, obj);
             } else {
                 throw new RuntimeException(
                         "MAX_ID and MIN_ID are inconsistent with subscription table, need to reset all data value");
             }
-        } finally {
-            quietlyClose(stmt, connection);
-        }
-    }
 
-    /**
-     * Delete data in subscription table since it is read
-     * 
-     * @param key
-     * @throws SQLException
-     */
-    private void done(int key) throws SQLException {
-        String query = null;
-        Connection connection = null;
-        try {
-            connection = db.connect();
-            query = QueueContants.SQL_DELETE_STATEMENT + key;
-            PreparedStatement stmt = connection.prepareStatement(query);
-            db.executeUpdateAndClose(stmt);
-            db.commitAndFree(connection);
-        } catch (SQLException sqle) {
-            db.rollbackAndFree(connection);
-            throw sqle;
+            try {
+                String query = QueueContants.SQL_DELETE_STATEMENT + key;
+                stmt2 = connection.prepareStatement(query);
+                stmt2.executeUpdate();
+                db.commit(connection);
+            } catch (SQLException sqle) {
+                db.rollback(connection);
+                throw sqle;
+            }
+        } finally {
+            db.quietlyClose(connection, stmt, stmt2);
         }
+        return resultObj;
     }
 
     private void batchCleanDB(Statement stmt, Connection con) throws SQLException {
@@ -717,12 +670,27 @@ public class WsmgPersistantStorage imple
 
     private static class SubscriptionConstants {
 
-        public static String INSERT_SQL_QUERY = "INSERT INTO %s(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
+        public static final String TABLE_NAME_EXPIRABLE_SUBCRIPTIONS = "subscription";
+
+        public static final String TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS = "specialSubscription";
+
+        public static final String EXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
+                + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
+                + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
+
+        public static final String EXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
+                + " WHERE SubscriptionId= ?";
+
+        public static final String EXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
+
+        public static final String NONEXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
+                + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
                 + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
 
-        public static String ORDINARY_SUBSCRIPTION_INSERT_QUERY = null;
+        public static final String NONEXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
+                + " WHERE SubscriptionId= ?";
 
-        public static String SPECIAL_SUBSCRIPTION_INSERT_QUERY = null;
+        public static final String NONEXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS;
     }
 
     private static class QueueContants {

Added: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java?rev=1174971&view=auto
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java (added)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java Fri Sep 23 19:36:50 2011
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.storage;
+
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+
+public interface WsmgQueue {
+
+    List<SubscriptionEntry> getAllSubscription();
+
+    int insert(SubscriptionState subscription);
+
+    int delete(String subscriptionId);
+
+    void cleanup();
+
+    void enqueue(Object object, String trackId);
+
+    Object blockingDequeue();
+    
+}

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java Fri Sep 23 19:36:50 2011
@@ -28,20 +28,10 @@ import org.apache.airavata.wsmg.broker.s
 
 public interface WsmgStorage {
 
-    public abstract List<SubscriptionEntry> getAllSubscription();
+    List<SubscriptionEntry> getAllSubscription();
 
-    public abstract int insert(SubscriptionState subscription);
-
-    public abstract int delete(String subscriptionId);
-
-    public void enqueue(Object object, String trackId);
-
-    public Object blockingDequeue();
-
-    public int size();
-
-    public void flush();
-
-    public void cleanup();
+    int insert(SubscriptionState subscription);
 
+    int delete(String subscriptionId);
+    
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java Fri Sep 23 19:36:50 2011
@@ -21,21 +21,17 @@
 
 package org.apache.airavata.wsmg.config;
 
-import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
+import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
 
 public class WSMGParameter {
 
     /**
      * Global variable for the Out Going queue (contains message to send to subscribers)
      */
-    public static WsmgStorage OUT_GOING_QUEUE = null; // default=null
+    public static WsmgQueue OUT_GOING_QUEUE = null; // default=null
     
     public static final boolean testOutGoingQueueMaxiumLength = false; // default=false
 
-    public static final boolean useIncomingQueue = false; // default=false
-
-    public static final boolean useOutGoingQueue = true; // default=true
-
     // enable or disable the TimerThread that displays the message rate
     public static final boolean measureMessageRate = false; // default=false    
 

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java Fri Sep 23 19:36:50 2011
@@ -21,15 +21,14 @@
 
 package org.apache.airavata.wsmg.config;
 
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.airavata.wsmg.broker.NotificationProcessor;
 import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
 import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
 import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
 import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
 import org.apache.airavata.wsmg.matching.XPath.YFilterMessageMatcher;
@@ -41,8 +40,6 @@ public class WsmgConfigurationContext {
 
     private List<AbstractMessageMatcher> messageMatchers = new LinkedList<AbstractMessageMatcher>();
 
-    private Map<Object, Object> publisherRegistrationDB = new HashMap<Object, Object>();
-
     private ReentrantReadWriteLock messegeMatchersLock = new ReentrantReadWriteLock();
 
     private ConfigurationManager configurationManager;
@@ -52,37 +49,24 @@ public class WsmgConfigurationContext {
     private NotificationProcessor notificationProcessor;
 
     private WsmgStorage storage;
-
-    /**
-     * @return Returns the publisherRegistrationDB.
-     */
-    public Map getPublisherRegistrationDB() {
-        return publisherRegistrationDB;
-    }
-
-    /**
-     * @return Returns the wsntAdapter.
-     */
-
-    public List<AbstractMessageMatcher> getMessageMatchers() {
-        return messageMatchers;
-    }
+    
+    private WsmgQueue queue;   
 
     public WsmgConfigurationContext() {
-
         outgoingQueue = new OutGoingQueue();
-
         setDirectFilter();
     }
 
     private void setDirectFilter() {
-
-        messageMatchers.add(new YFilterMessageMatcher(publisherRegistrationDB));
+        messageMatchers.add(new YFilterMessageMatcher());
         // messageMatchers.add(new DirectWsntMessageMatcher(subscriptions,
         // publisherRegistrationDB));
-
     }
 
+    public List<AbstractMessageMatcher> getMessageMatchers() {
+        return messageMatchers;
+    }
+    
     public OutGoingQueue getOutgoingQueue() {
         return outgoingQueue;
     }
@@ -118,6 +102,14 @@ public class WsmgConfigurationContext {
     public void setStorage(WsmgStorage s) {
         storage = s;
     }
+    
+    public WsmgQueue getQueue() {
+        return queue;
+    }
+
+    public void setQueue(WsmgQueue s) {
+        queue = s;
+    }    
 
     public ReentrantReadWriteLock getMessegeMatcherLock() {
         return messegeMatchersLock;

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java Fri Sep 23 19:36:50 2011
@@ -37,16 +37,11 @@ public abstract class AbstractMessageMat
 
     protected Map<String, String> currentMessageCache;
 
-    protected Map<Object, Object> publisherRegistrationDB;
-
     private ReentrantReadWriteLock consumerListLock = new ReentrantReadWriteLock();
 
     // infer types of
     // key and value
-
-    public AbstractMessageMatcher(Map<Object, Object> publisherRegistrationDB) {
-
-        this.publisherRegistrationDB = publisherRegistrationDB;
+    public AbstractMessageMatcher() {
         this.currentMessageCache = new HashMap<String, String>();
     }
 

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java Fri Sep 23 19:36:50 2011
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
 import org.apache.airavata.wsmg.broker.ConsumerInfo;
@@ -45,22 +44,18 @@ public class YFilterMessageMatcher exten
     
     private OutGoingQueue outGoingQueue = null;
 
-    // private HashMap subIdToQuery=new HashMap();
-    // private HashMap yFilterIdToXPath=new HashMap();
     private HashMap<String, YFilterInfo> topicToYFilterInfo = new HashMap<String, YFilterInfo>();
     private HashMap<String, String> subIdToTopic = new HashMap<String, String>();
-    // private Map xpath2ConsumerListMap = new HashMap();
 
     // used for topic only subscription, so that we don't have to create a
     // YFilter object
     private ConsumerListManager consumerListmanager = new ConsumerListManager();
 
-    public YFilterMessageMatcher(Map<Object, Object> publisherRegistrationDB) {
-        super(publisherRegistrationDB);
+    public YFilterMessageMatcher() {
+        super();
     }
 
     public void start(String carrierLocation) {
-
         currentMessageCache = new Hashtable<String, String>();
     }
 

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java Fri Sep 23 19:36:50 2011
@@ -24,7 +24,6 @@ package org.apache.airavata.wsmg.matchin
 import java.util.ArrayList;
 import java.util.Hashtable;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
 import org.apache.airavata.wsmg.broker.ConsumerInfo;
@@ -45,19 +44,11 @@ public class DirectWsntMessageMatcher ex
 
     private OutGoingQueue outGoingQueue = null;   
 
-    public DirectWsntMessageMatcher(
-
-    Map<Object, Object> publisherRegistrationDB) {
-        super(publisherRegistrationDB);
-    }
-
-    public DirectWsntMessageMatcher(Map<String, SubscriptionState> subscriptionDB,
-            Map<Object, Object> publisherRegistrationDB, String carrier) {
-        super(publisherRegistrationDB);
+    public DirectWsntMessageMatcher(){
+        super();
     }
 
     public void start(String carrierLocation) {
-
         currentMessageCache = new Hashtable<String, String>();
     }
 

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java Fri Sep 23 19:36:50 2011
@@ -35,13 +35,11 @@ public class OutGoingQueue {
     private Counter storeToOutQueueCounter;
 
     public OutGoingQueue() {
-
         if (WSMGParameter.measureMessageRate) {
             storeToOutQueueCounter = new Counter();
             TimerThread timerThread = new TimerThread(storeToOutQueueCounter, " StoreToOutQueueCounter");
             new Thread(timerThread).start();
         }
-
     }
 
     // need synchronized???
@@ -50,7 +48,7 @@ public class OutGoingQueue {
         boolean loop = false;
         do {
 
-            // this outgoing Que is created inside the messenger which is
+            // this outgoing Queue is created inside the messenger which is
             // intended to send the notification message to the consumer.
             WSMGParameter.OUT_GOING_QUEUE.enqueue(outGoingMessage, outGoingMessage.getAdditionalMessageContent()
                     .getTrackId());

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java Fri Sep 23 19:36:50 2011
@@ -154,13 +154,4 @@ public class RunTimeStatistics {
         htmlString += "</p>\n";
         return htmlString;
     }
-
-    /**
-     * @param args
-     */
-    public static void main(String[] args) {
-        // TODO Auto-generated method stub
-        // setStartUpTime();
-    }
-
 }

Modified: incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java
URL: http://svn.apache.org/viewvc/incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java?rev=1174971&r1=1174970&r2=1174971&view=diff
==============================================================================
--- incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java (original)
+++ incubator/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java Fri Sep 23 19:36:50 2011
@@ -30,10 +30,6 @@ public class TimerThread implements Runn
 
     String comment = "";
 
-    public TimerThread(Counter counter) {
-        this.counter = counter;
-    }
-
     public TimerThread(Counter counter, String comment) {
         this.counter = counter;
         this.comment = comment;