You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/01/27 16:26:58 UTC

[04/13] airavata git commit: retiring ws-messenger and remove dependency of workflow tracking - AIRAVATA-1556, AIRAVATA-1557

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
deleted file mode 100644
index 60a9705..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
+++ /dev/null
@@ -1,773 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.commons.storage;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.sql.BatchUpdateException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.xml.namespace.QName;
-import javax.xml.stream.XMLStreamException;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.storage.DatabaseCreator.DatabaseType;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.util.Counter;
-import org.apache.airavata.wsmg.util.TimerThread;
-import org.apache.axiom.om.OMElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WsmgPersistantStorage implements WsmgStorage, WsmgQueue {
-    private static final Logger logger = LoggerFactory.getLogger(WsmgPersistantStorage.class);
-
-    /*
-     * Table name
-     */
-    private static final String TABLE_NAME_TO_CHECK = SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
-
-    private Counter storeToDBCounter = new Counter();
-
-    private JdbcStorage db;
-
-    public WsmgPersistantStorage(String jdbcUrl, String jdbcDriver) {
-
-        db = new JdbcStorage(jdbcUrl, jdbcDriver);
-
-        Connection conn = null;
-        try {
-            /*
-             * Check database
-             */
-            conn = db.connect();
-            if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
-                DatabaseCreator.createMsgBrokerDatabase(conn);
-                logger.info("New Database created for Message Broker");
-            } else {
-                logger.debug("Database already created for Message Broker!");
-            }
-
-            if (WSMGParameter.measureMessageRate) {
-                TimerThread timerThread = new TimerThread(storeToDBCounter, " StoreSubScriptionToDBCounter");
-                new Thread(timerThread).start();
-            }
-
-            initMessageQueueStorage();
-        } catch (Exception e) {
-            logger.error(e.getMessage(), e);
-            throw new RuntimeException("Database failure");
-        } finally {
-            db.closeConnection(conn);
-        }
-    }
-
-    public void dispose() {
-        if (db != null) {
-            db.closeAllConnections();
-        }
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.airavata.wsmg.commons.storage.WsmgStorage#getAllSubscription()
-     */
-    public List<SubscriptionEntry> getAllSubscription() {
-
-        ArrayList<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>();
-
-        Connection conn = null;
-        PreparedStatement stmt = null;
-        try {
-
-            // get number of row first and increase the arrayList size for
-            // better performance
-            int size = db.countRow(SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS, "*");
-
-            conn = db.connect();
-            stmt = conn.prepareStatement(SubscriptionConstants.EXP_SELECT_QUERY);
-            ResultSet rs = stmt.executeQuery();
-            ret.ensureCapacity(size);
-
-            if (rs != null) {
-
-                /*
-                 * Buffer data
-                 */
-                int nRead;
-                byte[] buffer = new byte[1024];
-                ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-
-                while (rs.next()) {
-                    SubscriptionEntry subscriptionEntry = new SubscriptionEntry();
-                    subscriptionEntry.setSubscriptionId(rs.getString("SubscriptionId"));
-
-                    /*
-                     * Read Binary Stream
-                     */
-                    InputStream inStream = null;
-
-                    try {
-                        inStream = rs.getBinaryStream("content");
-                        while ((nRead = inStream.read(buffer)) != -1) {
-                            outStream.write(buffer, 0, nRead);
-                        }
-                        outStream.flush();
-
-                        subscriptionEntry.setSubscribeXml(new String(outStream.toByteArray()));
-
-                    } catch (IOException ie) {
-                        logger.error("Unable to read XML from database", ie);
-
-                        // skip this subscription entry
-                        continue;
-                    } finally {
-                        // clear all data in outputStream
-                        outStream.reset();
-
-                        // close database stream
-                        if (inStream != null) {
-                            try {
-                                inStream.close();
-                            } catch (Exception e) {
-                                logger.error("Cannot close database stream", e);
-                            }
-                        }
-                    }
-
-                    ret.add(subscriptionEntry);
-
-                }
-            }
-        } catch (SQLException ex) {
-            logger.error("sql exception occured", ex);
-        } finally {
-            db.quietlyClose(conn, stmt);
-        }
-        return ret;
-    }
-
-    public int insert(SubscriptionState subscription) {
-        String address = subscription.getConsumerReference().getAddress();
-        Map<QName, OMElement> referenceParametersMap = subscription.getConsumerReference().getAllReferenceParameters();
-
-        String consumerReferenceParameters = null;
-        if (referenceParametersMap == null) {
-            consumerReferenceParameters = "";
-        } else {
-
-            StringBuffer buffer = new StringBuffer();
-
-            for (Iterator<OMElement> ite = referenceParametersMap.values().iterator(); ite.hasNext();) {
-                OMElement currentReferenceParameter = ite.next();
-
-                try {
-                    buffer.append(currentReferenceParameter.toStringWithConsume());
-                } catch (XMLStreamException se) {
-                    logger.error("unable to convert reference parameter", se);
-                }
-
-            }
-            consumerReferenceParameters = buffer.toString();
-        }
-
-        int policyValue = WsmgCommonConstants.WSRM_POLICY_FALSE;
-        if (subscription.isWsrmPolicy()) {
-            policyValue = WsmgCommonConstants.WSRM_POLICY_TRUE;
-        }
-
-        Timestamp now = new Timestamp(System.currentTimeMillis());
-
-        int result = 0;
-        Connection connection = null;
-        PreparedStatement stmt = null;
-        try {
-
-            connection = db.connect();
-            stmt = connection.prepareStatement(SubscriptionConstants.EXP_INSERT_SQL_QUERY);
-
-            stmt.setString(1, subscription.getId());
-            stmt.setBinaryStream(2, new ByteArrayInputStream(subscription.getSubscribeXml().getBytes()), subscription
-                    .getSubscribeXml().getBytes().length);
-            stmt.setInt(3, policyValue);
-            stmt.setString(4, subscription.getLocalTopic());
-            stmt.setString(5, subscription.getXpathString());
-            stmt.setString(6, address);
-            stmt.setBinaryStream(7, new ByteArrayInputStream(consumerReferenceParameters.getBytes()),
-                    consumerReferenceParameters.getBytes().length);
-            stmt.setTimestamp(8, now);
-            result = db.executeUpdateAndClose(stmt);
-            db.commitAndFree(connection);
-
-            storeToDBCounter.addCounter();
-
-        } catch (SQLException ex) {
-            logger.error("sql exception occured", ex);
-            db.rollbackAndFree(connection);
-        }
-        return result;
-    }
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see org.apache.airavata.wsmg.commons.storage.SubscriptionStorage#delete(java .lang.String)
-     */
-    public int delete(String subscriptionId) {
-        int result = 0;
-        Connection connection = null;
-        try {
-            connection = db.connect();
-            PreparedStatement stmt = connection.prepareStatement(SubscriptionConstants.EXP_DELETE_SQL_QUERY);
-            stmt.setString(1, subscriptionId);
-            result = db.executeUpdateAndClose(stmt);
-            db.commitAndFree(connection);
-        } catch (SQLException sql) {
-            db.rollbackAndFree(connection);
-            logger.error("sql exception occured", sql);
-        }
-        return result;
-    }
-
-    public void cleanup() {
-        Connection conn = null;
-        Statement stmt = null;
-        try {
-            conn = db.connect();
-            stmt = conn.createStatement();
-            batchCleanDB(stmt, conn);
-        } catch (SQLException e) {
-            logger.error(e.getMessage(), e);
-        } finally {
-            if (db.isAutoCommit()) {
-                try {
-                    conn.setAutoCommit(true);
-                } catch (SQLException e) {
-                    logger.error(e.getMessage(), e);
-                }
-            }
-            db.quietlyClose(conn, stmt);
-        }
-    }
-
-    public Object blockingDequeue() throws InterruptedException {
-        while (true) {
-            try {
-                return retrive();
-            } catch (SQLException e) {
-                logger.error(e.getMessage(), e);
-                e.printStackTrace();
-            } catch (IOException e) {
-                logger.error(e.getMessage(), e);
-                e.printStackTrace();
-            }
-        }
-    }
-
-    public void enqueue(Object object, String trackId) {
-
-        // Get the Max ID cache and update and unlock the table
-        Connection connection = null;
-        PreparedStatement stmt = null;
-        PreparedStatement stmt2 = null;
-        PreparedStatement stmt3 = null;
-        try {
-            int nextkey;
-
-            connection = db.connect();
-
-            lockMaxMinTables(connection);
-
-            stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
-
-            ResultSet result = stmt.executeQuery();
-
-            if (result.next()) {
-                nextkey = result.getInt(1);
-
-                stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INCREMENT + (nextkey));
-                stmt2.executeUpdate();
-            } else {
-                throw new RuntimeException("MAX_ID Table is not init, redeploy the service !!!");
-            }
-            
-            /**
-             * Before executing the SQL_INSERT_STATEMENT query, we need to unlock 
-             * MaxIDTable and MinIDTable since we are going to insert data to another 
-             * table, disQ. If we do not unlock tables, insert query fails in MySQL. But 
-             * in Derby, this will execute without any issues even without unlocking 
-             * tables. Since it fails with MySQL, we need to unlock the tables 
-             * before executing the insert query.
-             */
-            try{
-            	 unLockTables(connection);
-            }catch (SQLException sql) {
-                logger.error("Cannot Unlock Table", sql);
-            }
-           
-
-            /*
-             * After update MAX_ID put data into queue table
-             */
-            stmt3 = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
-            stmt3.setInt(1, nextkey);
-            stmt3.setString(2, trackId);
-
-            ByteArrayOutputStream output = new ByteArrayOutputStream();
-            ObjectOutputStream out = new ObjectOutputStream(output);
-            out.writeObject(object);
-            byte[] buffer = output.toByteArray();
-            ByteArrayInputStream in = new ByteArrayInputStream(buffer);
-            stmt3.setBinaryStream(3, in, buffer.length);
-            stmt3.executeUpdate();
-            db.commit(connection);
-        } catch (SQLException sqlEx) {
-            db.rollback(connection);
-            logger.error("unable to enque the message in persistant storage", sqlEx);
-        } catch (IOException ioEx) {
-            db.rollback(connection);
-            logger.error("unable to enque the message in persistant storage", ioEx);
-        } finally {
-            try {
-                unLockTables(connection);
-            } catch (SQLException sql) {
-                logger.error("Cannot Unlock Table", sql);
-            }
-
-            db.quietlyClose(connection, stmt, stmt2, stmt3);
-        }
-    }
-
-    private void initMessageQueueStorage() throws SQLException {
-        Connection connection = null;
-        PreparedStatement stmt = null;
-        PreparedStatement stmt2 = null;
-        PreparedStatement stmt3 = null;
-        PreparedStatement stmt4 = null;
-        try {
-            connection = db.connect();
-
-            lockMaxMinTables(connection);
-
-            /*
-             * Get Max ID
-             */
-            stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
-            ResultSet result = stmt.executeQuery();
-            if (!result.next()) {
-                stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
-                stmt2.executeUpdate();
-            }
-
-            /*
-             * Get Min ID
-             */
-            stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
-            result = stmt3.executeQuery();
-            if (!result.next()) {
-                stmt4 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
-                stmt4.executeUpdate();
-            }
-            db.commit(connection);
-        } catch (SQLException sqle) {
-            db.rollback(connection);
-            throw sqle;
-        } finally {
-            try {
-                unLockTables(connection);
-            } catch (SQLException sql) {
-                logger.error("Cannot Unlock Table", sql);
-            }
-
-            db.quietlyClose(connection, stmt, stmt2, stmt3, stmt4);
-        }
-    }
-
-    private Object retrive() throws SQLException, IOException, InterruptedException {
-        long wait = 1000;
-        int nextkey = -1;
-        int maxid = -2;
-        Connection connection = null;
-        PreparedStatement stmt = null;
-        PreparedStatement stmt2 = null;
-        PreparedStatement stmt3 = null;
-        ResultSet result = null;
-        while (true) {
-            try {
-                connection = db.connect();
-
-                lockMaxMinTables(connection);
-
-                /*
-                 * Get Min ID
-                 */
-                stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
-                result = stmt.executeQuery();
-                if (result.next()) {
-                    nextkey = result.getInt(1);
-                } else {
-                    throw new RuntimeException("Queue init has failed earlier");
-                }
-
-                /*
-                 * Get Max ID
-                 */
-                stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
-                result = stmt2.executeQuery();
-                if (result.next()) {
-                    maxid = result.getInt(1);
-                } else {
-                    throw new RuntimeException("Queue init has failed earlier");
-                }
-
-                /*
-                 * Update value and exit the loop
-                 */
-                if (maxid > nextkey) {
-                    stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
-                    stmt3.executeUpdate();
-                    logger.debug("Update MIN ID by one");
-                    db.commit(connection);
-                    break;
-                }
-
-                db.commit(connection);
-            } catch (SQLException sql) {
-                db.rollback(connection);
-                throw sql;
-            } finally {
-                try {
-                    unLockTables(connection);
-                } catch (SQLException sql) {
-                    sql.printStackTrace();
-                    logger.error("Cannot Unlock Table", sql);
-                }
-
-                db.quietlyClose(connection, stmt, stmt2, stmt3);
-            }
-
-            /*
-             * Sleep if there is nothing to do
-             */
-            try {
-                wait = Math.min((wait + 1000), QueueContants.FINAL_WAIT_IN_MILI);
-                logger.debug("Wait=" + wait);
-                Thread.sleep(wait);
-            } catch (InterruptedException e) {
-                logger.warn("Queue is interrupted to close");
-                throw e;
-            }
-        }
-
-        /*
-         * Create Subscription Object from MIN_ID and delete data in table
-         */
-        Object resultObj = null;
-        int key = -1;
-        try {
-            connection = db.connect();
-            stmt = connection.prepareStatement(QueueContants.SQL_SELECT_STATEMENT + nextkey);
-            result = stmt.executeQuery();
-            if (result.next()) {
-                key = result.getInt(1);
-                InputStream in = result.getAsciiStream(2);
-                ObjectInputStream s = new ObjectInputStream(in);
-                try {
-                    resultObj = s.readObject();
-                } catch (ClassNotFoundException e) {
-                    logger.error("Cannot Deserialize Object from Database, ClassNotFound. ", e);
-                }
-            } else {
-                throw new RuntimeException(
-                        "MAX_ID and MIN_ID are inconsistent with subscription table, need to reset all data value");
-            }
-
-            try {
-                String query = QueueContants.SQL_DELETE_STATEMENT + key;
-                stmt2 = connection.prepareStatement(query);
-                stmt2.executeUpdate();
-                db.commit(connection);
-            } catch (SQLException sqle) {
-                db.rollback(connection);
-                throw sqle;
-            }
-        } finally {
-            db.quietlyClose(connection, stmt, stmt2);
-        }
-        return resultObj;
-    }
-
-    private void batchCleanDB(Statement stmt, Connection con) throws SQLException {
-        DatabaseType databaseType = DatabaseType.other;
-        int[] aiupdateCounts = new int[0];
-        boolean bError = false;
-        try {
-
-            con.setAutoCommit(false);
-
-            stmt.clearBatch();
-
-            int totalStatement = 0;
-
-            try {
-                databaseType = DatabaseCreator.getDatabaseType(con);
-            } catch (Exception e) {
-                logger.error("Error evaluating database type", e);
-            }
-            // add SQL statements
-            if (DatabaseType.mysql.equals(databaseType)) {
-                stmt.addBatch("lock tables disQ write, MaxIDTable write, MinIDTable write;");
-                totalStatement++;
-            } else if (DatabaseType.derby.equals(databaseType)) {
-                stmt.addBatch("lock table disQ in exclusive mode;");
-                totalStatement++;
-                stmt.addBatch("lock table MaxIDTable in exclusive mode;");
-                totalStatement++;
-                stmt.addBatch("lock table MinIDTable in exclusive mode;");
-                totalStatement++;
-            }
-            stmt.addBatch("Delete from disQ;");
-            totalStatement++;
-            stmt.addBatch("Delete from MaxIDTable;");
-            totalStatement++;
-            stmt.addBatch("Delete from MinIDTable;");
-            totalStatement++;
-
-            aiupdateCounts = new int[totalStatement];
-
-            // execute the statements
-            aiupdateCounts = stmt.executeBatch();
-
-        } catch (BatchUpdateException bue) {
-            bError = true;
-            aiupdateCounts = bue.getUpdateCounts();
-            logger.error("SQLException: " + bue.getMessage());
-            logger.error("SQLState:  " + bue.getSQLState());
-            logger.error("Message:  " + bue.getMessage());
-            logger.error("Vendor:  " + bue.getErrorCode());
-            logger.info("Update counts:  ");
-
-            for (int i = 0; i < aiupdateCounts.length; i++) {
-                logger.error(aiupdateCounts[i] + "   ");
-            }
-
-            SQLException SQLe = bue;
-            while (SQLe != null) {
-                SQLe = SQLe.getNextException();
-                logger.error(SQLe.getMessage(), SQLe);
-            }
-        } catch (SQLException SQLe) {
-            bError = true;
-            throw SQLe;
-        } finally {
-            // determine operation result
-            for (int i = 0; !bError && i < aiupdateCounts.length; i++) {
-                int iProcessed = aiupdateCounts[i];
-                /**
-                 * The int values that can be returned in the update counts array are: <br/>
-                 * -3--Operation error. A driver has the option to stop at the first error and throw a
-                 * BatchUpdateException or to report the error and continue. This value is only seen in the latter case. <br/>
-                 * -2--The operation was successful, but the number of rows affected is unknown. <br/>
-                 * Zero--DDL statement or no rows affected by the operation. Greater than zero--Operation was
-                 * successful, number of rows affected by the operation.
-                 */
-                if (iProcessed < 0 && iProcessed != -2) {
-                    // error on statement
-                    logger.info("Error batch." + iProcessed);
-                    bError = true;
-                }
-            }
-
-            if (bError) {
-                con.rollback();
-            } else {
-                con.commit();
-            }
-
-            /*
-             * Unlock table after rollback and commit, since it is not automatic in MySql
-             */
-
-            if (DatabaseType.mysql.equals(databaseType)) {
-                PreparedStatement prepareStmt = con.prepareCall("unlock tables;");
-                db.executeUpdateAndClose(prepareStmt);
-            }
-        } // end finally
-        logger.info("Queue is cleaned.");
-    }
-
-    private void lockMaxMinTables(Connection connection) throws SQLException {
-        DatabaseType databaseType = DatabaseType.other;
-        try {
-            databaseType = DatabaseCreator.getDatabaseType(connection);
-        } catch (Exception e) {
-            logger.error("Error evaluating database type", e);
-        }
-
-        /*
-         * Must turn off auto commit
-         */
-        connection.setAutoCommit(false);
-        String sql = null;
-        Statement stmt = null;
-        try {
-            switch (databaseType) {
-            case derby:
-                sql = "LOCK TABLE " + QueueContants.TABLE_NAME_MAXID + " IN EXCLUSIVE MODE";
-                String sql2 = "LOCK TABLE " + QueueContants.TABLE_NAME_MINID + " IN EXCLUSIVE MODE";
-                stmt = connection.createStatement();
-                stmt.addBatch(sql);
-                stmt.addBatch(sql2);
-                stmt.executeBatch();
-                break;
-            case mysql:
-                sql = "lock tables " + QueueContants.TABLE_NAME_MAXID + " write" + "," + QueueContants.TABLE_NAME_MINID
-                        + " write";
-                stmt = connection.createStatement();
-                stmt.executeQuery(sql);
-                break;
-            default:
-                return;
-            }
-
-        } finally {
-            if (stmt != null && !stmt.isClosed()) {
-                stmt.close();
-            }
-        }
-    }
-
-    private void unLockTables(Connection connection) throws SQLException {
-        DatabaseType databaseType = DatabaseType.other;
-        try {
-            databaseType = DatabaseCreator.getDatabaseType(connection);
-        } catch (Exception e) {
-            logger.error("Error evaluating database type", e);
-        }
-
-        try {
-            switch (databaseType) {
-            case derby:
-                /*
-                 * Derby doesn't have explicit unlock SQL It uses commit or rollback as a unlock mechanism, so make sure
-                 * that connection is always commited or rollbacked
-                 */
-                break;
-            case mysql:
-                String sql = "unlock tables";
-                PreparedStatement stmt = null;
-                try {
-                    stmt = connection.prepareStatement(sql);
-                    stmt.executeQuery();
-                    db.commit(connection);
-                } finally {
-                    if (stmt != null) {
-                        stmt.close();
-                    }
-                }
-                break;
-            default:
-                return;
-            }
-        } finally {
-            /*
-             * Set auto commit when needed
-             */
-            if (db.isAutoCommit()) {
-                connection.setAutoCommit(true);
-            }
-        }
-    }
-
-    private static class SubscriptionConstants {
-
-        public static final String TABLE_NAME_EXPIRABLE_SUBCRIPTIONS = "subscription";
-
-        public static final String TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS = "specialSubscription";
-
-        public static final String EXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
-                + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
-                + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
-
-        public static final String EXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
-                + " WHERE SubscriptionId= ?";
-
-        public static final String EXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
-
-        public static final String NONEXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
-                + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
-                + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
-
-        public static final String NONEXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
-                + " WHERE SubscriptionId= ?";
-
-        public static final String NONEXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS;
-    }
-
-    private static class QueueContants {
-        public static final int FINAL_WAIT_IN_MILI = 5000;
-
-        public static final String TABLE_NAME = "disQ";
-
-        public static final String TABLE_NAME_MAXID = "MaxIDTable";
-
-        public static final String TABLE_NAME_MINID = "MinIDTable";
-
-        public static final int STATUS_OPEN = 0;
-
-        public static final String SQL_INSERT_STATEMENT = "INSERT INTO " + TABLE_NAME
-                + " (id, trackId, message, status) " + "VALUES (?,?,?," + STATUS_OPEN + ")";
-
-        public static String SQL_DELETE_STATEMENT = "DELETE FROM " + TABLE_NAME + " WHERE id=";
-
-        public static String SQL_SELECT_STATEMENT = "SELECT id,message FROM " + TABLE_NAME + " WHERE id=";
-
-        public static String SQL_MAX_ID_SEPERATE_TABLE = "SELECT maxID FROM " + TABLE_NAME_MAXID;
-
-        public static String SQL_MIN_ID_SEPERATE_TABLE = "SELECT minID FROM " + TABLE_NAME_MINID;
-
-        public static String SQL_MAX_ID_INSERT = "INSERT INTO " + TABLE_NAME_MAXID + " (maxID) VALUES (1)";
-
-        public static String SQL_MIN_ID_INSERT = "INSERT INTO " + TABLE_NAME_MINID + " (minID) VALUES (1)";
-
-        public static String SQL_MAX_ID_INCREMENT = "UPDATE " + TABLE_NAME_MAXID + " SET maxID = maxID+1 WHERE maxID =";
-
-        public static String SQL_MIN_ID_INCREMENT = "UPDATE " + TABLE_NAME_MINID + " SET minID = minID+1 WHERE minID =";
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
deleted file mode 100644
index 5430b33..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.commons.storage;
-
-public interface WsmgQueue {
-
-    void cleanup();
-
-    void enqueue(Object object, String trackId);
-
-    Object blockingDequeue() throws InterruptedException;
-
-    void dispose();
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
deleted file mode 100644
index 2a1d1cb..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.commons.storage;
-
-import java.util.List;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-
-public interface WsmgStorage {
-
-    List<SubscriptionEntry> getAllSubscription();
-
-    int insert(SubscriptionState subscription);
-
-    int delete(String subscriptionId);
-
-    void dispose();
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
deleted file mode 100644
index d3be422..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.commons.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.xml.namespace.QName;
-
-import org.apache.airavata.wsmg.util.BrokerUtil;
-import org.apache.axiom.om.OMAttribute;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMNamespace;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Compare two OMElement with its namespace, attributes, children, and text. Current implementation supports ignore
- * namespace checking i.e. if the namespace is in the list, it is skipped and return as equals.
- */
-public class OMElementComparator {
-
-    private static final Logger log = LoggerFactory.getLogger(OMElementComparator.class);
-
-    private static List<String> ignorableNamespaceList = new ArrayList<String>();
-
-    private OMElementComparator() {
-    }
-
-    public void addIgnorableNamespace(String nsURI) {
-        ignorableNamespaceList.add(nsURI);
-    }
-
-    public void clearIgnorableNamespaces() {
-        ignorableNamespaceList.clear();
-    }
-
-    public static boolean compare(OMElement elementOne, OMElement elementTwo) {
-
-        if (isIgnorable(elementOne) || isIgnorable(elementTwo)) {
-            // ignore if the elements belong to any of the ignorable namespaces
-            // list
-            return true;
-        } else if (elementOne == null && elementTwo == null) {
-            log.debug("Both Elements are null.");
-            return true;
-        } else if (elementOne == null || elementTwo == null) {
-            log.debug("One of item to compare is null");
-            return false;
-        }
-
-        return BrokerUtil.sameStringValue(elementOne.getLocalName(), elementTwo.getLocalName())
-                && compare(elementOne.getNamespace(), elementTwo.getNamespace())
-                && compareAttibutes(elementOne, elementTwo)
-                /*
-                 * Trimming the value of the XMLElement is not correct since this compare method cannot be used to
-                 * compare element contents with trailing and leading whitespaces BUT for the practical side of tests
-                 * and to get the current tests working we have to trim() the contents
-                 */
-                && BrokerUtil.sameStringValue(elementOne.getText().trim(), elementTwo.getText().trim())
-                && compareChildren(elementOne, elementTwo);
-    }
-
-    private static boolean isIgnorable(OMElement elt) {
-        if (elt != null) {
-            OMNamespace namespace = elt.getNamespace();
-            if (namespace != null) {
-                return ignorableNamespaceList.contains(namespace.getNamespaceURI());
-            } else {
-                return false;
-            }
-        } else {
-            return false;
-        }
-    }
-
-    private static boolean compareChildren(OMElement elementOne, OMElement elementTwo) {
-        HashMap<QName, OMElement> map = new HashMap<QName, OMElement>();
-        Iterator oneIter = elementOne.getChildElements();
-        while (oneIter.hasNext()) {
-            OMElement elementOneChild = (OMElement) oneIter.next();
-            OMElement elementTwoChild = elementTwo.getFirstChildWithName(elementOneChild.getQName());
-            if (!compare(elementOneChild, elementTwoChild)) {
-                return false;
-            }
-
-            /*
-             * Cache for later access
-             */
-            map.put(elementOneChild.getQName(), elementOneChild);
-        }
-
-        /*
-         * Case the second element has more elements than the first
-         */
-        Iterator twoIter = elementTwo.getChildElements();
-        while (twoIter.hasNext()) {
-            OMElement elementTwoChild = (OMElement) twoIter.next();
-            if (!isIgnorable(elementTwoChild) && !map.containsKey(elementTwoChild.getQName())) {
-                return false;
-            }
-        }
-
-        return true;
-    }
-
-    private static boolean compareAttibutes(OMElement elementOne, OMElement elementTwo) {
-        int elementOneAtribCount = 0;
-        int elementTwoAtribCount = 0;
-        Iterator oneIter = elementOne.getAllAttributes();
-        while (oneIter.hasNext()) {
-
-            /*
-             * This catches a case where the first one has more items than the second one (one.attributes.size >
-             * two.attributes.size) and a case where the first and the second have a different attributes.
-             * (one.attributes.size == two.attributes.size)
-             */
-            OMAttribute omAttribute = (OMAttribute) oneIter.next();
-            OMAttribute attr = elementTwo.getAttribute(omAttribute.getQName());
-            if (attr == null) {
-                log.debug("Attribute " + omAttribute + " is not found in both elements.");
-                return false;
-            }
-            /*
-             * Count attributes in the first item
-             */
-            elementOneAtribCount++;
-        }
-
-        /*
-         * Count attributes in the second item
-         */
-        Iterator elementTwoIter = elementTwo.getAllAttributes();
-        while (elementTwoIter.hasNext()) {
-            elementTwoIter.next();
-            elementTwoAtribCount++;
-        }
-
-        /*
-         * This catches a case where the second one has more items than the first one. (two.attributes.size >
-         * one.attributes.size)
-         */
-        log.debug("Number of Attributes are equal? : " + (elementOneAtribCount == elementTwoAtribCount));
-        return elementOneAtribCount == elementTwoAtribCount;
-    }
-
-    /*
-     * Compare only URI not prefix
-     */
-    private static boolean compare(OMNamespace x, OMNamespace y) {
-        log.debug("Compare namespace:" + x + " with " + y);
-        return (x == null && y == null)
-                || (x != null && y != null && BrokerUtil.sameStringValue(x.getNamespaceURI(), y.getNamespaceURI()));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
deleted file mode 100644
index 06d435a..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.config;
-
-import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
-
-public class WSMGParameter {
-
-    /**
-     * Global variable for the Out Going queue (contains message to send to subscribers)
-     */
-    public static WsmgQueue OUT_GOING_QUEUE = null; // default=null
-
-    public static final boolean testOutGoingQueueMaxiumLength = false; // default=false
-
-    // enable or disable the TimerThread that displays the message rate
-    public static final boolean measureMessageRate = false; // default=false
-
-    public static final boolean enableAutoCleanSubscriptions = false; // default=true
-
-    public static final boolean debugYFilter = false;
-
-    public static final boolean cleanQueueonStartUp = false; // default=true
-    public static final boolean requireSubscriptionRenew = true;
-    public static final long expirationTime = 1000 * 60 * 60 * 72; // 72 hours
-
-    public static final boolean showTrackId = false;
-    public static final String versionSetUpNote = "Added_Sub_Timeout";
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
deleted file mode 100644
index c12f460..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.config;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.airavata.wsmg.broker.NotificationProcessor;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
-import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.matching.XPath.YFilterMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-
-public class WsmgConfigurationContext {
-
-    private OutGoingQueue outgoingQueue = null;
-
-    private List<AbstractMessageMatcher> messageMatchers = new LinkedList<AbstractMessageMatcher>();
-
-    private ReentrantReadWriteLock messegeMatchersLock = new ReentrantReadWriteLock();
-
-    private ConfigurationManager configurationManager;
-
-    private SubscriptionManager subscriptionMan;
-
-    private NotificationProcessor notificationProcessor;
-
-    private WsmgStorage storage;
-
-    private WsmgQueue queue;
-
-    public WsmgConfigurationContext() {
-        outgoingQueue = new OutGoingQueue();
-        setDirectFilter();
-    }
-
-    private void setDirectFilter() {
-        messageMatchers.add(new YFilterMessageMatcher());
-        // messageMatchers.add(new DirectWsntMessageMatcher(subscriptions,
-        // publisherRegistrationDB));
-    }
-
-    public List<AbstractMessageMatcher> getMessageMatchers() {
-        return messageMatchers;
-    }
-
-    public OutGoingQueue getOutgoingQueue() {
-        return outgoingQueue;
-    }
-
-    public ConfigurationManager getConfigurationManager() {
-        return configurationManager;
-    }
-
-    public SubscriptionManager getSubscriptionManager() {
-        return subscriptionMan;
-    }
-
-    public NotificationProcessor getNotificationProcessor() {
-        return notificationProcessor;
-    }
-
-    public void setConfigurationManager(ConfigurationManager configMan) {
-        this.configurationManager = configMan;
-    }
-
-    public void setSubscriptionManager(SubscriptionManager subMan) {
-        this.subscriptionMan = subMan;
-    }
-
-    public void setNotificationProcessor(NotificationProcessor processor) {
-        this.notificationProcessor = processor;
-    }
-
-    public WsmgStorage getStorage() {
-        return storage;
-    }
-
-    public void setStorage(WsmgStorage s) {
-        storage = s;
-    }
-
-    public WsmgQueue getQueue() {
-        return queue;
-    }
-
-    public void setQueue(WsmgQueue s) {
-        queue = s;
-    }
-
-    public ReentrantReadWriteLock getMessegeMatcherLock() {
-        return messegeMatchersLock;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
deleted file mode 100644
index dbd16f6..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.matching;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-
-public abstract class AbstractMessageMatcher {
-
-    protected Map<String, String> currentMessageCache;
-
-    private ReentrantReadWriteLock consumerListLock = new ReentrantReadWriteLock();
-
-    // infer types of
-    // key and value
-    public AbstractMessageMatcher() {
-        this.currentMessageCache = new HashMap<String, String>();
-    }
-
-    public abstract void start(String carrierLocation);
-
-    // Message can be either String or XmlElement. Added XMLElement for
-    // performance consideration so that if not using queue,
-    // we don't need to serialize to String
-    // If we already serialized to String because of the using queue, we don't
-    // have to change back to XMLElement until the delivery to consumers
-
-    public abstract void populateMatches(String wsntMessageConverterClassName,
-            AdditionalMessageContent additionalMessageContent, String message, String topic,
-            List<ConsumerInfo> matchedConsumers);
-
-    public abstract int handleUnsubscribe(String subscriptionId);
-
-    public abstract void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId);
-
-    public String handleGetCurrentMessage(String topic) {
-        String currentMessage = currentMessageCache.get(topic);
-        return currentMessage;
-    }
-
-    public void readLockUnlockConsumers(boolean lock) {
-        ReadLock readlock = consumerListLock.readLock();
-        lockUnlock(readlock, lock);
-    }
-
-    public void writeLockUnlockConsumers(boolean lock) {
-        WriteLock writeLock = consumerListLock.writeLock();
-        lockUnlock(writeLock, lock);
-    }
-
-    private void lockUnlock(Lock l, boolean lock) {
-
-        if (lock) {
-            l.lock();
-        } else {
-            l.unlock();
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
deleted file mode 100644
index 1881968..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.matching.XPath;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.broker.ConsumerList;
-import org.apache.airavata.wsmg.broker.ConsumerListManager;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import edu.berkeley.cs.db.yfilter.filter.EXfilterBasic;
-import edu.berkeley.cs.db.yfilter.filter.SystemGlobals;
-import edu.berkeley.cs.db.yfilterplus.queryparser.Query;
-import edu.berkeley.cs.db.yfilterplus.queryparser.XPQuery;
-import edu.berkeley.cs.db.yfilterplus.xmltree.XMLTree;
-
-public class YFilterInfo {
-    private static final Logger logger = LoggerFactory.getLogger(YFilterInfo.class);
-
-    private EXfilterBasic yfilter = new EXfilterBasic();
-    private HashMap<Integer, String> yFilterIdToXPath = new HashMap<Integer, String>();
-    private HashMap<Integer, Query> yFilterIdToQuery = new HashMap<Integer, Query>();
-    private HashMap<String, Integer> xPathToYFilterId = new HashMap<String, Integer>();
-    private ConsumerListManager consumerListmanager = new ConsumerListManager();
-    private int index = 0;
-    private int counter = 0;
-
-    public EXfilterBasic getYfilter() {
-        return yfilter;
-    }
-
-    public void setYfilter(EXfilterBasic yfilter) {
-        this.yfilter = yfilter;
-    }
-
-    public HashMap<Integer, String> getYFilterIdToXPath() {
-        return yFilterIdToXPath;
-    }
-
-    public void setYFilterIdToXPath(HashMap<Integer, String> filterIdToXPath) {
-        yFilterIdToXPath = filterIdToXPath;
-    }
-
-    public void addXPathQuery(String xpathExpression, String subscriptionId, SubscriptionState subscribeRequest)
-            throws RuntimeException {
-        index++;
-        counter++;
-        if (WSMGParameter.debugYFilter)
-            logger.debug("QueryExp=" + xpathExpression);
-
-        Integer yFilterIdObj = xPathToYFilterId.get(xpathExpression);
-        int yFilterId = -1;
-        if (yFilterIdObj != null) {
-            yFilterId = yFilterIdObj.intValue();
-        } else {
-            Query query = XPQuery.parseQuery(xpathExpression, index);
-            if (query == null) {
-                throw new RuntimeException("Invalid XPath expression:" + xpathExpression);
-            }
-            if (WSMGParameter.debugYFilter)
-                logger.debug("addSubscription " + xpathExpression + " query :" + query);
-            yFilterId = yfilter.addQuery(query);
-            if (WSMGParameter.debugYFilter)
-                yfilter.printQueryIndex();
-            xPathToYFilterId.put(xpathExpression, Integer.valueOf(yFilterId));
-            yFilterIdToXPath.put(new Integer(yFilterId), xpathExpression);
-            yFilterIdToQuery.put(yFilterId, query);
-        }
-        if (WSMGParameter.debugYFilter)
-            logger.debug("YFilterId=" + yFilterId);
-
-        consumerListmanager.addToConsumerList(xpathExpression, subscribeRequest, subscriptionId);
-    }
-
-    public int removeSubscription(String subscriptionId) {
-
-        String xPath = consumerListmanager.getTokenBySubscriptionId(subscriptionId);
-        int result = consumerListmanager.removeFromConsumerList(subscriptionId, xPath);
-        if (result == 0) {
-            return 0;
-        }
-        int currentConsumerCount = consumerListmanager.getConsumerListByToken(xPath).size();
-        if (currentConsumerCount == 0) {
-            Integer yFilterId = xPathToYFilterId.get(xPath);
-            Query q = yFilterIdToQuery.get(yFilterId);
-            yfilter.deleteQuery(q, q.getQueryId());
-            yFilterIdToQuery.remove(yFilterId);
-        }
-        counter--;
-        return result;
-    }
-
-    public List<ConsumerInfo> getMatchingConsumerList(String messageString) {
-        List<ConsumerInfo> matchingConsumerList = new LinkedList<ConsumerInfo>();
-        XMLTree tree = new XMLTree(new java.io.StringReader(messageString));
-        if (WSMGParameter.debugYFilter)
-            tree.print();
-        yfilter.setEventSequence(tree.getEvents());
-        yfilter.startParsing();
-
-        // print the matched queries //
-        if (SystemGlobals.hasQueries) {
-            if (WSMGParameter.debugYFilter)
-
-                yfilter.printQueryResults(System.out);
-        } else {
-            System.out.println("no match");
-            return matchingConsumerList;
-        }
-
-        Iterator<Integer> it = (Iterator<Integer>) yfilter.getMatchedQueries().iterator();
-        while (it.hasNext()) {
-            Integer qid = it.next();
-
-            String xpath = yFilterIdToXPath.get(qid);
-            ConsumerList consumerList = consumerListmanager.getConsumerListByToken(xpath);
-
-            if (consumerList != null) {// has subscription to this topic
-                matchingConsumerList.addAll(consumerList.getConsumerList());
-            }
-        }
-        yfilter.clear();
-        return matchingConsumerList;
-    }
-
-    public int getCounter() {
-        return counter;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
deleted file mode 100644
index 0f59ec1..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.matching.XPath;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.broker.ConsumerList;
-import org.apache.airavata.wsmg.broker.ConsumerListManager;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class YFilterMessageMatcher extends AbstractMessageMatcher {
-
-    private static final Logger logger = LoggerFactory.getLogger(YFilterMessageMatcher.class);
-
-    private OutGoingQueue outGoingQueue = null;
-
-    private HashMap<String, YFilterInfo> topicToYFilterInfo = new HashMap<String, YFilterInfo>();
-    private HashMap<String, String> subIdToTopic = new HashMap<String, String>();
-
-    // used for topic only subscription, so that we don't have to create a
-    // YFilter object
-    private ConsumerListManager consumerListmanager = new ConsumerListManager();
-
-    public YFilterMessageMatcher() {
-        super();
-    }
-
-    public void start(String carrierLocation) {
-        currentMessageCache = new Hashtable<String, String>();
-    }
-
-    @Override
-    public void populateMatches(String wsntMessageConverterClassName,
-            AdditionalMessageContent additionalMessageContent, String message, String topic,
-            List<ConsumerInfo> matchedConsumers) {
-
-        assert (matchedConsumers != null);
-
-        if (WSMGParameter.debugYFilter)
-            logger.info("Message In YFilterAdapter=" + message);
-
-        // Important Get a Read Lock....
-        readLockUnlockConsumers(true);
-        try {
-
-            // 1, Topic only
-            ConsumerList topicConsumerList = consumerListmanager.getConsumerListByToken(topic);
-            if (topicConsumerList != null) {// has subscription to this topic
-
-                ArrayList<ConsumerInfo> list = topicConsumerList.getConsumerList();
-                matchedConsumers.addAll(list);
-            }
-            // 2, wild card topic only
-            ConsumerList wildcardConsumerList = consumerListmanager
-                    .getConsumerListByToken(WsmgCommonConstants.WILDCARD_TOPIC);
-            if (wildcardConsumerList != null) {// has wildcard subscriptions
-                List<ConsumerInfo> wildCardConsumerInfoList = wildcardConsumerList.getConsumerList();
-                if (wildCardConsumerInfoList != null) {
-                    // System.out.println("ConsumerListSize2="+wildCardConsumerInfoList.size());
-                    matchedConsumers.addAll(wildCardConsumerInfoList);
-                }
-            }
-            // 3, topic with Xpath
-            YFilterInfo yfilterInfo = topicToYFilterInfo.get(topic);
-            if (yfilterInfo != null) {
-                List<ConsumerInfo> topicAndXPathConsumerInfoList = yfilterInfo.getMatchingConsumerList(message);
-                if (topicAndXPathConsumerInfoList != null) {
-                    // System.out.println("ConsumerListSize3="+topicAndXPathConsumerInfoList.size());
-                    matchedConsumers.addAll(topicAndXPathConsumerInfoList);
-                }
-            }
-            // 4, wild card topic with Xpath (XPath only)
-            yfilterInfo = topicToYFilterInfo.get(WsmgCommonConstants.WILDCARD_TOPIC);
-            if (yfilterInfo != null) {
-                List<ConsumerInfo> wildcardTopicAndXPathConsumerInfoList = yfilterInfo.getMatchingConsumerList(message);
-                if (wildcardTopicAndXPathConsumerInfoList != null) {
-                    // System.out.println("ConsumerListSize4="+wildcardTopicAndXPathConsumerInfoList.size());
-                    matchedConsumers.addAll(wildcardTopicAndXPathConsumerInfoList);
-                }
-            }
-
-        } finally {
-
-            // Release the Read Lock...
-            readLockUnlockConsumers(false);
-        }
-
-    }
-
-    public int handleUnsubscribe(String subscriptionId) {
-
-        int ret = 1;
-
-        writeLockUnlockConsumers(true);
-        try {
-            String topicExpression = subIdToTopic.get(subscriptionId);
-            if (subscriptionId.startsWith("T")) { // Topic only
-                consumerListmanager.removeFromConsumerList(subscriptionId, topicExpression);
-            } else {
-                YFilterInfo yfilterInfo = topicToYFilterInfo.get(topicExpression);
-                if (yfilterInfo != null) {
-                    yfilterInfo.removeSubscription(subscriptionId);
-                    if (yfilterInfo.getCounter() == 0) {
-                        yfilterInfo = null;
-                        topicToYFilterInfo.remove(topicExpression);
-                    }
-                } else {
-                    System.out.println("ERROR: Cannot find subscription with the subId=" + subscriptionId);
-                    ret = 0;
-                }
-            }
-        } finally {
-            writeLockUnlockConsumers(false);
-        }
-
-        return ret;
-    }
-
-    public void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId) {
-
-        // Get the write lock
-        writeLockUnlockConsumers(true);
-        try {
-
-            String topicExpression = subscribeRequest.getLocalTopic();
-            subIdToTopic.put(subscriptionId, topicExpression);
-
-            String xpathExpression = subscribeRequest.getXpathString();
-            if (xpathExpression == null || xpathExpression.length() == 0) { // Topic
-                // only
-                consumerListmanager.addToConsumerList(topicExpression, subscribeRequest, subscriptionId);
-            } else {
-                YFilterInfo yfilterInfo = topicToYFilterInfo.get(topicExpression);
-                if (yfilterInfo == null) {
-                    yfilterInfo = new YFilterInfo();
-                    topicToYFilterInfo.put(topicExpression, yfilterInfo);
-                }
-                yfilterInfo.addXPathQuery(xpathExpression, subscriptionId, subscribeRequest);
-            }
-
-            if (outGoingQueue == null) {
-                outGoingQueue = subscribeRequest.getOutGoingQueue();
-            }
-
-        } finally {
-            // release the write lock
-            writeLockUnlockConsumers(false);
-        }
-
-        return;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
deleted file mode 100644
index ba50e45..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.matching.simpleTopic;
-
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.broker.ConsumerList;
-import org.apache.airavata.wsmg.broker.ConsumerListManager;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DirectWsntMessageMatcher extends AbstractMessageMatcher {
-
-    private static final Logger logger = LoggerFactory.getLogger(DirectWsntMessageMatcher.class);
-
-    private ConsumerListManager consumerListmanager = new ConsumerListManager();
-
-    private OutGoingQueue outGoingQueue = null;
-
-    public DirectWsntMessageMatcher() {
-        super();
-    }
-
-    public void start(String carrierLocation) {
-        currentMessageCache = new Hashtable<String, String>();
-    }
-
-    public void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId) {
-
-        String topicExpression = subscribeRequest.getLocalTopic();
-        if (topicExpression == null || topicExpression.length() == 0) {
-            logger.error("ERROR:WsntAdapterConnection creation failed.");
-            return;
-        }
-
-        writeLockUnlockConsumers(true);
-
-        try {
-            consumerListmanager.addToConsumerList(topicExpression, subscribeRequest, subscriptionId);
-            if (outGoingQueue == null) {
-                outGoingQueue = subscribeRequest.getOutGoingQueue();
-            }
-        } finally {
-            writeLockUnlockConsumers(false);
-        }
-
-        return;
-
-    }
-
-    public int handleUnsubscribe(String subscriptionId) {
-
-        int ret = 0;
-
-        writeLockUnlockConsumers(true);
-        try {
-            ret = consumerListmanager.removeFromConsumerList(subscriptionId, null);
-        } finally {
-            writeLockUnlockConsumers(false);
-        }
-
-        return ret;
-    }
-
-    @Override
-    public void populateMatches(String wsntMessageConverterClassName,
-            AdditionalMessageContent additionalMessageContent, String message, String topic,
-            List<ConsumerInfo> matchedConsumers) {
-
-        assert (matchedConsumers != null);
-
-        readLockUnlockConsumers(true);
-
-        try {
-
-            ConsumerList topicConsumerList = consumerListmanager.getConsumerListByToken(topic);
-            ConsumerList wildcardConsumerList = consumerListmanager
-                    .getConsumerListByToken(WsmgCommonConstants.WILDCARD_TOPIC);
-            if (topicConsumerList != null) {// has subscription to this topic
-
-                ArrayList<ConsumerInfo> list = topicConsumerList.getConsumerList();
-
-                matchedConsumers.addAll(list);
-            }
-            if (wildcardConsumerList != null) {// has wildcard subscriptions
-                List<ConsumerInfo> wildCardConsumerInfoList = wildcardConsumerList.getConsumerList();
-                if (wildCardConsumerInfoList != null) {
-                    matchedConsumers.addAll(wildCardConsumerInfoList);
-                }
-            }
-
-        } finally {
-            readLockUnlockConsumers(false);
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
deleted file mode 100644
index c9b255f..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.messenger;
-
-import java.net.NoRouteToHostException;
-import java.net.SocketTimeoutException;
-import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsumerUrlManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(ConsumerUrlManager.class);
-
-    private ConcurrentHashMap<String, FailedConsumerInfo> failedConsumerUrls = new ConcurrentHashMap<String, FailedConsumerInfo>();
-
-    private final int defaultMaxRetry;
-
-    private long expireTimeGap; // milliseconds
-
-    private Timer cleanupTimer;
-
-    public ConsumerUrlManager(ConfigurationManager config) {
-
-        defaultMaxRetry = config.getConfig(WsmgCommonConstants.CONFIG_MAX_MESSAGE_DELIVER_RETRIES, 2);
-
-        // time is in milliseconds
-        expireTimeGap = 1000 * 60 * config.getConfig(WsmgCommonConstants.CONFIG_CONSUMER_URL_EXPIRATION_TIME_GAP, 5l);
-
-        // let minimum time to be 1 minute
-        long timerThreadInterval = Math.max(expireTimeGap / 5, 1000 * 60);
-
-        cleanupTimer = new Timer("Failed consumer url handler", true);
-        cleanupTimer.scheduleAtFixedRate(new URLCleanUpTask(), 0, timerThreadInterval);
-    }
-
-    public void stop() {
-        logger.info("Stop ConsumerUrlManager");
-        if (this.cleanupTimer != null) {
-            this.cleanupTimer.cancel();
-        }
-        logger.info("ConsumerUrlManager Stopped");
-    }
-
-    public void onFailedDelivery(EndpointReference consumerEndpointReference, long timeFinished, long timeTaken,
-            AxisFault exception, AdditionalMessageContent headers) {
-        String url = consumerEndpointReference.getAddress();
-
-        RunTimeStatistics.addNewFailedDeliverTime(timeTaken);
-        RunTimeStatistics.addFailedConsumerURL(url);
-
-        if (isEligibleToBlackList(exception)) {
-
-            synchronized (failedConsumerUrls) {
-                FailedConsumerInfo info = failedConsumerUrls.get(url);
-                if (info == null) {
-                    info = new FailedConsumerInfo();
-                    failedConsumerUrls.put(url, info);
-                }
-                info.incrementNumberOfTimesTried(timeFinished + expireTimeGap);
-            }
-
-        } else {
-
-            String errorMsg = String.format("unable to deliver message: [%s] to consumer: [%s], " + "reason: [%s]",
-                    headers.toString(), url, exception.getMessage());
-
-            logger.error(errorMsg);
-        }
-    }
-
-    public void onSucessfullDelivery(EndpointReference consumerEndpointReference, long timeTaken) {
-
-        RunTimeStatistics.addNewSuccessfulDeliverTime(timeTaken);
-        synchronized (failedConsumerUrls) {
-
-            FailedConsumerInfo info = failedConsumerUrls.remove(consumerEndpointReference.getAddress());
-
-            if (info != null) {
-                logger.debug(String.format("message was delivered to " + "previously %d times failed url : %s",
-                        info.getNumberOfTimesTried(), consumerEndpointReference.getAddress()));
-            }
-        }
-    }
-
-    public boolean isUnavailable(String url) {
-        synchronized (failedConsumerUrls) {
-            FailedConsumerInfo info = failedConsumerUrls.get(url);
-            return (info != null && info.isMaxRetryCountReached());
-        }
-    }
-
-    private boolean isEligibleToBlackList(AxisFault f) {
-
-        Throwable cause = f.getCause();
-
-        if (cause == null) {
-            logger.error("unknown error occured", cause);
-            return false;
-        }
-
-        /*
-         * if timeout because of the set timeout in this class In windows, timeout cause ConnectException with
-         * "Connection timed out" message
-         */
-        if (cause instanceof SocketTimeoutException || cause.getMessage().indexOf("timed out") > 0
-                || cause instanceof NoRouteToHostException) {
-            return true;
-        }
-
-        return false;
-    }
-
-    class FailedConsumerInfo {
-
-        private int numberOfTimesTried;
-        private long expiryTime;
-
-        public void incrementNumberOfTimesTried(long expireTime) {
-            numberOfTimesTried++;
-            expiryTime = expireTime;
-        }
-
-        public void decrementNumberOfTimeTried() {
-            numberOfTimesTried--;
-        }
-
-        public int getNumberOfTimesTried() {
-            return numberOfTimesTried;
-        }
-
-        public boolean isMaxRetryCountReached() {
-            return numberOfTimesTried >= defaultMaxRetry;
-        }
-
-        public long getLastAtteptExpiryTime() {
-            return expiryTime;
-        }
-
-    }
-
-    class URLCleanUpTask extends TimerTask {
-
-        @Override
-        public void run() {
-
-            logger.debug("starting to clean up black listed consumer urls");
-            long currentTime = System.currentTimeMillis();
-
-            synchronized (failedConsumerUrls) {
-                for (Entry<String, FailedConsumerInfo> entry : failedConsumerUrls.entrySet()) {
-                    FailedConsumerInfo info = entry.getValue();
-
-                    if (info.isMaxRetryCountReached() && info.getLastAtteptExpiryTime() >= currentTime) {
-
-                        info.decrementNumberOfTimeTried();
-                        logger.info("decrementing number of times" + " tried for consumer url: " + entry.getKey());
-
-                    }
-                }
-            }
-
-            logger.debug("finished cleaning black listed consumer urls");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
deleted file mode 100644
index 92b6cfe..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.wsmg.messenger;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
-import org.apache.axiom.om.OMElement;
-
-public interface Deliverable {
-    void setProtocol(DeliveryProtocol protocol);
-
-    void send(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent);
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
deleted file mode 100644
index 7eb5b4f..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.wsmg.messenger;
-
-import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DeliveryProcessor {
-
-    private static final Logger logger = LoggerFactory.getLogger(DeliveryProcessor.class);
-
-    private SendingStrategy strategy;
-    private Deliverable deliverable;
-
-    private boolean running;
-    private Thread t;
-
-    public DeliveryProcessor(Deliverable deliverable, SendingStrategy strategy) {
-        this.strategy = strategy;
-        this.deliverable = deliverable;
-    }
-
-    public void start() {
-        this.running = true;
-        this.t = new Thread(new CheckingAndSending());
-        this.t.start();
-    }
-
-    public void stop() {
-        this.running = false;
-
-        if (this.t != null) {
-            this.t.interrupt();
-
-            try {
-                this.t.join();
-            } catch (InterruptedException ie) {
-                logger.error("Wait for sending thread to finish (join) is interrupted");
-            }
-        }
-
-        WSMGParameter.OUT_GOING_QUEUE.dispose();
-    }
-
-    private class CheckingAndSending implements Runnable {
-
-        public void run() {
-            strategy.init();
-            while (running) {
-                logger.debug("run - delivery thread");
-                try {
-
-                    OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
-
-                    if (WSMGParameter.showTrackId)
-                        logger.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
-                                + ": dequeued from outgoing queue");
-
-                    strategy.addMessageToSend(outGoingMessage, deliverable);
-
-                } catch (Exception e) {
-                    logger.warn("Unexpected_exception:");
-                }
-            }
-            logger.debug("Shutdown Strategy");
-            strategy.shutdown();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
deleted file mode 100644
index 6764a42..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.messenger;
-
-import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.util.Counter;
-import org.apache.airavata.wsmg.util.TimerThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OutGoingQueue {
-
-    private static final Logger logger = LoggerFactory.getLogger(OutGoingQueue.class);
-
-    private Counter storeToOutQueueCounter;
-
-    public OutGoingQueue() {
-        if (WSMGParameter.measureMessageRate) {
-            storeToOutQueueCounter = new Counter();
-            TimerThread timerThread = new TimerThread(storeToOutQueueCounter, " StoreToOutQueueCounter");
-            new Thread(timerThread).start();
-        }
-    }
-
-    // need synchronized???
-    public void storeNotification(OutGoingMessage outGoingMessage, long messageId) {
-
-        boolean loop = false;
-        do {
-            // this outgoing Queue is created inside the messenger which is
-            // intended to send the notification message to the consumer.
-            WSMGParameter.OUT_GOING_QUEUE.enqueue(outGoingMessage, outGoingMessage.getAdditionalMessageContent()
-                    .getTrackId());
-            if (WSMGParameter.measureMessageRate) {
-                storeToOutQueueCounter.addCounter();
-            }
-            if (WSMGParameter.testOutGoingQueueMaxiumLength && storeToOutQueueCounter.getCounterValue() < 1000000) {
-                loop = true;
-            }else{
-                loop = false;
-            }
-        } while (loop);
-
-    }
-}