You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:31:26 UTC

[84/90] [abbrv] AIRAVATA-1124

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
new file mode 100644
index 0000000..60a9705
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
@@ -0,0 +1,773 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.BatchUpdateException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.storage.DatabaseCreator.DatabaseType;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.util.Counter;
+import org.apache.airavata.wsmg.util.TimerThread;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WsmgPersistantStorage implements WsmgStorage, WsmgQueue {
+    private static final Logger logger = LoggerFactory.getLogger(WsmgPersistantStorage.class);
+
+    /*
+     * Table name
+     */
+    private static final String TABLE_NAME_TO_CHECK = SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
+
+    private Counter storeToDBCounter = new Counter();
+
+    private JdbcStorage db;
+
+    public WsmgPersistantStorage(String jdbcUrl, String jdbcDriver) {
+        // Opens the JDBC storage, creates the broker database structure if absent, then initializes the queue.
+        db = new JdbcStorage(jdbcUrl, jdbcDriver);
+
+        Connection conn = null;
+        try {
+            /*
+             * Check database
+             */
+            conn = db.connect();
+            if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
+                DatabaseCreator.createMsgBrokerDatabase(conn);
+                logger.info("New Database created for Message Broker");
+            } else {
+                logger.debug("Database already created for Message Broker!");
+            }
+
+            if (WSMGParameter.measureMessageRate) {
+                TimerThread timerThread = new TimerThread(storeToDBCounter, " StoreSubScriptionToDBCounter");
+                new Thread(timerThread).start();
+            }
+
+            initMessageQueueStorage();
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException("Database failure", e); // keep the cause for callers
+        } finally {
+            db.closeConnection(conn);
+        }
+    }
+
+    public void dispose() {
+        if (db != null) {
+            db.closeAllConnections();
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.airavata.wsmg.commons.storage.WsmgStorage#getAllSubscription()
+     */
+    public List<SubscriptionEntry> getAllSubscription() {
+
+        ArrayList<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>();
+
+        Connection conn = null;
+        PreparedStatement stmt = null;
+        try {
+
+            // get number of row first and increase the arrayList size for
+            // better performance
+            int size = db.countRow(SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS, "*");
+
+            conn = db.connect();
+            stmt = conn.prepareStatement(SubscriptionConstants.EXP_SELECT_QUERY);
+            ResultSet rs = stmt.executeQuery();
+            ret.ensureCapacity(size);
+
+            if (rs != null) {
+
+                /*
+                 * Buffer data
+                 */
+                int nRead;
+                byte[] buffer = new byte[1024];
+                ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+
+                while (rs.next()) {
+                    SubscriptionEntry subscriptionEntry = new SubscriptionEntry();
+                    subscriptionEntry.setSubscriptionId(rs.getString("SubscriptionId"));
+
+                    /*
+                     * Read Binary Stream
+                     */
+                    InputStream inStream = null;
+
+                    try {
+                        inStream = rs.getBinaryStream("content");
+                        while ((nRead = inStream.read(buffer)) != -1) {
+                            outStream.write(buffer, 0, nRead);
+                        }
+                        outStream.flush();
+
+                        subscriptionEntry.setSubscribeXml(new String(outStream.toByteArray()));
+
+                    } catch (IOException ie) {
+                        logger.error("Unable to read XML from database", ie);
+
+                        // skip this subscription entry
+                        continue;
+                    } finally {
+                        // clear all data in outputStream
+                        outStream.reset();
+
+                        // close database stream
+                        if (inStream != null) {
+                            try {
+                                inStream.close();
+                            } catch (Exception e) {
+                                logger.error("Cannot close database stream", e);
+                            }
+                        }
+                    }
+
+                    ret.add(subscriptionEntry);
+
+                }
+            }
+        } catch (SQLException ex) {
+            logger.error("sql exception occured", ex);
+        } finally {
+            db.quietlyClose(conn, stmt);
+        }
+        return ret;
+    }
+
+    public int insert(SubscriptionState subscription) {
+        String address = subscription.getConsumerReference().getAddress();
+        Map<QName, OMElement> referenceParametersMap = subscription.getConsumerReference().getAllReferenceParameters();
+
+        String consumerReferenceParameters = null;
+        if (referenceParametersMap == null) {
+            consumerReferenceParameters = "";
+        } else {
+
+            StringBuffer buffer = new StringBuffer();
+
+            for (Iterator<OMElement> ite = referenceParametersMap.values().iterator(); ite.hasNext();) {
+                OMElement currentReferenceParameter = ite.next();
+
+                try {
+                    buffer.append(currentReferenceParameter.toStringWithConsume());
+                } catch (XMLStreamException se) {
+                    logger.error("unable to convert reference parameter", se);
+                }
+
+            }
+            consumerReferenceParameters = buffer.toString();
+        }
+
+        int policyValue = WsmgCommonConstants.WSRM_POLICY_FALSE;
+        if (subscription.isWsrmPolicy()) {
+            policyValue = WsmgCommonConstants.WSRM_POLICY_TRUE;
+        }
+
+        Timestamp now = new Timestamp(System.currentTimeMillis());
+
+        int result = 0;
+        Connection connection = null;
+        PreparedStatement stmt = null;
+        try {
+
+            connection = db.connect();
+            stmt = connection.prepareStatement(SubscriptionConstants.EXP_INSERT_SQL_QUERY);
+
+            stmt.setString(1, subscription.getId());
+            stmt.setBinaryStream(2, new ByteArrayInputStream(subscription.getSubscribeXml().getBytes()), subscription
+                    .getSubscribeXml().getBytes().length);
+            stmt.setInt(3, policyValue);
+            stmt.setString(4, subscription.getLocalTopic());
+            stmt.setString(5, subscription.getXpathString());
+            stmt.setString(6, address);
+            stmt.setBinaryStream(7, new ByteArrayInputStream(consumerReferenceParameters.getBytes()),
+                    consumerReferenceParameters.getBytes().length);
+            stmt.setTimestamp(8, now);
+            result = db.executeUpdateAndClose(stmt);
+            db.commitAndFree(connection);
+
+            storeToDBCounter.addCounter();
+
+        } catch (SQLException ex) {
+            logger.error("sql exception occured", ex);
+            db.rollbackAndFree(connection);
+        }
+        return result;
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.airavata.wsmg.commons.storage.SubscriptionStorage#delete(java .lang.String)
+     */
+    public int delete(String subscriptionId) {
+        int result = 0;
+        Connection connection = null;
+        try {
+            connection = db.connect();
+            PreparedStatement stmt = connection.prepareStatement(SubscriptionConstants.EXP_DELETE_SQL_QUERY);
+            stmt.setString(1, subscriptionId);
+            result = db.executeUpdateAndClose(stmt);
+            db.commitAndFree(connection);
+        } catch (SQLException sql) {
+            db.rollbackAndFree(connection);
+            logger.error("sql exception occured", sql);
+        }
+        return result;
+    }
+
+    public void cleanup() { // Deletes all rows of the queue tables via batchCleanDB.
+        Connection conn = null;
+        Statement stmt = null;
+        try {
+            conn = db.connect();
+            stmt = conn.createStatement();
+            batchCleanDB(stmt, conn);
+        } catch (SQLException e) {
+            logger.error(e.getMessage(), e);
+        } finally {
+            if (conn != null && db.isAutoCommit()) { // conn is still null when connect() itself failed
+                try {
+                    conn.setAutoCommit(true);
+                } catch (SQLException e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+            db.quietlyClose(conn, stmt);
+        }
+    }
+
+    public Object blockingDequeue() throws InterruptedException {
+        while (true) {
+            try {
+                return retrive();
+            } catch (SQLException e) {
+                logger.error(e.getMessage(), e);
+                // retry: the loop only ends with a dequeued object or an InterruptedException
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+                // retry; the logger call already records the stack trace
+            }
+        }
+    }
+
+    public void enqueue(Object object, String trackId) {
+
+        // Get the Max ID cache and update and unlock the table
+        Connection connection = null;
+        PreparedStatement stmt = null;
+        PreparedStatement stmt2 = null;
+        PreparedStatement stmt3 = null;
+        try {
+            int nextkey;
+
+            connection = db.connect();
+
+            lockMaxMinTables(connection);
+
+            stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+
+            ResultSet result = stmt.executeQuery();
+
+            if (result.next()) {
+                nextkey = result.getInt(1);
+
+                stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INCREMENT + (nextkey));
+                stmt2.executeUpdate();
+            } else {
+                throw new RuntimeException("MAX_ID Table is not init, redeploy the service !!!");
+            }
+            
+            /**
+             * Before executing the SQL_INSERT_STATEMENT query, we need to unlock 
+             * MaxIDTable and MinIDTable since we are going to insert data to another 
+             * table, disQ. If we do not unlock tables, insert query fails in MySQL. But 
+             * in Derby, this will execute without any issues even without unlocking 
+             * tables. Since it fails with MySQL, we need to unlock the tables 
+             * before executing the insert query.
+             */
+            try{
+            	 unLockTables(connection);
+            }catch (SQLException sql) {
+                logger.error("Cannot Unlock Table", sql);
+            }
+           
+
+            /*
+             * After update MAX_ID put data into queue table
+             */
+            stmt3 = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
+            stmt3.setInt(1, nextkey);
+            stmt3.setString(2, trackId);
+
+            ByteArrayOutputStream output = new ByteArrayOutputStream();
+            ObjectOutputStream out = new ObjectOutputStream(output);
+            out.writeObject(object);
+            byte[] buffer = output.toByteArray();
+            ByteArrayInputStream in = new ByteArrayInputStream(buffer);
+            stmt3.setBinaryStream(3, in, buffer.length);
+            stmt3.executeUpdate();
+            db.commit(connection);
+        } catch (SQLException sqlEx) {
+            db.rollback(connection);
+            logger.error("unable to enque the message in persistant storage", sqlEx);
+        } catch (IOException ioEx) {
+            db.rollback(connection);
+            logger.error("unable to enque the message in persistant storage", ioEx);
+        } finally {
+            try {
+                unLockTables(connection);
+            } catch (SQLException sql) {
+                logger.error("Cannot Unlock Table", sql);
+            }
+
+            db.quietlyClose(connection, stmt, stmt2, stmt3);
+        }
+    }
+
+    private void initMessageQueueStorage() throws SQLException {
+        Connection connection = null;
+        PreparedStatement stmt = null;
+        PreparedStatement stmt2 = null;
+        PreparedStatement stmt3 = null;
+        PreparedStatement stmt4 = null;
+        try {
+            connection = db.connect();
+
+            lockMaxMinTables(connection);
+
+            /*
+             * Get Max ID
+             */
+            stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+            ResultSet result = stmt.executeQuery();
+            if (!result.next()) {
+                stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
+                stmt2.executeUpdate();
+            }
+
+            /*
+             * Get Min ID
+             */
+            stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
+            result = stmt3.executeQuery();
+            if (!result.next()) {
+                stmt4 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
+                stmt4.executeUpdate();
+            }
+            db.commit(connection);
+        } catch (SQLException sqle) {
+            db.rollback(connection);
+            throw sqle;
+        } finally {
+            try {
+                unLockTables(connection);
+            } catch (SQLException sql) {
+                logger.error("Cannot Unlock Table", sql);
+            }
+
+            db.quietlyClose(connection, stmt, stmt2, stmt3, stmt4);
+        }
+    }
+
+    private Object retrive() throws SQLException, IOException, InterruptedException {
+        long wait = 1000;
+        int nextkey = -1;
+        int maxid = -2;
+        Connection connection = null;
+        PreparedStatement stmt = null;
+        PreparedStatement stmt2 = null;
+        PreparedStatement stmt3 = null;
+        ResultSet result = null;
+        while (true) {
+            try {
+                connection = db.connect();
+
+                lockMaxMinTables(connection);
+
+                /*
+                 * Get Min ID
+                 */
+                stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
+                result = stmt.executeQuery();
+                if (result.next()) {
+                    nextkey = result.getInt(1);
+                } else {
+                    throw new RuntimeException("Queue init has failed earlier");
+                }
+
+                /*
+                 * Get Max ID
+                 */
+                stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+                result = stmt2.executeQuery();
+                if (result.next()) {
+                    maxid = result.getInt(1);
+                } else {
+                    throw new RuntimeException("Queue init has failed earlier");
+                }
+
+                /*
+                 * Update value and exit the loop
+                 */
+                if (maxid > nextkey) {
+                    stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
+                    stmt3.executeUpdate();
+                    logger.debug("Update MIN ID by one");
+                    db.commit(connection);
+                    break;
+                }
+
+                db.commit(connection);
+            } catch (SQLException sql) {
+                db.rollback(connection);
+                throw sql;
+            } finally {
+                try {
+                    unLockTables(connection);
+                } catch (SQLException sql) {
+                    // the logger call below already records the stack trace
+                    logger.error("Cannot Unlock Table", sql);
+                }
+
+                db.quietlyClose(connection, stmt, stmt2, stmt3);
+            }
+
+            /*
+             * Sleep if there is nothing to do
+             */
+            try {
+                wait = Math.min((wait + 1000), QueueContants.FINAL_WAIT_IN_MILI);
+                logger.debug("Wait=" + wait);
+                Thread.sleep(wait);
+            } catch (InterruptedException e) {
+                logger.warn("Queue is interrupted to close");
+                throw e;
+            }
+        }
+
+        /*
+         * Deserialize the queued object stored under the claimed MIN_ID, then delete that row from the queue table
+         */
+        Object resultObj = null;
+        int key = -1;
+        try {
+            connection = db.connect();
+            stmt = connection.prepareStatement(QueueContants.SQL_SELECT_STATEMENT + nextkey);
+            result = stmt.executeQuery();
+            if (result.next()) {
+                key = result.getInt(1);
+                InputStream in = result.getBinaryStream(2); // written by enqueue() with setBinaryStream
+                ObjectInputStream s = new ObjectInputStream(in);
+                try {
+                    resultObj = s.readObject();
+                } catch (ClassNotFoundException e) {
+                    logger.error("Cannot Deserialize Object from Database, ClassNotFound. ", e);
+                }
+            } else {
+                throw new RuntimeException(
+                        "MAX_ID and MIN_ID are inconsistent with subscription table, need to reset all data value");
+            }
+
+            try {
+                String query = QueueContants.SQL_DELETE_STATEMENT + key;
+                stmt2 = connection.prepareStatement(query);
+                stmt2.executeUpdate();
+                db.commit(connection);
+            } catch (SQLException sqle) {
+                db.rollback(connection);
+                throw sqle;
+            }
+        } finally {
+            db.quietlyClose(connection, stmt, stmt2);
+        }
+        return resultObj;
+    }
+
+    private void batchCleanDB(Statement stmt, Connection con) throws SQLException {
+        DatabaseType databaseType = DatabaseType.other;
+        int[] aiupdateCounts = new int[0];
+        boolean bError = false;
+        try {
+            // Run the lock and delete statements as one batch with auto commit switched off.
+            con.setAutoCommit(false);
+
+            stmt.clearBatch();
+
+            int totalStatement = 0;
+
+            try {
+                databaseType = DatabaseCreator.getDatabaseType(con);
+            } catch (Exception e) {
+                logger.error("Error evaluating database type", e);
+            }
+            // add SQL statements
+            if (DatabaseType.mysql.equals(databaseType)) {
+                stmt.addBatch("lock tables disQ write, MaxIDTable write, MinIDTable write;");
+                totalStatement++;
+            } else if (DatabaseType.derby.equals(databaseType)) {
+                stmt.addBatch("lock table disQ in exclusive mode;");
+                totalStatement++;
+                stmt.addBatch("lock table MaxIDTable in exclusive mode;");
+                totalStatement++;
+                stmt.addBatch("lock table MinIDTable in exclusive mode;");
+                totalStatement++;
+            }
+            stmt.addBatch("Delete from disQ;");
+            totalStatement++;
+            stmt.addBatch("Delete from MaxIDTable;");
+            totalStatement++;
+            stmt.addBatch("Delete from MinIDTable;");
+            totalStatement++;
+
+            aiupdateCounts = new int[totalStatement];
+
+            // execute the statements
+            aiupdateCounts = stmt.executeBatch();
+
+        } catch (BatchUpdateException bue) {
+            bError = true;
+            aiupdateCounts = bue.getUpdateCounts();
+            logger.error("SQLException: " + bue.getMessage());
+            logger.error("SQLState:  " + bue.getSQLState());
+            logger.error("Message:  " + bue.getMessage());
+            logger.error("Vendor:  " + bue.getErrorCode());
+            logger.info("Update counts:  ");
+
+            for (int i = 0; i < aiupdateCounts.length; i++) {
+                logger.error(aiupdateCounts[i] + "   ");
+            }
+
+            SQLException SQLe = bue.getNextException(); // bue itself is logged above
+            while (SQLe != null) {
+                logger.error(SQLe.getMessage(), SQLe); // log first, then advance, so null is never dereferenced
+                SQLe = SQLe.getNextException();
+            }
+        } catch (SQLException SQLe) {
+            bError = true;
+            throw SQLe;
+        } finally {
+            // determine operation result
+            for (int i = 0; !bError && i < aiupdateCounts.length; i++) {
+                int iProcessed = aiupdateCounts[i];
+                /**
+                 * The int values that can be returned in the update counts array are: <br/>
+                 * -3--Operation error. A driver has the option to stop at the first error and throw a
+                 * BatchUpdateException or to report the error and continue. This value is only seen in the latter case. <br/>
+                 * -2--The operation was successful, but the number of rows affected is unknown. <br/>
+                 * Zero--DDL statement or no rows affected by the operation. Greater than zero--Operation was
+                 * successful, number of rows affected by the operation.
+                 */
+                if (iProcessed < 0 && iProcessed != -2) {
+                    // error on statement
+                    logger.info("Error batch." + iProcessed);
+                    bError = true;
+                }
+            }
+
+            if (bError) {
+                con.rollback();
+            } else {
+                con.commit();
+            }
+
+            /*
+             * Unlock table after rollback and commit, since it is not automatic in MySql
+             */
+
+            if (DatabaseType.mysql.equals(databaseType)) {
+                PreparedStatement prepareStmt = con.prepareCall("unlock tables;");
+                db.executeUpdateAndClose(prepareStmt);
+            }
+        } // end finally
+        logger.info("Queue is cleaned.");
+    }
+
+    private void lockMaxMinTables(Connection connection) throws SQLException {
+        DatabaseType databaseType = DatabaseType.other;
+        try {
+            databaseType = DatabaseCreator.getDatabaseType(connection);
+        } catch (Exception e) {
+            logger.error("Error evaluating database type", e);
+        }
+        /*
+         * Locks the MaxID and MinID tables on Derby and MySQL; other database types return without locking.
+         * Must turn off auto commit first.
+         */
+        connection.setAutoCommit(false);
+        String sql = null;
+        Statement stmt = null;
+        try {
+            switch (databaseType) {
+            case derby:
+                sql = "LOCK TABLE " + QueueContants.TABLE_NAME_MAXID + " IN EXCLUSIVE MODE";
+                String sql2 = "LOCK TABLE " + QueueContants.TABLE_NAME_MINID + " IN EXCLUSIVE MODE";
+                stmt = connection.createStatement();
+                stmt.addBatch(sql);
+                stmt.addBatch(sql2);
+                stmt.executeBatch();
+                break;
+            case mysql:
+                sql = "lock tables " + QueueContants.TABLE_NAME_MAXID + " write" + "," + QueueContants.TABLE_NAME_MINID
+                        + " write";
+                stmt = connection.createStatement();
+                stmt.execute(sql); // LOCK TABLES returns no ResultSet, so executeQuery() is the wrong call
+                break;
+            default:
+                return;
+            }
+
+        } finally {
+            if (stmt != null && !stmt.isClosed()) {
+                stmt.close();
+            }
+        }
+    }
+
+    private void unLockTables(Connection connection) throws SQLException {
+        DatabaseType databaseType = DatabaseType.other;
+        try {
+            databaseType = DatabaseCreator.getDatabaseType(connection);
+        } catch (Exception e) {
+            logger.error("Error evaluating database type", e);
+        }
+        // Releases the table locks for the detected database type, then restores auto commit if configured.
+        try {
+            switch (databaseType) {
+            case derby:
+                /*
+                 * Derby has no explicit unlock SQL; it uses commit or rollback as the unlock mechanism, so make
+                 * sure that the connection is always committed or rolled back.
+                 */
+                break;
+            case mysql:
+                String sql = "unlock tables";
+                PreparedStatement stmt = null;
+                try {
+                    stmt = connection.prepareStatement(sql);
+                    stmt.execute(); // UNLOCK TABLES returns no ResultSet, so executeQuery() is the wrong call
+                    db.commit(connection);
+                } finally {
+                    if (stmt != null) {
+                        stmt.close();
+                    }
+                }
+                break;
+            default:
+                return;
+            }
+        } finally {
+            /*
+             * Set auto commit when needed; the connection may be null when a caller's connect() failed.
+             */
+            if (connection != null && db.isAutoCommit()) {
+                connection.setAutoCommit(true);
+            }
+        }
+    }
+
+    private static class SubscriptionConstants {
+
+        public static final String TABLE_NAME_EXPIRABLE_SUBCRIPTIONS = "subscription";
+
+        public static final String TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS = "specialSubscription";
+
+        public static final String EXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
+                + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
+                + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
+
+        public static final String EXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
+                + " WHERE SubscriptionId= ?";
+
+        public static final String EXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
+
+        public static final String NONEXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
+                + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
+                + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
+
+        public static final String NONEXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
+                + " WHERE SubscriptionId= ?";
+
+        public static final String NONEXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS;
+    }
+
+    private static class QueueContants {
+        public static final int FINAL_WAIT_IN_MILI = 5000;
+
+        public static final String TABLE_NAME = "disQ";
+
+        public static final String TABLE_NAME_MAXID = "MaxIDTable";
+
+        public static final String TABLE_NAME_MINID = "MinIDTable";
+
+        public static final int STATUS_OPEN = 0;
+
+        public static final String SQL_INSERT_STATEMENT = "INSERT INTO " + TABLE_NAME
+                + " (id, trackId, message, status) " + "VALUES (?,?,?," + STATUS_OPEN + ")";
+
+        public static String SQL_DELETE_STATEMENT = "DELETE FROM " + TABLE_NAME + " WHERE id=";
+
+        public static String SQL_SELECT_STATEMENT = "SELECT id,message FROM " + TABLE_NAME + " WHERE id=";
+
+        public static String SQL_MAX_ID_SEPERATE_TABLE = "SELECT maxID FROM " + TABLE_NAME_MAXID;
+
+        public static String SQL_MIN_ID_SEPERATE_TABLE = "SELECT minID FROM " + TABLE_NAME_MINID;
+
+        public static String SQL_MAX_ID_INSERT = "INSERT INTO " + TABLE_NAME_MAXID + " (maxID) VALUES (1)";
+
+        public static String SQL_MIN_ID_INSERT = "INSERT INTO " + TABLE_NAME_MINID + " (minID) VALUES (1)";
+
+        public static String SQL_MAX_ID_INCREMENT = "UPDATE " + TABLE_NAME_MAXID + " SET maxID = maxID+1 WHERE maxID =";
+
+        public static String SQL_MIN_ID_INCREMENT = "UPDATE " + TABLE_NAME_MINID + " SET minID = minID+1 WHERE minID =";
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
new file mode 100644
index 0000000..5430b33
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.storage;
+
+public interface WsmgQueue {
+    /** Clears the queue; WsmgPersistantStorage does this by deleting all rows of its queue tables. */
+    void cleanup();
+    /** Adds an object to the queue together with its track id. */
+    void enqueue(Object object, String trackId);
+    /** Returns the next queued object; WsmgPersistantStorage polls until one exists or the thread is interrupted. */
+    Object blockingDequeue() throws InterruptedException;
+    /** Releases resources held by the queue; WsmgPersistantStorage closes all its database connections. */
+    void dispose();
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
new file mode 100644
index 0000000..2a1d1cb
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.storage;
+
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+/** Storage abstraction for the broker's subscriptions. NOTE(review): semantics are set by the implementations. */
+public interface WsmgStorage {
+    /** Returns the stored subscriptions as {@link SubscriptionEntry} objects. */
+    List<SubscriptionEntry> getAllSubscription();
+    /** Stores {@code subscription}. NOTE(review): the meaning of the returned int is not visible here -- confirm. */
+    int insert(SubscriptionState subscription);
+    /** Deletes the subscription with the given id. NOTE(review): meaning of the returned int is not visible here. */
+    int delete(String subscriptionId);
+    /** Disposes of the storage. NOTE(review): presumably releases its resources -- confirm in implementations. */
+    void dispose();
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
new file mode 100644
index 0000000..d3be422
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
@@ -0,0 +1,177 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMNamespace;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compare two OMElement with its namespace, attributes, children, and text. Current implementation supports ignore
+ * namespace checking i.e. if the namespace is in the list, it is skipped and return as equals.
+ */
+public class OMElementComparator {
+
+    private static final Logger log = LoggerFactory.getLogger(OMElementComparator.class);
+
+    private static List<String> ignorableNamespaceList = new ArrayList<String>();
+
+    private OMElementComparator() {
+    }
+
+    public void addIgnorableNamespace(String nsURI) {
+        ignorableNamespaceList.add(nsURI);
+    }
+
+    public void clearIgnorableNamespaces() {
+        ignorableNamespaceList.clear();
+    }
+
+    public static boolean compare(OMElement elementOne, OMElement elementTwo) {
+
+        if (isIgnorable(elementOne) || isIgnorable(elementTwo)) {
+            // ignore if the elements belong to any of the ignorable namespaces
+            // list
+            return true;
+        } else if (elementOne == null && elementTwo == null) {
+            log.debug("Both Elements are null.");
+            return true;
+        } else if (elementOne == null || elementTwo == null) {
+            log.debug("One of item to compare is null");
+            return false;
+        }
+
+        return BrokerUtil.sameStringValue(elementOne.getLocalName(), elementTwo.getLocalName())
+                && compare(elementOne.getNamespace(), elementTwo.getNamespace())
+                && compareAttibutes(elementOne, elementTwo)
+                /*
+                 * Trimming the value of the XMLElement is not correct since this compare method cannot be used to
+                 * compare element contents with trailing and leading whitespaces BUT for the practical side of tests
+                 * and to get the current tests working we have to trim() the contents
+                 */
+                && BrokerUtil.sameStringValue(elementOne.getText().trim(), elementTwo.getText().trim())
+                && compareChildren(elementOne, elementTwo);
+    }
+
+    private static boolean isIgnorable(OMElement elt) {
+        if (elt != null) {
+            OMNamespace namespace = elt.getNamespace();
+            if (namespace != null) {
+                return ignorableNamespaceList.contains(namespace.getNamespaceURI());
+            } else {
+                return false;
+            }
+        } else {
+            return false;
+        }
+    }
+
+    private static boolean compareChildren(OMElement elementOne, OMElement elementTwo) {
+        HashMap<QName, OMElement> map = new HashMap<QName, OMElement>();
+        Iterator oneIter = elementOne.getChildElements();
+        while (oneIter.hasNext()) {
+            OMElement elementOneChild = (OMElement) oneIter.next();
+            OMElement elementTwoChild = elementTwo.getFirstChildWithName(elementOneChild.getQName());
+            if (!compare(elementOneChild, elementTwoChild)) {
+                return false;
+            }
+
+            /*
+             * Cache for later access
+             */
+            map.put(elementOneChild.getQName(), elementOneChild);
+        }
+
+        /*
+         * Case the second element has more elements than the first
+         */
+        Iterator twoIter = elementTwo.getChildElements();
+        while (twoIter.hasNext()) {
+            OMElement elementTwoChild = (OMElement) twoIter.next();
+            if (!isIgnorable(elementTwoChild) && !map.containsKey(elementTwoChild.getQName())) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+    private static boolean compareAttibutes(OMElement elementOne, OMElement elementTwo) {
+        int elementOneAtribCount = 0;
+        int elementTwoAtribCount = 0;
+        Iterator oneIter = elementOne.getAllAttributes();
+        while (oneIter.hasNext()) {
+
+            /*
+             * This catches a case where the first one has more items than the second one (one.attributes.size >
+             * two.attributes.size) and a case where the first and the second have a different attributes.
+             * (one.attributes.size == two.attributes.size)
+             */
+            OMAttribute omAttribute = (OMAttribute) oneIter.next();
+            OMAttribute attr = elementTwo.getAttribute(omAttribute.getQName());
+            if (attr == null) {
+                log.debug("Attribute " + omAttribute + " is not found in both elements.");
+                return false;
+            }
+            /*
+             * Count attributes in the first item
+             */
+            elementOneAtribCount++;
+        }
+
+        /*
+         * Count attributes in the second item
+         */
+        Iterator elementTwoIter = elementTwo.getAllAttributes();
+        while (elementTwoIter.hasNext()) {
+            elementTwoIter.next();
+            elementTwoAtribCount++;
+        }
+
+        /*
+         * This catches a case where the second one has more items than the first one. (two.attributes.size >
+         * one.attributes.size)
+         */
+        log.debug("Number of Attributes are equal? : " + (elementOneAtribCount == elementTwoAtribCount));
+        return elementOneAtribCount == elementTwoAtribCount;
+    }
+
+    /*
+     * Compare only URI not prefix
+     */
+    private static boolean compare(OMNamespace x, OMNamespace y) {
+        log.debug("Compare namespace:" + x + " with " + y);
+        return (x == null && y == null)
+                || (x != null && y != null && BrokerUtil.sameStringValue(x.getNamespaceURI(), y.getNamespaceURI()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
new file mode 100644
index 0000000..06d435a
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.config;
+
+import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
+
+public class WSMGParameter {
+
+    /** Process-wide out-going queue: holds the messages to send to subscribers. Not set until assigned. */
+    public static WsmgQueue OUT_GOING_QUEUE = null;
+
+    /** Switch named after testing the maximum length of the out-going queue; switched off. */
+    public static final boolean testOutGoingQueueMaxiumLength = false;
+
+    /** Turns the TimerThread that displays the message rate on or off; switched off. */
+    public static final boolean measureMessageRate = false;
+
+    /** Automatic clean-up of subscriptions. NOTE(review): switched off, although the old comment said default=true. */
+    public static final boolean enableAutoCleanSubscriptions = false;
+    /** Guards the extra YFilter debug output; switched off. */
+    public static final boolean debugYFilter = false;
+    /** Queue clean-up at start-up. NOTE(review): switched off, although the old comment said default=true. */
+    public static final boolean cleanQueueonStartUp = false;
+
+    public static final boolean requireSubscriptionRenew = true;
+    public static final long expirationTime = 72L * 60L * 60L * 1000L; // 72 hours
+    public static final boolean showTrackId = false;
+    public static final String versionSetUpNote = "Added_Sub_Timeout";
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
new file mode 100644
index 0000000..c12f460
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.config;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.airavata.wsmg.broker.NotificationProcessor;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
+import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
+import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
+import org.apache.airavata.wsmg.matching.XPath.YFilterMessageMatcher;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+
+/** Holds the broker's shared components: out-going queue, message matchers and their lock, injected managers. */
+public class WsmgConfigurationContext {
+    private OutGoingQueue outgoingQueue;
+    private List<AbstractMessageMatcher> messageMatchers = new LinkedList<AbstractMessageMatcher>();
+    private ReentrantReadWriteLock messegeMatchersLock = new ReentrantReadWriteLock();
+    private ConfigurationManager configurationManager;
+    private SubscriptionManager subscriptionMan;
+    private NotificationProcessor notificationProcessor;
+    private WsmgStorage storage;
+    private WsmgQueue queue;
+
+    /** Creates the out-going queue and registers the only matcher in use, a {@link YFilterMessageMatcher}. */
+    public WsmgConfigurationContext() {
+        this.outgoingQueue = new OutGoingQueue();
+        this.messageMatchers.add(new YFilterMessageMatcher());
+    }
+
+    /** @return the registered message matchers; the internal list itself, not a copy */
+    public List<AbstractMessageMatcher> getMessageMatchers() {
+        return this.messageMatchers;
+    }
+
+    /** @return the read/write lock that goes with the message matcher list */
+    public ReentrantReadWriteLock getMessegeMatcherLock() {
+        return this.messegeMatchersLock;
+    }
+
+    /** @return the out-going queue created by the constructor */
+    public OutGoingQueue getOutgoingQueue() {
+        return this.outgoingQueue;
+    }
+
+    /** @return the configuration manager, {@code null} until one has been set */
+    public ConfigurationManager getConfigurationManager() {
+        return this.configurationManager;
+    }
+
+    /** @param configMan the configuration manager to hold */
+    public void setConfigurationManager(ConfigurationManager configMan) {
+        this.configurationManager = configMan;
+    }
+
+    /** @return the subscription manager, {@code null} until one has been set */
+    public SubscriptionManager getSubscriptionManager() {
+        return this.subscriptionMan;
+    }
+
+    /** @param subMan the subscription manager to hold */
+    public void setSubscriptionManager(SubscriptionManager subMan) {
+        this.subscriptionMan = subMan;
+    }
+
+    /** @return the notification processor, {@code null} until one has been set */
+    public NotificationProcessor getNotificationProcessor() {
+        return this.notificationProcessor;
+    }
+
+    /** @param processor the notification processor to hold */
+    public void setNotificationProcessor(NotificationProcessor processor) {
+        this.notificationProcessor = processor;
+    }
+
+    /** @return the storage, {@code null} until one has been set */
+    public WsmgStorage getStorage() {
+        return this.storage;
+    }
+
+    /** @param s the storage to hold */
+    public void setStorage(WsmgStorage s) {
+        this.storage = s;
+    }
+
+    /** @return the queue, {@code null} until one has been set */
+    public WsmgQueue getQueue() {
+        return this.queue;
+    }
+
+    /** @param s the queue to hold */
+    public void setQueue(WsmgQueue s) {
+        this.queue = s;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
new file mode 100644
index 0000000..dbd16f6
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+
+/** Base class of the message matchers: keeps a per-topic message cache and the lock guarding consumer lists. */
+public abstract class AbstractMessageMatcher {
+    /** Cache of the current message per topic, read by {@link #handleGetCurrentMessage(String)}. */
+    protected Map<String, String> currentMessageCache;
+    private ReentrantReadWriteLock consumerListLock = new ReentrantReadWriteLock();
+
+    /** Starts with an empty {@link HashMap} as message cache. */
+    public AbstractMessageMatcher() {
+        this.currentMessageCache = new HashMap<String, String>();
+    }
+
+    /** Starts the matcher. NOTE(review): the meaning of {@code carrierLocation} is not visible in this class. */
+    public abstract void start(String carrierLocation);
+
+    /**
+     * Adds the consumers whose subscriptions match to {@code matchedConsumers}. The message is passed as a String so
+     * that one already serialized for a queue need not be converted back to XML until the delivery to consumers.
+     */
+    public abstract void populateMatches(String wsntMessageConverterClassName,
+            AdditionalMessageContent additionalMessageContent, String message, String topic,
+            List<ConsumerInfo> matchedConsumers);
+
+    /** Removes the subscription with the given id. NOTE(review): meaning of the returned int is set by subclasses. */
+    public abstract int handleUnsubscribe(String subscriptionId);
+
+    /** Registers the subscription described by {@code subscribeRequest} under {@code subscriptionId}. */
+    public abstract void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId);
+
+    /** @return the cached current message of {@code topic}, or {@code null} if the cache has no entry for it */
+    public String handleGetCurrentMessage(String topic) {
+        return currentMessageCache.get(topic);
+    }
+
+    /** Acquires the consumer-list read lock when {@code lock} is true, releases it otherwise. */
+    public void readLockUnlockConsumers(boolean lock) {
+        ReadLock shared = consumerListLock.readLock();
+        toggle(shared, lock);
+    }
+
+    /** Acquires the consumer-list write lock when {@code lock} is true, releases it otherwise. */
+    public void writeLockUnlockConsumers(boolean lock) {
+        WriteLock exclusive = consumerListLock.writeLock();
+        toggle(exclusive, lock);
+    }
+
+    /** Locks {@code target} when {@code acquire} is true, unlocks it otherwise. */
+    private void toggle(Lock target, boolean acquire) {
+        if (!acquire) {
+            target.unlock();
+            return;
+        }
+        target.lock();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
new file mode 100644
index 0000000..1881968
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.broker.ConsumerList;
+import org.apache.airavata.wsmg.broker.ConsumerListManager;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.berkeley.cs.db.yfilter.filter.EXfilterBasic;
+import edu.berkeley.cs.db.yfilter.filter.SystemGlobals;
+import edu.berkeley.cs.db.yfilterplus.queryparser.Query;
+import edu.berkeley.cs.db.yfilterplus.queryparser.XPQuery;
+import edu.berkeley.cs.db.yfilterplus.xmltree.XMLTree;
+
+/** One YFilter engine plus the maps tying XPath expressions to query ids and consumers; not synchronized. */
+public class YFilterInfo {
+    private static final Logger logger = LoggerFactory.getLogger(YFilterInfo.class);
+    private EXfilterBasic yfilter = new EXfilterBasic();
+    private HashMap<Integer, String> yFilterIdToXPath = new HashMap<Integer, String>();
+    private HashMap<Integer, Query> yFilterIdToQuery = new HashMap<Integer, Query>();
+    private HashMap<String, Integer> xPathToYFilterId = new HashMap<String, Integer>();
+    private ConsumerListManager consumerListmanager = new ConsumerListManager();
+    private int index = 0; // id passed to XPQuery.parseQuery; incremented on every addXPathQuery call
+    private int counter = 0; // number of subscriptions currently registered through addXPathQuery
+
+    public EXfilterBasic getYfilter() {
+        return yfilter;
+    }
+
+    public void setYfilter(EXfilterBasic yfilter) {
+        this.yfilter = yfilter;
+    }
+
+    public HashMap<Integer, String> getYFilterIdToXPath() {
+        return yFilterIdToXPath;
+    }
+
+    public void setYFilterIdToXPath(HashMap<Integer, String> filterIdToXPath) {
+        yFilterIdToXPath = filterIdToXPath;
+    }
+
+    /**
+     * Subscribes a consumer with an XPath expression; the expression is parsed and added to the YFilter engine only
+     * the first time it is seen, later subscriptions reuse the registered query.
+     * @throws RuntimeException if the expression cannot be parsed; the subscription counter is then left unchanged
+     */
+    public void addXPathQuery(String xpathExpression, String subscriptionId, SubscriptionState subscribeRequest)
+            throws RuntimeException {
+        index++;
+        if (WSMGParameter.debugYFilter)
+            logger.debug("QueryExp=" + xpathExpression);
+        Integer yFilterId = xPathToYFilterId.get(xpathExpression);
+        if (yFilterId == null) {
+            Query query = XPQuery.parseQuery(xpathExpression, index);
+            if (query == null) {
+                throw new RuntimeException("Invalid XPath expression:" + xpathExpression);
+            }
+            if (WSMGParameter.debugYFilter)
+                logger.debug("addSubscription " + xpathExpression + " query :" + query);
+            yFilterId = Integer.valueOf(yfilter.addQuery(query));
+            if (WSMGParameter.debugYFilter)
+                yfilter.printQueryIndex();
+            xPathToYFilterId.put(xpathExpression, yFilterId);
+            yFilterIdToXPath.put(yFilterId, xpathExpression);
+            yFilterIdToQuery.put(yFilterId, query);
+        }
+        if (WSMGParameter.debugYFilter)
+            logger.debug("YFilterId=" + yFilterId);
+        consumerListmanager.addToConsumerList(xpathExpression, subscribeRequest, subscriptionId);
+        // counted last: a rejected expression must not leave the counter above zero for good
+        counter++;
+    }
+
+    /**
+     * Removes a subscription. If it was the last consumer of its XPath expression, the query is deleted from the
+     * engine and every id/XPath mapping is dropped, so that subscribing again with the same expression registers a
+     * new query instead of reusing the id of the deleted one.
+     * @return the result of the consumer-list removal; 0 means that nothing was removed
+     */
+    public int removeSubscription(String subscriptionId) {
+        String xPath = consumerListmanager.getTokenBySubscriptionId(subscriptionId);
+        int result = consumerListmanager.removeFromConsumerList(subscriptionId, xPath);
+        if (result == 0) {
+            return 0;
+        }
+        ConsumerList remaining = consumerListmanager.getConsumerListByToken(xPath);
+        if (remaining == null || remaining.size() == 0) {
+            Integer yFilterId = xPathToYFilterId.remove(xPath);
+            Query q = yFilterIdToQuery.remove(yFilterId); // HashMap tolerates a null key: no-op when id is unknown
+            yFilterIdToXPath.remove(yFilterId);
+            if (q != null) {
+                yfilter.deleteQuery(q, q.getQueryId());
+            }
+        }
+        counter--;
+        return result;
+    }
+
+    /** @return the consumers of every query that matches the XML in {@code messageString}; may be empty */
+    public List<ConsumerInfo> getMatchingConsumerList(String messageString) {
+        List<ConsumerInfo> matchingConsumerList = new LinkedList<ConsumerInfo>();
+        XMLTree tree = new XMLTree(new java.io.StringReader(messageString));
+        if (WSMGParameter.debugYFilter)
+            tree.print();
+        yfilter.setEventSequence(tree.getEvents());
+        yfilter.startParsing();
+        if (!SystemGlobals.hasQueries) {
+            logger.debug("no match");
+            return matchingConsumerList;
+        }
+        if (WSMGParameter.debugYFilter)
+            yfilter.printQueryResults(System.out);
+        Iterator<Integer> it = (Iterator<Integer>) yfilter.getMatchedQueries().iterator();
+        while (it.hasNext()) {
+            String xpath = yFilterIdToXPath.get(it.next());
+            ConsumerList consumerList = consumerListmanager.getConsumerListByToken(xpath);
+            if (consumerList != null) { // has subscription to this expression
+                matchingConsumerList.addAll(consumerList.getConsumerList());
+            }
+        }
+        yfilter.clear();
+        return matchingConsumerList;
+    }
+
+    public int getCounter() {
+        return counter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
new file mode 100644
index 0000000..0f59ec1
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.broker.ConsumerList;
+import org.apache.airavata.wsmg.broker.ConsumerListManager;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Matcher for topic-only subscriptions (one consumer list per topic) and for subscriptions with an XPath expression
+ * (one {@link YFilterInfo} per topic). The subscription structures are guarded by the inherited read/write lock.
+ */
+public class YFilterMessageMatcher extends AbstractMessageMatcher {
+    private static final Logger logger = LoggerFactory.getLogger(YFilterMessageMatcher.class);
+    private OutGoingQueue outGoingQueue = null;
+    private HashMap<String, YFilterInfo> topicToYFilterInfo = new HashMap<String, YFilterInfo>();
+    private HashMap<String, String> subIdToTopic = new HashMap<String, String>();
+    // used for topic only subscription, so that we don't have to create a YFilter object
+    private ConsumerListManager consumerListmanager = new ConsumerListManager();
+
+    public YFilterMessageMatcher() {
+        super();
+    }
+
+    /** Replaces the message cache with an empty {@link Hashtable}; {@code carrierLocation} is not used. */
+    @Override
+    public void start(String carrierLocation) {
+        currentMessageCache = new Hashtable<String, String>();
+    }
+
+    /**
+     * Adds to {@code matchedConsumers} the consumers subscribed to the topic or to the wildcard topic, and those whose
+     * XPath expression (registered for the topic or for the wildcard topic) matches the message.
+     *
+     * @param wsntMessageConverterClassName not used by this matcher
+     * @param additionalMessageContent not used by this matcher
+     * @param message the message, evaluated against the XPath subscriptions
+     * @param topic topic of the message
+     * @param matchedConsumers list receiving the matches; must not be {@code null}
+     */
+    @Override
+    public void populateMatches(String wsntMessageConverterClassName,
+            AdditionalMessageContent additionalMessageContent, String message, String topic,
+            List<ConsumerInfo> matchedConsumers) {
+        assert (matchedConsumers != null);
+        if (WSMGParameter.debugYFilter)
+            logger.info("Message In YFilterAdapter=" + message);
+
+        // matching only reads the subscription structures, hence the read lock
+        readLockUnlockConsumers(true);
+        try {
+            // 1, Topic only
+            ConsumerList topicConsumerList = consumerListmanager.getConsumerListByToken(topic);
+            if (topicConsumerList != null) {
+                matchedConsumers.addAll(topicConsumerList.getConsumerList());
+            }
+            // 2, wild card topic only
+            ConsumerList wildcardConsumerList = consumerListmanager
+                    .getConsumerListByToken(WsmgCommonConstants.WILDCARD_TOPIC);
+            if (wildcardConsumerList != null) {
+                List<ConsumerInfo> wildCardConsumerInfoList = wildcardConsumerList.getConsumerList();
+                if (wildCardConsumerInfoList != null) {
+                    matchedConsumers.addAll(wildCardConsumerInfoList);
+                }
+            }
+            // 3, topic with Xpath
+            YFilterInfo yfilterInfo = topicToYFilterInfo.get(topic);
+            if (yfilterInfo != null) {
+                List<ConsumerInfo> topicAndXPathConsumerInfoList = yfilterInfo.getMatchingConsumerList(message);
+                if (topicAndXPathConsumerInfoList != null) {
+                    matchedConsumers.addAll(topicAndXPathConsumerInfoList);
+                }
+            }
+            // 4, wild card topic with Xpath (XPath only)
+            yfilterInfo = topicToYFilterInfo.get(WsmgCommonConstants.WILDCARD_TOPIC);
+            if (yfilterInfo != null) {
+                List<ConsumerInfo> wildcardTopicAndXPathConsumerInfoList = yfilterInfo.getMatchingConsumerList(message);
+                if (wildcardTopicAndXPathConsumerInfoList != null) {
+                    matchedConsumers.addAll(wildcardTopicAndXPathConsumerInfoList);
+                }
+            }
+        } finally {
+            readLockUnlockConsumers(false);
+        }
+    }
+
+    /**
+     * Removes a subscription and forgets the topic recorded for it.
+     *
+     * @param subscriptionId id given to handleSubscribe; ids starting with "T" are treated as topic-only
+     * @return 1 on success, 0 if the id is unknown or no XPath filter exists for its topic
+     */
+    @Override
+    public int handleUnsubscribe(String subscriptionId) {
+        writeLockUnlockConsumers(true);
+        try {
+            if (!subIdToTopic.containsKey(subscriptionId)) {
+                logger.error("Unknown subscription, subId=" + subscriptionId);
+                return 0;
+            }
+            // drop the id -> topic entry as well, otherwise the map grows with every subscription ever made
+            String topicExpression = subIdToTopic.remove(subscriptionId);
+            if (subscriptionId.startsWith("T")) { // Topic only
+                consumerListmanager.removeFromConsumerList(subscriptionId, topicExpression);
+                return 1;
+            }
+            YFilterInfo yfilterInfo = topicToYFilterInfo.get(topicExpression);
+            if (yfilterInfo == null) {
+                logger.error("Cannot find subscription with the subId=" + subscriptionId);
+                return 0;
+            }
+            yfilterInfo.removeSubscription(subscriptionId);
+            if (yfilterInfo.getCounter() == 0) {
+                topicToYFilterInfo.remove(topicExpression);
+            }
+            return 1;
+        } finally {
+            writeLockUnlockConsumers(false);
+        }
+    }
+
+    /**
+     * Registers a subscription: topic-only when the request carries no XPath expression, otherwise in the
+     * {@link YFilterInfo} of its topic, which is created on first use.
+     */
+    @Override
+    public void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId) {
+        writeLockUnlockConsumers(true);
+        try {
+            String topicExpression = subscribeRequest.getLocalTopic();
+            subIdToTopic.put(subscriptionId, topicExpression);
+            String xpathExpression = subscribeRequest.getXpathString();
+            if (xpathExpression == null || xpathExpression.length() == 0) { // Topic only
+                consumerListmanager.addToConsumerList(topicExpression, subscribeRequest, subscriptionId);
+            } else {
+                YFilterInfo yfilterInfo = topicToYFilterInfo.get(topicExpression);
+                if (yfilterInfo == null) {
+                    yfilterInfo = new YFilterInfo();
+                    topicToYFilterInfo.put(topicExpression, yfilterInfo);
+                }
+                yfilterInfo.addXPathQuery(xpathExpression, subscriptionId, subscribeRequest);
+            }
+
+            if (outGoingQueue == null) {
+                outGoingQueue = subscribeRequest.getOutGoingQueue();
+            }
+        } finally {
+            writeLockUnlockConsumers(false);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
new file mode 100644
index 0000000..ba50e45
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.simpleTopic;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.broker.ConsumerList;
+import org.apache.airavata.wsmg.broker.ConsumerListManager;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirectWsntMessageMatcher extends AbstractMessageMatcher {
+
+    private static final Logger logger = LoggerFactory.getLogger(DirectWsntMessageMatcher.class);
+
+    private ConsumerListManager consumerListmanager = new ConsumerListManager();
+
+    private OutGoingQueue outGoingQueue = null;
+
+    public DirectWsntMessageMatcher() {
+        super();
+    }
+
+    public void start(String carrierLocation) {
+        currentMessageCache = new Hashtable<String, String>();
+    }
+
+    public void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId) {
+
+        String topicExpression = subscribeRequest.getLocalTopic();
+        if (topicExpression == null || topicExpression.length() == 0) {
+            logger.error("ERROR:WsntAdapterConnection creation failed.");
+            return;
+        }
+
+        writeLockUnlockConsumers(true);
+
+        try {
+            consumerListmanager.addToConsumerList(topicExpression, subscribeRequest, subscriptionId);
+            if (outGoingQueue == null) {
+                outGoingQueue = subscribeRequest.getOutGoingQueue();
+            }
+        } finally {
+            writeLockUnlockConsumers(false);
+        }
+
+        return;
+
+    }
+
+    public int handleUnsubscribe(String subscriptionId) {
+
+        int ret = 0;
+
+        writeLockUnlockConsumers(true);
+        try {
+            ret = consumerListmanager.removeFromConsumerList(subscriptionId, null);
+        } finally {
+            writeLockUnlockConsumers(false);
+        }
+
+        return ret;
+    }
+
+    @Override
+    public void populateMatches(String wsntMessageConverterClassName,
+            AdditionalMessageContent additionalMessageContent, String message, String topic,
+            List<ConsumerInfo> matchedConsumers) {
+
+        assert (matchedConsumers != null);
+
+        readLockUnlockConsumers(true);
+
+        try {
+
+            ConsumerList topicConsumerList = consumerListmanager.getConsumerListByToken(topic);
+            ConsumerList wildcardConsumerList = consumerListmanager
+                    .getConsumerListByToken(WsmgCommonConstants.WILDCARD_TOPIC);
+            if (topicConsumerList != null) {// has subscription to this topic
+
+                ArrayList<ConsumerInfo> list = topicConsumerList.getConsumerList();
+
+                matchedConsumers.addAll(list);
+            }
+            if (wildcardConsumerList != null) {// has wildcard subscriptions
+                List<ConsumerInfo> wildCardConsumerInfoList = wildcardConsumerList.getConsumerList();
+                if (wildCardConsumerInfoList != null) {
+                    matchedConsumers.addAll(wildCardConsumerInfoList);
+                }
+            }
+
+        } finally {
+            readLockUnlockConsumers(false);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
new file mode 100644
index 0000000..c9b255f
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger;
+
+import java.net.NoRouteToHostException;
+import java.net.SocketTimeoutException;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
+import org.apache.airavata.wsmg.util.RunTimeStatistics;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumerUrlManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(ConsumerUrlManager.class);
+
+    private ConcurrentHashMap<String, FailedConsumerInfo> failedConsumerUrls = new ConcurrentHashMap<String, FailedConsumerInfo>();
+
+    private final int defaultMaxRetry;
+
+    private long expireTimeGap; // milliseconds
+
+    private Timer cleanupTimer;
+
+    public ConsumerUrlManager(ConfigurationManager config) {
+
+        defaultMaxRetry = config.getConfig(WsmgCommonConstants.CONFIG_MAX_MESSAGE_DELIVER_RETRIES, 2);
+
+        // time is in milliseconds
+        expireTimeGap = 1000 * 60 * config.getConfig(WsmgCommonConstants.CONFIG_CONSUMER_URL_EXPIRATION_TIME_GAP, 5l);
+
+        // let minimum time to be 1 minute
+        long timerThreadInterval = Math.max(expireTimeGap / 5, 1000 * 60);
+
+        cleanupTimer = new Timer("Failed consumer url handler", true);
+        cleanupTimer.scheduleAtFixedRate(new URLCleanUpTask(), 0, timerThreadInterval);
+    }
+
+    public void stop() {
+        logger.info("Stop ConsumerUrlManager");
+        if (this.cleanupTimer != null) {
+            this.cleanupTimer.cancel();
+        }
+        logger.info("ConsumerUrlManager Stopped");
+    }
+
+    public void onFailedDelivery(EndpointReference consumerEndpointReference, long timeFinished, long timeTaken,
+            AxisFault exception, AdditionalMessageContent headers) {
+        String url = consumerEndpointReference.getAddress();
+
+        RunTimeStatistics.addNewFailedDeliverTime(timeTaken);
+        RunTimeStatistics.addFailedConsumerURL(url);
+
+        if (isEligibleToBlackList(exception)) {
+
+            synchronized (failedConsumerUrls) {
+                FailedConsumerInfo info = failedConsumerUrls.get(url);
+                if (info == null) {
+                    info = new FailedConsumerInfo();
+                    failedConsumerUrls.put(url, info);
+                }
+                info.incrementNumberOfTimesTried(timeFinished + expireTimeGap);
+            }
+
+        } else {
+
+            String errorMsg = String.format("unable to deliver message: [%s] to consumer: [%s], " + "reason: [%s]",
+                    headers.toString(), url, exception.getMessage());
+
+            logger.error(errorMsg);
+        }
+    }
+
+    public void onSucessfullDelivery(EndpointReference consumerEndpointReference, long timeTaken) {
+
+        RunTimeStatistics.addNewSuccessfulDeliverTime(timeTaken);
+        synchronized (failedConsumerUrls) {
+
+            FailedConsumerInfo info = failedConsumerUrls.remove(consumerEndpointReference.getAddress());
+
+            if (info != null) {
+                logger.debug(String.format("message was delivered to " + "previously %d times failed url : %s",
+                        info.getNumberOfTimesTried(), consumerEndpointReference.getAddress()));
+            }
+        }
+    }
+
+    public boolean isUnavailable(String url) {
+        synchronized (failedConsumerUrls) {
+            FailedConsumerInfo info = failedConsumerUrls.get(url);
+            return (info != null && info.isMaxRetryCountReached());
+        }
+    }
+
+    private boolean isEligibleToBlackList(AxisFault f) {
+
+        Throwable cause = f.getCause();
+
+        if (cause == null) {
+            logger.error("unknown error occured", cause);
+            return false;
+        }
+
+        /*
+         * if timeout because of the set timeout in this class In windows, timeout cause ConnectException with
+         * "Connection timed out" message
+         */
+        if (cause instanceof SocketTimeoutException || cause.getMessage().indexOf("timed out") > 0
+                || cause instanceof NoRouteToHostException) {
+            return true;
+        }
+
+        return false;
+    }
+
+    class FailedConsumerInfo {
+
+        private int numberOfTimesTried;
+        private long expiryTime;
+
+        public void incrementNumberOfTimesTried(long expireTime) {
+            numberOfTimesTried++;
+            expiryTime = expireTime;
+        }
+
+        public void decrementNumberOfTimeTried() {
+            numberOfTimesTried--;
+        }
+
+        public int getNumberOfTimesTried() {
+            return numberOfTimesTried;
+        }
+
+        public boolean isMaxRetryCountReached() {
+            return numberOfTimesTried >= defaultMaxRetry;
+        }
+
+        public long getLastAtteptExpiryTime() {
+            return expiryTime;
+        }
+
+    }
+
+    class URLCleanUpTask extends TimerTask {
+
+        @Override
+        public void run() {
+
+            logger.debug("starting to clean up black listed consumer urls");
+            long currentTime = System.currentTimeMillis();
+
+            synchronized (failedConsumerUrls) {
+                for (Entry<String, FailedConsumerInfo> entry : failedConsumerUrls.entrySet()) {
+                    FailedConsumerInfo info = entry.getValue();
+
+                    if (info.isMaxRetryCountReached() && info.getLastAtteptExpiryTime() >= currentTime) {
+
+                        info.decrementNumberOfTimeTried();
+                        logger.info("decrementing number of times" + " tried for consumer url: " + entry.getKey());
+
+                    }
+                }
+            }
+
+            logger.debug("finished cleaning black listed consumer urls");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
new file mode 100644
index 0000000..92b6cfe
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.wsmg.messenger;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.axiom.om.OMElement;
+
+/**
+ * Contract for a component that sends a notification message to a consumer.
+ */
+public interface Deliverable {
+    /**
+     * Supplies the delivery protocol to this deliverable.
+     * NOTE(review): presumably the protocol used by subsequent {@link #send} calls - confirm against the
+     * implementations.
+     *
+     * @param protocol
+     *            the delivery protocol
+     */
+    void setProtocol(DeliveryProtocol protocol);
+
+    /**
+     * Sends a message to a consumer.
+     *
+     * @param consumerInfo
+     *            the consumer the message is addressed to
+     * @param message
+     *            the message to send
+     * @param additionalMessageContent
+     *            additional content accompanying the message
+     */
+    void send(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
new file mode 100644
index 0000000..7eb5b4f
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.wsmg.messenger;
+
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Runs the delivery thread: it takes messages off {@code WSMGParameter.OUT_GOING_QUEUE} and hands each
+ * one, together with the {@link Deliverable}, to the configured {@link SendingStrategy}.
+ */
+public class DeliveryProcessor {
+
+    private static final Logger logger = LoggerFactory.getLogger(DeliveryProcessor.class);
+
+    private final SendingStrategy strategy;
+    private final Deliverable deliverable;
+
+    // Written by stop() on the caller's thread and read by the delivery thread, hence volatile.
+    private volatile boolean running;
+    private Thread t;
+
+    /**
+     * @param deliverable
+     *            component that performs the actual sending
+     * @param strategy
+     *            strategy that decides how dequeued messages are sent
+     */
+    public DeliveryProcessor(Deliverable deliverable, SendingStrategy strategy) {
+        this.strategy = strategy;
+        this.deliverable = deliverable;
+    }
+
+    /**
+     * Starts the delivery thread.
+     */
+    public void start() {
+        this.running = true;
+        this.t = new Thread(new CheckingAndSending());
+        this.t.start();
+    }
+
+    /**
+     * Stops the delivery thread, waits for it to finish and disposes the out-going queue. If the calling
+     * thread is interrupted while waiting, the wait is abandoned, the queue is still disposed and the
+     * caller's interrupt status is restored.
+     */
+    public void stop() {
+        this.running = false;
+
+        if (this.t != null) {
+            // Wake the delivery thread in case it is blocked on the queue.
+            this.t.interrupt();
+
+            try {
+                this.t.join();
+            } catch (InterruptedException ie) {
+                logger.error("Wait for sending thread to finish (join) is interrupted");
+                // Preserve the interrupt for the caller instead of swallowing it.
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        WSMGParameter.OUT_GOING_QUEUE.dispose();
+    }
+
+    /**
+     * Body of the delivery thread: initialises the strategy, forwards dequeued messages until the
+     * processor is stopped, then shuts the strategy down.
+     */
+    private class CheckingAndSending implements Runnable {
+
+        public void run() {
+            strategy.init();
+            while (running) {
+                logger.debug("run - delivery thread");
+                try {
+
+                    OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
+
+                    if (WSMGParameter.showTrackId)
+                        logger.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
+                                + ": dequeued from outgoing queue");
+
+                    strategy.addMessageToSend(outGoingMessage, deliverable);
+
+                } catch (Exception e) {
+                    if (running) {
+                        // Keep the thread alive, but do not lose the failure details.
+                        logger.warn("Unexpected_exception:", e);
+                    } else {
+                        // stop() interrupts this thread, so a failure here is expected during shutdown.
+                        logger.debug("Delivery thread interrupted while shutting down", e);
+                    }
+                }
+            }
+            logger.debug("Shutdown Strategy");
+            strategy.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
new file mode 100644
index 0000000..6764a42
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger;
+
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.util.Counter;
+import org.apache.airavata.wsmg.util.TimerThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Front end for placing notification messages on {@code WSMGParameter.OUT_GOING_QUEUE}, optionally
+ * counting the stored messages when {@code WSMGParameter.measureMessageRate} is enabled.
+ */
+public class OutGoingQueue {
+
+    private static final Logger logger = LoggerFactory.getLogger(OutGoingQueue.class);
+
+    // Only created when message rate measurement is enabled; null otherwise.
+    private Counter storeToOutQueueCounter;
+
+    /**
+     * Creates the queue front end. When message rate measurement is enabled a counter is created and a
+     * {@link TimerThread} for it is started on a new thread.
+     */
+    public OutGoingQueue() {
+        if (WSMGParameter.measureMessageRate) {
+            storeToOutQueueCounter = new Counter();
+            TimerThread timerThread = new TimerThread(storeToOutQueueCounter, " StoreToOutQueueCounter");
+            new Thread(timerThread).start();
+        }
+    }
+
+    /**
+     * Enqueues a message on the out-going queue, keyed by its track id.
+     * <p>
+     * When {@code WSMGParameter.testOutGoingQueueMaxiumLength} is set and a counter exists, the same
+     * message is enqueued repeatedly until the counter reaches 1,000,000. Without a counter the message
+     * is enqueued exactly once.
+     * <p>
+     * NOTE(review): the original author questioned whether this method needs to be synchronized.
+     *
+     * @param outGoingMessage
+     *            message to enqueue
+     * @param messageId
+     *            not used by this implementation
+     */
+    public void storeNotification(OutGoingMessage outGoingMessage, long messageId) {
+
+        boolean loop;
+        do {
+            // this outgoing Queue is created inside the messenger which is
+            // intended to send the notification message to the consumer.
+            WSMGParameter.OUT_GOING_QUEUE.enqueue(outGoingMessage, outGoingMessage.getAdditionalMessageContent()
+                    .getTrackId());
+
+            // The counter exists only if measuring was enabled at construction time.
+            final boolean counterAvailable = storeToOutQueueCounter != null;
+            if (WSMGParameter.measureMessageRate && counterAvailable) {
+                storeToOutQueueCounter.addCounter();
+            }
+
+            // Without a counter there is no termination condition for the stress loop, so do not loop.
+            loop = WSMGParameter.testOutGoingQueueMaxiumLength && counterAvailable
+                    && storeToOutQueueCounter.getCounterValue() < 1000000;
+        } while (loop);
+
+    }
+}