You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/01/27 16:26:58 UTC
[04/13] airavata git commit: retiring ws-messenger and remove
dependency of workflow tracking - AIRAVATA-1556, AIRAVATA-1557
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
deleted file mode 100644
index 60a9705..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
+++ /dev/null
@@ -1,773 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.commons.storage;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.sql.BatchUpdateException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import javax.xml.namespace.QName;
-import javax.xml.stream.XMLStreamException;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.storage.DatabaseCreator.DatabaseType;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.util.Counter;
-import org.apache.airavata.wsmg.util.TimerThread;
-import org.apache.axiom.om.OMElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WsmgPersistantStorage implements WsmgStorage, WsmgQueue {
- private static final Logger logger = LoggerFactory.getLogger(WsmgPersistantStorage.class);
-
- /*
- * Table name
- */
- private static final String TABLE_NAME_TO_CHECK = SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
-
- private Counter storeToDBCounter = new Counter();
-
- private JdbcStorage db;
-
- public WsmgPersistantStorage(String jdbcUrl, String jdbcDriver) {
-
- db = new JdbcStorage(jdbcUrl, jdbcDriver);
-
- Connection conn = null;
- try {
- /*
- * Check database
- */
- conn = db.connect();
- if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
- DatabaseCreator.createMsgBrokerDatabase(conn);
- logger.info("New Database created for Message Broker");
- } else {
- logger.debug("Database already created for Message Broker!");
- }
-
- if (WSMGParameter.measureMessageRate) {
- TimerThread timerThread = new TimerThread(storeToDBCounter, " StoreSubScriptionToDBCounter");
- new Thread(timerThread).start();
- }
-
- initMessageQueueStorage();
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
- throw new RuntimeException("Database failure");
- } finally {
- db.closeConnection(conn);
- }
- }
-
- public void dispose() {
- if (db != null) {
- db.closeAllConnections();
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.airavata.wsmg.commons.storage.WsmgStorage#getAllSubscription()
- */
- public List<SubscriptionEntry> getAllSubscription() {
-
- ArrayList<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>();
-
- Connection conn = null;
- PreparedStatement stmt = null;
- try {
-
- // get number of row first and increase the arrayList size for
- // better performance
- int size = db.countRow(SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS, "*");
-
- conn = db.connect();
- stmt = conn.prepareStatement(SubscriptionConstants.EXP_SELECT_QUERY);
- ResultSet rs = stmt.executeQuery();
- ret.ensureCapacity(size);
-
- if (rs != null) {
-
- /*
- * Buffer data
- */
- int nRead;
- byte[] buffer = new byte[1024];
- ByteArrayOutputStream outStream = new ByteArrayOutputStream();
-
- while (rs.next()) {
- SubscriptionEntry subscriptionEntry = new SubscriptionEntry();
- subscriptionEntry.setSubscriptionId(rs.getString("SubscriptionId"));
-
- /*
- * Read Binary Stream
- */
- InputStream inStream = null;
-
- try {
- inStream = rs.getBinaryStream("content");
- while ((nRead = inStream.read(buffer)) != -1) {
- outStream.write(buffer, 0, nRead);
- }
- outStream.flush();
-
- subscriptionEntry.setSubscribeXml(new String(outStream.toByteArray()));
-
- } catch (IOException ie) {
- logger.error("Unable to read XML from database", ie);
-
- // skip this subscription entry
- continue;
- } finally {
- // clear all data in outputStream
- outStream.reset();
-
- // close database stream
- if (inStream != null) {
- try {
- inStream.close();
- } catch (Exception e) {
- logger.error("Cannot close database stream", e);
- }
- }
- }
-
- ret.add(subscriptionEntry);
-
- }
- }
- } catch (SQLException ex) {
- logger.error("sql exception occured", ex);
- } finally {
- db.quietlyClose(conn, stmt);
- }
- return ret;
- }
-
- public int insert(SubscriptionState subscription) {
- String address = subscription.getConsumerReference().getAddress();
- Map<QName, OMElement> referenceParametersMap = subscription.getConsumerReference().getAllReferenceParameters();
-
- String consumerReferenceParameters = null;
- if (referenceParametersMap == null) {
- consumerReferenceParameters = "";
- } else {
-
- StringBuffer buffer = new StringBuffer();
-
- for (Iterator<OMElement> ite = referenceParametersMap.values().iterator(); ite.hasNext();) {
- OMElement currentReferenceParameter = ite.next();
-
- try {
- buffer.append(currentReferenceParameter.toStringWithConsume());
- } catch (XMLStreamException se) {
- logger.error("unable to convert reference parameter", se);
- }
-
- }
- consumerReferenceParameters = buffer.toString();
- }
-
- int policyValue = WsmgCommonConstants.WSRM_POLICY_FALSE;
- if (subscription.isWsrmPolicy()) {
- policyValue = WsmgCommonConstants.WSRM_POLICY_TRUE;
- }
-
- Timestamp now = new Timestamp(System.currentTimeMillis());
-
- int result = 0;
- Connection connection = null;
- PreparedStatement stmt = null;
- try {
-
- connection = db.connect();
- stmt = connection.prepareStatement(SubscriptionConstants.EXP_INSERT_SQL_QUERY);
-
- stmt.setString(1, subscription.getId());
- stmt.setBinaryStream(2, new ByteArrayInputStream(subscription.getSubscribeXml().getBytes()), subscription
- .getSubscribeXml().getBytes().length);
- stmt.setInt(3, policyValue);
- stmt.setString(4, subscription.getLocalTopic());
- stmt.setString(5, subscription.getXpathString());
- stmt.setString(6, address);
- stmt.setBinaryStream(7, new ByteArrayInputStream(consumerReferenceParameters.getBytes()),
- consumerReferenceParameters.getBytes().length);
- stmt.setTimestamp(8, now);
- result = db.executeUpdateAndClose(stmt);
- db.commitAndFree(connection);
-
- storeToDBCounter.addCounter();
-
- } catch (SQLException ex) {
- logger.error("sql exception occured", ex);
- db.rollbackAndFree(connection);
- }
- return result;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.airavata.wsmg.commons.storage.SubscriptionStorage#delete(java .lang.String)
- */
- public int delete(String subscriptionId) {
- int result = 0;
- Connection connection = null;
- try {
- connection = db.connect();
- PreparedStatement stmt = connection.prepareStatement(SubscriptionConstants.EXP_DELETE_SQL_QUERY);
- stmt.setString(1, subscriptionId);
- result = db.executeUpdateAndClose(stmt);
- db.commitAndFree(connection);
- } catch (SQLException sql) {
- db.rollbackAndFree(connection);
- logger.error("sql exception occured", sql);
- }
- return result;
- }
-
- public void cleanup() {
- Connection conn = null;
- Statement stmt = null;
- try {
- conn = db.connect();
- stmt = conn.createStatement();
- batchCleanDB(stmt, conn);
- } catch (SQLException e) {
- logger.error(e.getMessage(), e);
- } finally {
- if (db.isAutoCommit()) {
- try {
- conn.setAutoCommit(true);
- } catch (SQLException e) {
- logger.error(e.getMessage(), e);
- }
- }
- db.quietlyClose(conn, stmt);
- }
- }
-
- public Object blockingDequeue() throws InterruptedException {
- while (true) {
- try {
- return retrive();
- } catch (SQLException e) {
- logger.error(e.getMessage(), e);
- e.printStackTrace();
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- e.printStackTrace();
- }
- }
- }
-
- public void enqueue(Object object, String trackId) {
-
- // Get the Max ID cache and update and unlock the table
- Connection connection = null;
- PreparedStatement stmt = null;
- PreparedStatement stmt2 = null;
- PreparedStatement stmt3 = null;
- try {
- int nextkey;
-
- connection = db.connect();
-
- lockMaxMinTables(connection);
-
- stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
-
- ResultSet result = stmt.executeQuery();
-
- if (result.next()) {
- nextkey = result.getInt(1);
-
- stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INCREMENT + (nextkey));
- stmt2.executeUpdate();
- } else {
- throw new RuntimeException("MAX_ID Table is not init, redeploy the service !!!");
- }
-
- /**
- * Before executing the SQL_INSERT_STATEMENT query, we need to unlock
- * MaxIDTable and MinIDTable since we are going to insert data to another
- * table, disQ. If we do not unlock tables, insert query fails in MySQL. But
- * in Derby, this will execute without any issues even without unlocking
- * tables. Since it fails with MySQL, we need to unlock the tables
- * before executing the insert query.
- */
- try{
- unLockTables(connection);
- }catch (SQLException sql) {
- logger.error("Cannot Unlock Table", sql);
- }
-
-
- /*
- * After update MAX_ID put data into queue table
- */
- stmt3 = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
- stmt3.setInt(1, nextkey);
- stmt3.setString(2, trackId);
-
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(output);
- out.writeObject(object);
- byte[] buffer = output.toByteArray();
- ByteArrayInputStream in = new ByteArrayInputStream(buffer);
- stmt3.setBinaryStream(3, in, buffer.length);
- stmt3.executeUpdate();
- db.commit(connection);
- } catch (SQLException sqlEx) {
- db.rollback(connection);
- logger.error("unable to enque the message in persistant storage", sqlEx);
- } catch (IOException ioEx) {
- db.rollback(connection);
- logger.error("unable to enque the message in persistant storage", ioEx);
- } finally {
- try {
- unLockTables(connection);
- } catch (SQLException sql) {
- logger.error("Cannot Unlock Table", sql);
- }
-
- db.quietlyClose(connection, stmt, stmt2, stmt3);
- }
- }
-
- private void initMessageQueueStorage() throws SQLException {
- Connection connection = null;
- PreparedStatement stmt = null;
- PreparedStatement stmt2 = null;
- PreparedStatement stmt3 = null;
- PreparedStatement stmt4 = null;
- try {
- connection = db.connect();
-
- lockMaxMinTables(connection);
-
- /*
- * Get Max ID
- */
- stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
- ResultSet result = stmt.executeQuery();
- if (!result.next()) {
- stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
- stmt2.executeUpdate();
- }
-
- /*
- * Get Min ID
- */
- stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
- result = stmt3.executeQuery();
- if (!result.next()) {
- stmt4 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
- stmt4.executeUpdate();
- }
- db.commit(connection);
- } catch (SQLException sqle) {
- db.rollback(connection);
- throw sqle;
- } finally {
- try {
- unLockTables(connection);
- } catch (SQLException sql) {
- logger.error("Cannot Unlock Table", sql);
- }
-
- db.quietlyClose(connection, stmt, stmt2, stmt3, stmt4);
- }
- }
-
- private Object retrive() throws SQLException, IOException, InterruptedException {
- long wait = 1000;
- int nextkey = -1;
- int maxid = -2;
- Connection connection = null;
- PreparedStatement stmt = null;
- PreparedStatement stmt2 = null;
- PreparedStatement stmt3 = null;
- ResultSet result = null;
- while (true) {
- try {
- connection = db.connect();
-
- lockMaxMinTables(connection);
-
- /*
- * Get Min ID
- */
- stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
- result = stmt.executeQuery();
- if (result.next()) {
- nextkey = result.getInt(1);
- } else {
- throw new RuntimeException("Queue init has failed earlier");
- }
-
- /*
- * Get Max ID
- */
- stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
- result = stmt2.executeQuery();
- if (result.next()) {
- maxid = result.getInt(1);
- } else {
- throw new RuntimeException("Queue init has failed earlier");
- }
-
- /*
- * Update value and exit the loop
- */
- if (maxid > nextkey) {
- stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
- stmt3.executeUpdate();
- logger.debug("Update MIN ID by one");
- db.commit(connection);
- break;
- }
-
- db.commit(connection);
- } catch (SQLException sql) {
- db.rollback(connection);
- throw sql;
- } finally {
- try {
- unLockTables(connection);
- } catch (SQLException sql) {
- sql.printStackTrace();
- logger.error("Cannot Unlock Table", sql);
- }
-
- db.quietlyClose(connection, stmt, stmt2, stmt3);
- }
-
- /*
- * Sleep if there is nothing to do
- */
- try {
- wait = Math.min((wait + 1000), QueueContants.FINAL_WAIT_IN_MILI);
- logger.debug("Wait=" + wait);
- Thread.sleep(wait);
- } catch (InterruptedException e) {
- logger.warn("Queue is interrupted to close");
- throw e;
- }
- }
-
- /*
- * Create Subscription Object from MIN_ID and delete data in table
- */
- Object resultObj = null;
- int key = -1;
- try {
- connection = db.connect();
- stmt = connection.prepareStatement(QueueContants.SQL_SELECT_STATEMENT + nextkey);
- result = stmt.executeQuery();
- if (result.next()) {
- key = result.getInt(1);
- InputStream in = result.getAsciiStream(2);
- ObjectInputStream s = new ObjectInputStream(in);
- try {
- resultObj = s.readObject();
- } catch (ClassNotFoundException e) {
- logger.error("Cannot Deserialize Object from Database, ClassNotFound. ", e);
- }
- } else {
- throw new RuntimeException(
- "MAX_ID and MIN_ID are inconsistent with subscription table, need to reset all data value");
- }
-
- try {
- String query = QueueContants.SQL_DELETE_STATEMENT + key;
- stmt2 = connection.prepareStatement(query);
- stmt2.executeUpdate();
- db.commit(connection);
- } catch (SQLException sqle) {
- db.rollback(connection);
- throw sqle;
- }
- } finally {
- db.quietlyClose(connection, stmt, stmt2);
- }
- return resultObj;
- }
-
- private void batchCleanDB(Statement stmt, Connection con) throws SQLException {
- DatabaseType databaseType = DatabaseType.other;
- int[] aiupdateCounts = new int[0];
- boolean bError = false;
- try {
-
- con.setAutoCommit(false);
-
- stmt.clearBatch();
-
- int totalStatement = 0;
-
- try {
- databaseType = DatabaseCreator.getDatabaseType(con);
- } catch (Exception e) {
- logger.error("Error evaluating database type", e);
- }
- // add SQL statements
- if (DatabaseType.mysql.equals(databaseType)) {
- stmt.addBatch("lock tables disQ write, MaxIDTable write, MinIDTable write;");
- totalStatement++;
- } else if (DatabaseType.derby.equals(databaseType)) {
- stmt.addBatch("lock table disQ in exclusive mode;");
- totalStatement++;
- stmt.addBatch("lock table MaxIDTable in exclusive mode;");
- totalStatement++;
- stmt.addBatch("lock table MinIDTable in exclusive mode;");
- totalStatement++;
- }
- stmt.addBatch("Delete from disQ;");
- totalStatement++;
- stmt.addBatch("Delete from MaxIDTable;");
- totalStatement++;
- stmt.addBatch("Delete from MinIDTable;");
- totalStatement++;
-
- aiupdateCounts = new int[totalStatement];
-
- // execute the statements
- aiupdateCounts = stmt.executeBatch();
-
- } catch (BatchUpdateException bue) {
- bError = true;
- aiupdateCounts = bue.getUpdateCounts();
- logger.error("SQLException: " + bue.getMessage());
- logger.error("SQLState: " + bue.getSQLState());
- logger.error("Message: " + bue.getMessage());
- logger.error("Vendor: " + bue.getErrorCode());
- logger.info("Update counts: ");
-
- for (int i = 0; i < aiupdateCounts.length; i++) {
- logger.error(aiupdateCounts[i] + " ");
- }
-
- SQLException SQLe = bue;
- while (SQLe != null) {
- SQLe = SQLe.getNextException();
- logger.error(SQLe.getMessage(), SQLe);
- }
- } catch (SQLException SQLe) {
- bError = true;
- throw SQLe;
- } finally {
- // determine operation result
- for (int i = 0; !bError && i < aiupdateCounts.length; i++) {
- int iProcessed = aiupdateCounts[i];
- /**
- * The int values that can be returned in the update counts array are: <br/>
- * -3--Operation error. A driver has the option to stop at the first error and throw a
- * BatchUpdateException or to report the error and continue. This value is only seen in the latter case. <br/>
- * -2--The operation was successful, but the number of rows affected is unknown. <br/>
- * Zero--DDL statement or no rows affected by the operation. Greater than zero--Operation was
- * successful, number of rows affected by the operation.
- */
- if (iProcessed < 0 && iProcessed != -2) {
- // error on statement
- logger.info("Error batch." + iProcessed);
- bError = true;
- }
- }
-
- if (bError) {
- con.rollback();
- } else {
- con.commit();
- }
-
- /*
- * Unlock table after rollback and commit, since it is not automatic in MySql
- */
-
- if (DatabaseType.mysql.equals(databaseType)) {
- PreparedStatement prepareStmt = con.prepareCall("unlock tables;");
- db.executeUpdateAndClose(prepareStmt);
- }
- } // end finally
- logger.info("Queue is cleaned.");
- }
-
- private void lockMaxMinTables(Connection connection) throws SQLException {
- DatabaseType databaseType = DatabaseType.other;
- try {
- databaseType = DatabaseCreator.getDatabaseType(connection);
- } catch (Exception e) {
- logger.error("Error evaluating database type", e);
- }
-
- /*
- * Must turn off auto commit
- */
- connection.setAutoCommit(false);
- String sql = null;
- Statement stmt = null;
- try {
- switch (databaseType) {
- case derby:
- sql = "LOCK TABLE " + QueueContants.TABLE_NAME_MAXID + " IN EXCLUSIVE MODE";
- String sql2 = "LOCK TABLE " + QueueContants.TABLE_NAME_MINID + " IN EXCLUSIVE MODE";
- stmt = connection.createStatement();
- stmt.addBatch(sql);
- stmt.addBatch(sql2);
- stmt.executeBatch();
- break;
- case mysql:
- sql = "lock tables " + QueueContants.TABLE_NAME_MAXID + " write" + "," + QueueContants.TABLE_NAME_MINID
- + " write";
- stmt = connection.createStatement();
- stmt.executeQuery(sql);
- break;
- default:
- return;
- }
-
- } finally {
- if (stmt != null && !stmt.isClosed()) {
- stmt.close();
- }
- }
- }
-
- private void unLockTables(Connection connection) throws SQLException {
- DatabaseType databaseType = DatabaseType.other;
- try {
- databaseType = DatabaseCreator.getDatabaseType(connection);
- } catch (Exception e) {
- logger.error("Error evaluating database type", e);
- }
-
- try {
- switch (databaseType) {
- case derby:
- /*
- * Derby doesn't have explicit unlock SQL It uses commit or rollback as a unlock mechanism, so make sure
- * that connection is always commited or rollbacked
- */
- break;
- case mysql:
- String sql = "unlock tables";
- PreparedStatement stmt = null;
- try {
- stmt = connection.prepareStatement(sql);
- stmt.executeQuery();
- db.commit(connection);
- } finally {
- if (stmt != null) {
- stmt.close();
- }
- }
- break;
- default:
- return;
- }
- } finally {
- /*
- * Set auto commit when needed
- */
- if (db.isAutoCommit()) {
- connection.setAutoCommit(true);
- }
- }
- }
-
- private static class SubscriptionConstants {
-
- public static final String TABLE_NAME_EXPIRABLE_SUBCRIPTIONS = "subscription";
-
- public static final String TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS = "specialSubscription";
-
- public static final String EXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
- + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
- + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
-
- public static final String EXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
- + " WHERE SubscriptionId= ?";
-
- public static final String EXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
-
- public static final String NONEXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
- + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
- + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
-
- public static final String NONEXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
- + " WHERE SubscriptionId= ?";
-
- public static final String NONEXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS;
- }
-
- private static class QueueContants {
- public static final int FINAL_WAIT_IN_MILI = 5000;
-
- public static final String TABLE_NAME = "disQ";
-
- public static final String TABLE_NAME_MAXID = "MaxIDTable";
-
- public static final String TABLE_NAME_MINID = "MinIDTable";
-
- public static final int STATUS_OPEN = 0;
-
- public static final String SQL_INSERT_STATEMENT = "INSERT INTO " + TABLE_NAME
- + " (id, trackId, message, status) " + "VALUES (?,?,?," + STATUS_OPEN + ")";
-
- public static String SQL_DELETE_STATEMENT = "DELETE FROM " + TABLE_NAME + " WHERE id=";
-
- public static String SQL_SELECT_STATEMENT = "SELECT id,message FROM " + TABLE_NAME + " WHERE id=";
-
- public static String SQL_MAX_ID_SEPERATE_TABLE = "SELECT maxID FROM " + TABLE_NAME_MAXID;
-
- public static String SQL_MIN_ID_SEPERATE_TABLE = "SELECT minID FROM " + TABLE_NAME_MINID;
-
- public static String SQL_MAX_ID_INSERT = "INSERT INTO " + TABLE_NAME_MAXID + " (maxID) VALUES (1)";
-
- public static String SQL_MIN_ID_INSERT = "INSERT INTO " + TABLE_NAME_MINID + " (minID) VALUES (1)";
-
- public static String SQL_MAX_ID_INCREMENT = "UPDATE " + TABLE_NAME_MAXID + " SET maxID = maxID+1 WHERE maxID =";
-
- public static String SQL_MIN_ID_INCREMENT = "UPDATE " + TABLE_NAME_MINID + " SET minID = minID+1 WHERE minID =";
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
deleted file mode 100644
index 5430b33..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.commons.storage;
-
-public interface WsmgQueue {
-
- void cleanup();
-
- void enqueue(Object object, String trackId);
-
- Object blockingDequeue() throws InterruptedException;
-
- void dispose();
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
deleted file mode 100644
index 2a1d1cb..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.commons.storage;
-
-import java.util.List;
-
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-
-public interface WsmgStorage {
-
- List<SubscriptionEntry> getAllSubscription();
-
- int insert(SubscriptionState subscription);
-
- int delete(String subscriptionId);
-
- void dispose();
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
deleted file mode 100644
index d3be422..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.commons.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.xml.namespace.QName;
-
-import org.apache.airavata.wsmg.util.BrokerUtil;
-import org.apache.axiom.om.OMAttribute;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMNamespace;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Compare two OMElement with its namespace, attributes, children, and text. Current implementation supports ignore
- * namespace checking i.e. if the namespace is in the list, it is skipped and return as equals.
- */
-public class OMElementComparator {
-
- private static final Logger log = LoggerFactory.getLogger(OMElementComparator.class);
-
- private static List<String> ignorableNamespaceList = new ArrayList<String>();
-
- private OMElementComparator() {
- }
-
- public void addIgnorableNamespace(String nsURI) {
- ignorableNamespaceList.add(nsURI);
- }
-
- public void clearIgnorableNamespaces() {
- ignorableNamespaceList.clear();
- }
-
- public static boolean compare(OMElement elementOne, OMElement elementTwo) {
-
- if (isIgnorable(elementOne) || isIgnorable(elementTwo)) {
- // ignore if the elements belong to any of the ignorable namespaces
- // list
- return true;
- } else if (elementOne == null && elementTwo == null) {
- log.debug("Both Elements are null.");
- return true;
- } else if (elementOne == null || elementTwo == null) {
- log.debug("One of item to compare is null");
- return false;
- }
-
- return BrokerUtil.sameStringValue(elementOne.getLocalName(), elementTwo.getLocalName())
- && compare(elementOne.getNamespace(), elementTwo.getNamespace())
- && compareAttibutes(elementOne, elementTwo)
- /*
- * Trimming the value of the XMLElement is not correct since this compare method cannot be used to
- * compare element contents with trailing and leading whitespaces BUT for the practical side of tests
- * and to get the current tests working we have to trim() the contents
- */
- && BrokerUtil.sameStringValue(elementOne.getText().trim(), elementTwo.getText().trim())
- && compareChildren(elementOne, elementTwo);
- }
-
- private static boolean isIgnorable(OMElement elt) {
- if (elt != null) {
- OMNamespace namespace = elt.getNamespace();
- if (namespace != null) {
- return ignorableNamespaceList.contains(namespace.getNamespaceURI());
- } else {
- return false;
- }
- } else {
- return false;
- }
- }
-
- private static boolean compareChildren(OMElement elementOne, OMElement elementTwo) {
- HashMap<QName, OMElement> map = new HashMap<QName, OMElement>();
- Iterator oneIter = elementOne.getChildElements();
- while (oneIter.hasNext()) {
- OMElement elementOneChild = (OMElement) oneIter.next();
- OMElement elementTwoChild = elementTwo.getFirstChildWithName(elementOneChild.getQName());
- if (!compare(elementOneChild, elementTwoChild)) {
- return false;
- }
-
- /*
- * Cache for later access
- */
- map.put(elementOneChild.getQName(), elementOneChild);
- }
-
- /*
- * Case the second element has more elements than the first
- */
- Iterator twoIter = elementTwo.getChildElements();
- while (twoIter.hasNext()) {
- OMElement elementTwoChild = (OMElement) twoIter.next();
- if (!isIgnorable(elementTwoChild) && !map.containsKey(elementTwoChild.getQName())) {
- return false;
- }
- }
-
- return true;
- }
-
- private static boolean compareAttibutes(OMElement elementOne, OMElement elementTwo) {
- int elementOneAtribCount = 0;
- int elementTwoAtribCount = 0;
- Iterator oneIter = elementOne.getAllAttributes();
- while (oneIter.hasNext()) {
-
- /*
- * This catches a case where the first one has more items than the second one (one.attributes.size >
- * two.attributes.size) and a case where the first and the second have a different attributes.
- * (one.attributes.size == two.attributes.size)
- */
- OMAttribute omAttribute = (OMAttribute) oneIter.next();
- OMAttribute attr = elementTwo.getAttribute(omAttribute.getQName());
- if (attr == null) {
- log.debug("Attribute " + omAttribute + " is not found in both elements.");
- return false;
- }
- /*
- * Count attributes in the first item
- */
- elementOneAtribCount++;
- }
-
- /*
- * Count attributes in the second item
- */
- Iterator elementTwoIter = elementTwo.getAllAttributes();
- while (elementTwoIter.hasNext()) {
- elementTwoIter.next();
- elementTwoAtribCount++;
- }
-
- /*
- * This catches a case where the second one has more items than the first one. (two.attributes.size >
- * one.attributes.size)
- */
- log.debug("Number of Attributes are equal? : " + (elementOneAtribCount == elementTwoAtribCount));
- return elementOneAtribCount == elementTwoAtribCount;
- }
-
- /*
- * Compare only URI not prefix
- */
- private static boolean compare(OMNamespace x, OMNamespace y) {
- log.debug("Compare namespace:" + x + " with " + y);
- return (x == null && y == null)
- || (x != null && y != null && BrokerUtil.sameStringValue(x.getNamespaceURI(), y.getNamespaceURI()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
deleted file mode 100644
index 06d435a..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.config;
-
-import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
-
-public class WSMGParameter {
-
- /**
- * Global variable for the Out Going queue (contains message to send to subscribers)
- */
- public static WsmgQueue OUT_GOING_QUEUE = null; // default=null
-
- public static final boolean testOutGoingQueueMaxiumLength = false; // default=false
-
- // enable or disable the TimerThread that displays the message rate
- public static final boolean measureMessageRate = false; // default=false
-
- public static final boolean enableAutoCleanSubscriptions = false; // default=true
-
- public static final boolean debugYFilter = false;
-
- public static final boolean cleanQueueonStartUp = false; // default=true
- public static final boolean requireSubscriptionRenew = true;
- public static final long expirationTime = 1000 * 60 * 60 * 72; // 72 hours
-
- public static final boolean showTrackId = false;
- public static final String versionSetUpNote = "Added_Sub_Timeout";
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
deleted file mode 100644
index c12f460..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.config;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.airavata.wsmg.broker.NotificationProcessor;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
-import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.matching.XPath.YFilterMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-
-public class WsmgConfigurationContext {
-
- private OutGoingQueue outgoingQueue = null;
-
- private List<AbstractMessageMatcher> messageMatchers = new LinkedList<AbstractMessageMatcher>();
-
- private ReentrantReadWriteLock messegeMatchersLock = new ReentrantReadWriteLock();
-
- private ConfigurationManager configurationManager;
-
- private SubscriptionManager subscriptionMan;
-
- private NotificationProcessor notificationProcessor;
-
- private WsmgStorage storage;
-
- private WsmgQueue queue;
-
- public WsmgConfigurationContext() {
- outgoingQueue = new OutGoingQueue();
- setDirectFilter();
- }
-
- private void setDirectFilter() {
- messageMatchers.add(new YFilterMessageMatcher());
- // messageMatchers.add(new DirectWsntMessageMatcher(subscriptions,
- // publisherRegistrationDB));
- }
-
- public List<AbstractMessageMatcher> getMessageMatchers() {
- return messageMatchers;
- }
-
- public OutGoingQueue getOutgoingQueue() {
- return outgoingQueue;
- }
-
- public ConfigurationManager getConfigurationManager() {
- return configurationManager;
- }
-
- public SubscriptionManager getSubscriptionManager() {
- return subscriptionMan;
- }
-
- public NotificationProcessor getNotificationProcessor() {
- return notificationProcessor;
- }
-
- public void setConfigurationManager(ConfigurationManager configMan) {
- this.configurationManager = configMan;
- }
-
- public void setSubscriptionManager(SubscriptionManager subMan) {
- this.subscriptionMan = subMan;
- }
-
- public void setNotificationProcessor(NotificationProcessor processor) {
- this.notificationProcessor = processor;
- }
-
- public WsmgStorage getStorage() {
- return storage;
- }
-
- public void setStorage(WsmgStorage s) {
- storage = s;
- }
-
- public WsmgQueue getQueue() {
- return queue;
- }
-
- public void setQueue(WsmgQueue s) {
- queue = s;
- }
-
- public ReentrantReadWriteLock getMessegeMatcherLock() {
- return messegeMatchersLock;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
deleted file mode 100644
index dbd16f6..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.matching;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-
-public abstract class AbstractMessageMatcher {
-
- protected Map<String, String> currentMessageCache;
-
- private ReentrantReadWriteLock consumerListLock = new ReentrantReadWriteLock();
-
- // infer types of
- // key and value
- public AbstractMessageMatcher() {
- this.currentMessageCache = new HashMap<String, String>();
- }
-
- public abstract void start(String carrierLocation);
-
- // Message can be either String or XmlElement. Added XMLElement for
- // performance consideration so that if not using queue,
- // we don't need to serialize to String
- // If we already serialized to String because of the using queue, we don't
- // have to change back to XMLElement until the delivery to consumers
-
- public abstract void populateMatches(String wsntMessageConverterClassName,
- AdditionalMessageContent additionalMessageContent, String message, String topic,
- List<ConsumerInfo> matchedConsumers);
-
- public abstract int handleUnsubscribe(String subscriptionId);
-
- public abstract void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId);
-
- public String handleGetCurrentMessage(String topic) {
- String currentMessage = currentMessageCache.get(topic);
- return currentMessage;
- }
-
- public void readLockUnlockConsumers(boolean lock) {
- ReadLock readlock = consumerListLock.readLock();
- lockUnlock(readlock, lock);
- }
-
- public void writeLockUnlockConsumers(boolean lock) {
- WriteLock writeLock = consumerListLock.writeLock();
- lockUnlock(writeLock, lock);
- }
-
- private void lockUnlock(Lock l, boolean lock) {
-
- if (lock) {
- l.lock();
- } else {
- l.unlock();
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
deleted file mode 100644
index 1881968..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.matching.XPath;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.broker.ConsumerList;
-import org.apache.airavata.wsmg.broker.ConsumerListManager;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import edu.berkeley.cs.db.yfilter.filter.EXfilterBasic;
-import edu.berkeley.cs.db.yfilter.filter.SystemGlobals;
-import edu.berkeley.cs.db.yfilterplus.queryparser.Query;
-import edu.berkeley.cs.db.yfilterplus.queryparser.XPQuery;
-import edu.berkeley.cs.db.yfilterplus.xmltree.XMLTree;
-
-public class YFilterInfo {
- private static final Logger logger = LoggerFactory.getLogger(YFilterInfo.class);
-
- private EXfilterBasic yfilter = new EXfilterBasic();
- private HashMap<Integer, String> yFilterIdToXPath = new HashMap<Integer, String>();
- private HashMap<Integer, Query> yFilterIdToQuery = new HashMap<Integer, Query>();
- private HashMap<String, Integer> xPathToYFilterId = new HashMap<String, Integer>();
- private ConsumerListManager consumerListmanager = new ConsumerListManager();
- private int index = 0;
- private int counter = 0;
-
- public EXfilterBasic getYfilter() {
- return yfilter;
- }
-
- public void setYfilter(EXfilterBasic yfilter) {
- this.yfilter = yfilter;
- }
-
- public HashMap<Integer, String> getYFilterIdToXPath() {
- return yFilterIdToXPath;
- }
-
- public void setYFilterIdToXPath(HashMap<Integer, String> filterIdToXPath) {
- yFilterIdToXPath = filterIdToXPath;
- }
-
- public void addXPathQuery(String xpathExpression, String subscriptionId, SubscriptionState subscribeRequest)
- throws RuntimeException {
- index++;
- counter++;
- if (WSMGParameter.debugYFilter)
- logger.debug("QueryExp=" + xpathExpression);
-
- Integer yFilterIdObj = xPathToYFilterId.get(xpathExpression);
- int yFilterId = -1;
- if (yFilterIdObj != null) {
- yFilterId = yFilterIdObj.intValue();
- } else {
- Query query = XPQuery.parseQuery(xpathExpression, index);
- if (query == null) {
- throw new RuntimeException("Invalid XPath expression:" + xpathExpression);
- }
- if (WSMGParameter.debugYFilter)
- logger.debug("addSubscription " + xpathExpression + " query :" + query);
- yFilterId = yfilter.addQuery(query);
- if (WSMGParameter.debugYFilter)
- yfilter.printQueryIndex();
- xPathToYFilterId.put(xpathExpression, Integer.valueOf(yFilterId));
- yFilterIdToXPath.put(new Integer(yFilterId), xpathExpression);
- yFilterIdToQuery.put(yFilterId, query);
- }
- if (WSMGParameter.debugYFilter)
- logger.debug("YFilterId=" + yFilterId);
-
- consumerListmanager.addToConsumerList(xpathExpression, subscribeRequest, subscriptionId);
- }
-
- public int removeSubscription(String subscriptionId) {
-
- String xPath = consumerListmanager.getTokenBySubscriptionId(subscriptionId);
- int result = consumerListmanager.removeFromConsumerList(subscriptionId, xPath);
- if (result == 0) {
- return 0;
- }
- int currentConsumerCount = consumerListmanager.getConsumerListByToken(xPath).size();
- if (currentConsumerCount == 0) {
- Integer yFilterId = xPathToYFilterId.get(xPath);
- Query q = yFilterIdToQuery.get(yFilterId);
- yfilter.deleteQuery(q, q.getQueryId());
- yFilterIdToQuery.remove(yFilterId);
- }
- counter--;
- return result;
- }
-
- public List<ConsumerInfo> getMatchingConsumerList(String messageString) {
- List<ConsumerInfo> matchingConsumerList = new LinkedList<ConsumerInfo>();
- XMLTree tree = new XMLTree(new java.io.StringReader(messageString));
- if (WSMGParameter.debugYFilter)
- tree.print();
- yfilter.setEventSequence(tree.getEvents());
- yfilter.startParsing();
-
- // print the matched queries //
- if (SystemGlobals.hasQueries) {
- if (WSMGParameter.debugYFilter)
-
- yfilter.printQueryResults(System.out);
- } else {
- System.out.println("no match");
- return matchingConsumerList;
- }
-
- Iterator<Integer> it = (Iterator<Integer>) yfilter.getMatchedQueries().iterator();
- while (it.hasNext()) {
- Integer qid = it.next();
-
- String xpath = yFilterIdToXPath.get(qid);
- ConsumerList consumerList = consumerListmanager.getConsumerListByToken(xpath);
-
- if (consumerList != null) {// has subscription to this topic
- matchingConsumerList.addAll(consumerList.getConsumerList());
- }
- }
- yfilter.clear();
- return matchingConsumerList;
- }
-
- public int getCounter() {
- return counter;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
deleted file mode 100644
index 0f59ec1..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.matching.XPath;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.broker.ConsumerList;
-import org.apache.airavata.wsmg.broker.ConsumerListManager;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class YFilterMessageMatcher extends AbstractMessageMatcher {
-
- private static final Logger logger = LoggerFactory.getLogger(YFilterMessageMatcher.class);
-
- private OutGoingQueue outGoingQueue = null;
-
- private HashMap<String, YFilterInfo> topicToYFilterInfo = new HashMap<String, YFilterInfo>();
- private HashMap<String, String> subIdToTopic = new HashMap<String, String>();
-
- // used for topic only subscription, so that we don't have to create a
- // YFilter object
- private ConsumerListManager consumerListmanager = new ConsumerListManager();
-
- public YFilterMessageMatcher() {
- super();
- }
-
- public void start(String carrierLocation) {
- currentMessageCache = new Hashtable<String, String>();
- }
-
- @Override
- public void populateMatches(String wsntMessageConverterClassName,
- AdditionalMessageContent additionalMessageContent, String message, String topic,
- List<ConsumerInfo> matchedConsumers) {
-
- assert (matchedConsumers != null);
-
- if (WSMGParameter.debugYFilter)
- logger.info("Message In YFilterAdapter=" + message);
-
- // Important Get a Read Lock....
- readLockUnlockConsumers(true);
- try {
-
- // 1, Topic only
- ConsumerList topicConsumerList = consumerListmanager.getConsumerListByToken(topic);
- if (topicConsumerList != null) {// has subscription to this topic
-
- ArrayList<ConsumerInfo> list = topicConsumerList.getConsumerList();
- matchedConsumers.addAll(list);
- }
- // 2, wild card topic only
- ConsumerList wildcardConsumerList = consumerListmanager
- .getConsumerListByToken(WsmgCommonConstants.WILDCARD_TOPIC);
- if (wildcardConsumerList != null) {// has wildcard subscriptions
- List<ConsumerInfo> wildCardConsumerInfoList = wildcardConsumerList.getConsumerList();
- if (wildCardConsumerInfoList != null) {
- // System.out.println("ConsumerListSize2="+wildCardConsumerInfoList.size());
- matchedConsumers.addAll(wildCardConsumerInfoList);
- }
- }
- // 3, topic with Xpath
- YFilterInfo yfilterInfo = topicToYFilterInfo.get(topic);
- if (yfilterInfo != null) {
- List<ConsumerInfo> topicAndXPathConsumerInfoList = yfilterInfo.getMatchingConsumerList(message);
- if (topicAndXPathConsumerInfoList != null) {
- // System.out.println("ConsumerListSize3="+topicAndXPathConsumerInfoList.size());
- matchedConsumers.addAll(topicAndXPathConsumerInfoList);
- }
- }
- // 4, wild card topic with Xpath (XPath only)
- yfilterInfo = topicToYFilterInfo.get(WsmgCommonConstants.WILDCARD_TOPIC);
- if (yfilterInfo != null) {
- List<ConsumerInfo> wildcardTopicAndXPathConsumerInfoList = yfilterInfo.getMatchingConsumerList(message);
- if (wildcardTopicAndXPathConsumerInfoList != null) {
- // System.out.println("ConsumerListSize4="+wildcardTopicAndXPathConsumerInfoList.size());
- matchedConsumers.addAll(wildcardTopicAndXPathConsumerInfoList);
- }
- }
-
- } finally {
-
- // Release the Read Lock...
- readLockUnlockConsumers(false);
- }
-
- }
-
- public int handleUnsubscribe(String subscriptionId) {
-
- int ret = 1;
-
- writeLockUnlockConsumers(true);
- try {
- String topicExpression = subIdToTopic.get(subscriptionId);
- if (subscriptionId.startsWith("T")) { // Topic only
- consumerListmanager.removeFromConsumerList(subscriptionId, topicExpression);
- } else {
- YFilterInfo yfilterInfo = topicToYFilterInfo.get(topicExpression);
- if (yfilterInfo != null) {
- yfilterInfo.removeSubscription(subscriptionId);
- if (yfilterInfo.getCounter() == 0) {
- yfilterInfo = null;
- topicToYFilterInfo.remove(topicExpression);
- }
- } else {
- System.out.println("ERROR: Cannot find subscription with the subId=" + subscriptionId);
- ret = 0;
- }
- }
- } finally {
- writeLockUnlockConsumers(false);
- }
-
- return ret;
- }
-
- public void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId) {
-
- // Get the write lock
- writeLockUnlockConsumers(true);
- try {
-
- String topicExpression = subscribeRequest.getLocalTopic();
- subIdToTopic.put(subscriptionId, topicExpression);
-
- String xpathExpression = subscribeRequest.getXpathString();
- if (xpathExpression == null || xpathExpression.length() == 0) { // Topic
- // only
- consumerListmanager.addToConsumerList(topicExpression, subscribeRequest, subscriptionId);
- } else {
- YFilterInfo yfilterInfo = topicToYFilterInfo.get(topicExpression);
- if (yfilterInfo == null) {
- yfilterInfo = new YFilterInfo();
- topicToYFilterInfo.put(topicExpression, yfilterInfo);
- }
- yfilterInfo.addXPathQuery(xpathExpression, subscriptionId, subscribeRequest);
- }
-
- if (outGoingQueue == null) {
- outGoingQueue = subscribeRequest.getOutGoingQueue();
- }
-
- } finally {
- // release the write lock
- writeLockUnlockConsumers(false);
- }
-
- return;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
deleted file mode 100644
index ba50e45..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.matching.simpleTopic;
-
-import java.util.ArrayList;
-import java.util.Hashtable;
-import java.util.List;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.broker.ConsumerList;
-import org.apache.airavata.wsmg.broker.ConsumerListManager;
-import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
-import org.apache.airavata.wsmg.messenger.OutGoingQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DirectWsntMessageMatcher extends AbstractMessageMatcher {
-
- private static final Logger logger = LoggerFactory.getLogger(DirectWsntMessageMatcher.class);
-
- private ConsumerListManager consumerListmanager = new ConsumerListManager();
-
- private OutGoingQueue outGoingQueue = null;
-
- public DirectWsntMessageMatcher() {
- super();
- }
-
- public void start(String carrierLocation) {
- currentMessageCache = new Hashtable<String, String>();
- }
-
- public void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId) {
-
- String topicExpression = subscribeRequest.getLocalTopic();
- if (topicExpression == null || topicExpression.length() == 0) {
- logger.error("ERROR:WsntAdapterConnection creation failed.");
- return;
- }
-
- writeLockUnlockConsumers(true);
-
- try {
- consumerListmanager.addToConsumerList(topicExpression, subscribeRequest, subscriptionId);
- if (outGoingQueue == null) {
- outGoingQueue = subscribeRequest.getOutGoingQueue();
- }
- } finally {
- writeLockUnlockConsumers(false);
- }
-
- return;
-
- }
-
- public int handleUnsubscribe(String subscriptionId) {
-
- int ret = 0;
-
- writeLockUnlockConsumers(true);
- try {
- ret = consumerListmanager.removeFromConsumerList(subscriptionId, null);
- } finally {
- writeLockUnlockConsumers(false);
- }
-
- return ret;
- }
-
- @Override
- public void populateMatches(String wsntMessageConverterClassName,
- AdditionalMessageContent additionalMessageContent, String message, String topic,
- List<ConsumerInfo> matchedConsumers) {
-
- assert (matchedConsumers != null);
-
- readLockUnlockConsumers(true);
-
- try {
-
- ConsumerList topicConsumerList = consumerListmanager.getConsumerListByToken(topic);
- ConsumerList wildcardConsumerList = consumerListmanager
- .getConsumerListByToken(WsmgCommonConstants.WILDCARD_TOPIC);
- if (topicConsumerList != null) {// has subscription to this topic
-
- ArrayList<ConsumerInfo> list = topicConsumerList.getConsumerList();
-
- matchedConsumers.addAll(list);
- }
- if (wildcardConsumerList != null) {// has wildcard subscriptions
- List<ConsumerInfo> wildCardConsumerInfoList = wildcardConsumerList.getConsumerList();
- if (wildCardConsumerInfoList != null) {
- matchedConsumers.addAll(wildCardConsumerInfoList);
- }
- }
-
- } finally {
- readLockUnlockConsumers(false);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
deleted file mode 100644
index c9b255f..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.messenger;
-
-import java.net.NoRouteToHostException;
-import java.net.SocketTimeoutException;
-import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
-import org.apache.airavata.wsmg.util.RunTimeStatistics;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ConsumerUrlManager {
-
- private static final Logger logger = LoggerFactory.getLogger(ConsumerUrlManager.class);
-
- private ConcurrentHashMap<String, FailedConsumerInfo> failedConsumerUrls = new ConcurrentHashMap<String, FailedConsumerInfo>();
-
- private final int defaultMaxRetry;
-
- private long expireTimeGap; // milliseconds
-
- private Timer cleanupTimer;
-
- public ConsumerUrlManager(ConfigurationManager config) {
-
- defaultMaxRetry = config.getConfig(WsmgCommonConstants.CONFIG_MAX_MESSAGE_DELIVER_RETRIES, 2);
-
- // time is in milliseconds
- expireTimeGap = 1000 * 60 * config.getConfig(WsmgCommonConstants.CONFIG_CONSUMER_URL_EXPIRATION_TIME_GAP, 5l);
-
- // let minimum time to be 1 minute
- long timerThreadInterval = Math.max(expireTimeGap / 5, 1000 * 60);
-
- cleanupTimer = new Timer("Failed consumer url handler", true);
- cleanupTimer.scheduleAtFixedRate(new URLCleanUpTask(), 0, timerThreadInterval);
- }
-
- public void stop() {
- logger.info("Stop ConsumerUrlManager");
- if (this.cleanupTimer != null) {
- this.cleanupTimer.cancel();
- }
- logger.info("ConsumerUrlManager Stopped");
- }
-
- public void onFailedDelivery(EndpointReference consumerEndpointReference, long timeFinished, long timeTaken,
- AxisFault exception, AdditionalMessageContent headers) {
- String url = consumerEndpointReference.getAddress();
-
- RunTimeStatistics.addNewFailedDeliverTime(timeTaken);
- RunTimeStatistics.addFailedConsumerURL(url);
-
- if (isEligibleToBlackList(exception)) {
-
- synchronized (failedConsumerUrls) {
- FailedConsumerInfo info = failedConsumerUrls.get(url);
- if (info == null) {
- info = new FailedConsumerInfo();
- failedConsumerUrls.put(url, info);
- }
- info.incrementNumberOfTimesTried(timeFinished + expireTimeGap);
- }
-
- } else {
-
- String errorMsg = String.format("unable to deliver message: [%s] to consumer: [%s], " + "reason: [%s]",
- headers.toString(), url, exception.getMessage());
-
- logger.error(errorMsg);
- }
- }
-
- public void onSucessfullDelivery(EndpointReference consumerEndpointReference, long timeTaken) {
-
- RunTimeStatistics.addNewSuccessfulDeliverTime(timeTaken);
- synchronized (failedConsumerUrls) {
-
- FailedConsumerInfo info = failedConsumerUrls.remove(consumerEndpointReference.getAddress());
-
- if (info != null) {
- logger.debug(String.format("message was delivered to " + "previously %d times failed url : %s",
- info.getNumberOfTimesTried(), consumerEndpointReference.getAddress()));
- }
- }
- }
-
- public boolean isUnavailable(String url) {
- synchronized (failedConsumerUrls) {
- FailedConsumerInfo info = failedConsumerUrls.get(url);
- return (info != null && info.isMaxRetryCountReached());
- }
- }
-
- private boolean isEligibleToBlackList(AxisFault f) {
-
- Throwable cause = f.getCause();
-
- if (cause == null) {
- logger.error("unknown error occured", cause);
- return false;
- }
-
- /*
- * if timeout because of the set timeout in this class In windows, timeout cause ConnectException with
- * "Connection timed out" message
- */
- if (cause instanceof SocketTimeoutException || cause.getMessage().indexOf("timed out") > 0
- || cause instanceof NoRouteToHostException) {
- return true;
- }
-
- return false;
- }
-
- class FailedConsumerInfo {
-
- private int numberOfTimesTried;
- private long expiryTime;
-
- public void incrementNumberOfTimesTried(long expireTime) {
- numberOfTimesTried++;
- expiryTime = expireTime;
- }
-
- public void decrementNumberOfTimeTried() {
- numberOfTimesTried--;
- }
-
- public int getNumberOfTimesTried() {
- return numberOfTimesTried;
- }
-
- public boolean isMaxRetryCountReached() {
- return numberOfTimesTried >= defaultMaxRetry;
- }
-
- public long getLastAtteptExpiryTime() {
- return expiryTime;
- }
-
- }
-
- class URLCleanUpTask extends TimerTask {
-
- @Override
- public void run() {
-
- logger.debug("starting to clean up black listed consumer urls");
- long currentTime = System.currentTimeMillis();
-
- synchronized (failedConsumerUrls) {
- for (Entry<String, FailedConsumerInfo> entry : failedConsumerUrls.entrySet()) {
- FailedConsumerInfo info = entry.getValue();
-
- if (info.isMaxRetryCountReached() && info.getLastAtteptExpiryTime() >= currentTime) {
-
- info.decrementNumberOfTimeTried();
- logger.info("decrementing number of times" + " tried for consumer url: " + entry.getKey());
-
- }
- }
- }
-
- logger.debug("finished cleaning black listed consumer urls");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
deleted file mode 100644
index 92b6cfe..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.wsmg.messenger;
-
-import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
-import org.apache.airavata.wsmg.broker.ConsumerInfo;
-import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
-import org.apache.axiom.om.OMElement;
-
-public interface Deliverable {
- void setProtocol(DeliveryProtocol protocol);
-
- void send(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent);
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
deleted file mode 100644
index 7eb5b4f..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.airavata.wsmg.messenger;
-
-import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DeliveryProcessor {
-
- private static final Logger logger = LoggerFactory.getLogger(DeliveryProcessor.class);
-
- private SendingStrategy strategy;
- private Deliverable deliverable;
-
- private boolean running;
- private Thread t;
-
- public DeliveryProcessor(Deliverable deliverable, SendingStrategy strategy) {
- this.strategy = strategy;
- this.deliverable = deliverable;
- }
-
- public void start() {
- this.running = true;
- this.t = new Thread(new CheckingAndSending());
- this.t.start();
- }
-
- public void stop() {
- this.running = false;
-
- if (this.t != null) {
- this.t.interrupt();
-
- try {
- this.t.join();
- } catch (InterruptedException ie) {
- logger.error("Wait for sending thread to finish (join) is interrupted");
- }
- }
-
- WSMGParameter.OUT_GOING_QUEUE.dispose();
- }
-
- private class CheckingAndSending implements Runnable {
-
- public void run() {
- strategy.init();
- while (running) {
- logger.debug("run - delivery thread");
- try {
-
- OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
-
- if (WSMGParameter.showTrackId)
- logger.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
- + ": dequeued from outgoing queue");
-
- strategy.addMessageToSend(outGoingMessage, deliverable);
-
- } catch (Exception e) {
- logger.warn("Unexpected_exception:");
- }
- }
- logger.debug("Shutdown Strategy");
- strategy.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
deleted file mode 100644
index 6764a42..0000000
--- a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.messenger;
-
-import org.apache.airavata.wsmg.commons.OutGoingMessage;
-import org.apache.airavata.wsmg.config.WSMGParameter;
-import org.apache.airavata.wsmg.util.Counter;
-import org.apache.airavata.wsmg.util.TimerThread;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class OutGoingQueue {
-
- private static final Logger logger = LoggerFactory.getLogger(OutGoingQueue.class);
-
- private Counter storeToOutQueueCounter;
-
- public OutGoingQueue() {
- if (WSMGParameter.measureMessageRate) {
- storeToOutQueueCounter = new Counter();
- TimerThread timerThread = new TimerThread(storeToOutQueueCounter, " StoreToOutQueueCounter");
- new Thread(timerThread).start();
- }
- }
-
- // need synchronized???
- public void storeNotification(OutGoingMessage outGoingMessage, long messageId) {
-
- boolean loop = false;
- do {
- // this outgoing Queue is created inside the messenger which is
- // intended to send the notification message to the consumer.
- WSMGParameter.OUT_GOING_QUEUE.enqueue(outGoingMessage, outGoingMessage.getAdditionalMessageContent()
- .getTrackId());
- if (WSMGParameter.measureMessageRate) {
- storeToOutQueueCounter.addCounter();
- }
- if (WSMGParameter.testOutGoingQueueMaxiumLength && storeToOutQueueCounter.getCounterValue() < 1000000) {
- loop = true;
- }else{
- loop = false;
- }
- } while (loop);
-
- }
-}