You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:31:26 UTC
[84/90] [abbrv] AIRAVATA-1124
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
new file mode 100644
index 0000000..60a9705
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgPersistantStorage.java
@@ -0,0 +1,773 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.BatchUpdateException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.storage.DatabaseCreator.DatabaseType;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.util.Counter;
+import org.apache.airavata.wsmg.util.TimerThread;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WsmgPersistantStorage implements WsmgStorage, WsmgQueue {
+ private static final Logger logger = LoggerFactory.getLogger(WsmgPersistantStorage.class);
+
+ /*
+ * Table name
+ */
+ private static final String TABLE_NAME_TO_CHECK = SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
+
+ private Counter storeToDBCounter = new Counter();
+
+ private JdbcStorage db;
+
+ public WsmgPersistantStorage(String jdbcUrl, String jdbcDriver) {
+
+ db = new JdbcStorage(jdbcUrl, jdbcDriver);
+
+ Connection conn = null;
+ try {
+ /*
+ * Check database
+ */
+ conn = db.connect();
+ if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
+ DatabaseCreator.createMsgBrokerDatabase(conn);
+ logger.info("New Database created for Message Broker");
+ } else {
+ logger.debug("Database already created for Message Broker!");
+ }
+
+ if (WSMGParameter.measureMessageRate) {
+ TimerThread timerThread = new TimerThread(storeToDBCounter, " StoreSubScriptionToDBCounter");
+ new Thread(timerThread).start();
+ }
+
+ initMessageQueueStorage();
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new RuntimeException("Database failure");
+ } finally {
+ db.closeConnection(conn);
+ }
+ }
+
+ public void dispose() {
+ if (db != null) {
+ db.closeAllConnections();
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.airavata.wsmg.commons.storage.WsmgStorage#getAllSubscription()
+ */
+ public List<SubscriptionEntry> getAllSubscription() {
+
+ ArrayList<SubscriptionEntry> ret = new ArrayList<SubscriptionEntry>();
+
+ Connection conn = null;
+ PreparedStatement stmt = null;
+ try {
+
+ // get number of row first and increase the arrayList size for
+ // better performance
+ int size = db.countRow(SubscriptionConstants.TABLE_NAME_EXPIRABLE_SUBCRIPTIONS, "*");
+
+ conn = db.connect();
+ stmt = conn.prepareStatement(SubscriptionConstants.EXP_SELECT_QUERY);
+ ResultSet rs = stmt.executeQuery();
+ ret.ensureCapacity(size);
+
+ if (rs != null) {
+
+ /*
+ * Buffer data
+ */
+ int nRead;
+ byte[] buffer = new byte[1024];
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+
+ while (rs.next()) {
+ SubscriptionEntry subscriptionEntry = new SubscriptionEntry();
+ subscriptionEntry.setSubscriptionId(rs.getString("SubscriptionId"));
+
+ /*
+ * Read Binary Stream
+ */
+ InputStream inStream = null;
+
+ try {
+ inStream = rs.getBinaryStream("content");
+ while ((nRead = inStream.read(buffer)) != -1) {
+ outStream.write(buffer, 0, nRead);
+ }
+ outStream.flush();
+
+ subscriptionEntry.setSubscribeXml(new String(outStream.toByteArray()));
+
+ } catch (IOException ie) {
+ logger.error("Unable to read XML from database", ie);
+
+ // skip this subscription entry
+ continue;
+ } finally {
+ // clear all data in outputStream
+ outStream.reset();
+
+ // close database stream
+ if (inStream != null) {
+ try {
+ inStream.close();
+ } catch (Exception e) {
+ logger.error("Cannot close database stream", e);
+ }
+ }
+ }
+
+ ret.add(subscriptionEntry);
+
+ }
+ }
+ } catch (SQLException ex) {
+ logger.error("sql exception occured", ex);
+ } finally {
+ db.quietlyClose(conn, stmt);
+ }
+ return ret;
+ }
+
+ public int insert(SubscriptionState subscription) {
+ String address = subscription.getConsumerReference().getAddress();
+ Map<QName, OMElement> referenceParametersMap = subscription.getConsumerReference().getAllReferenceParameters();
+
+ String consumerReferenceParameters = null;
+ if (referenceParametersMap == null) {
+ consumerReferenceParameters = "";
+ } else {
+
+ StringBuffer buffer = new StringBuffer();
+
+ for (Iterator<OMElement> ite = referenceParametersMap.values().iterator(); ite.hasNext();) {
+ OMElement currentReferenceParameter = ite.next();
+
+ try {
+ buffer.append(currentReferenceParameter.toStringWithConsume());
+ } catch (XMLStreamException se) {
+ logger.error("unable to convert reference parameter", se);
+ }
+
+ }
+ consumerReferenceParameters = buffer.toString();
+ }
+
+ int policyValue = WsmgCommonConstants.WSRM_POLICY_FALSE;
+ if (subscription.isWsrmPolicy()) {
+ policyValue = WsmgCommonConstants.WSRM_POLICY_TRUE;
+ }
+
+ Timestamp now = new Timestamp(System.currentTimeMillis());
+
+ int result = 0;
+ Connection connection = null;
+ PreparedStatement stmt = null;
+ try {
+
+ connection = db.connect();
+ stmt = connection.prepareStatement(SubscriptionConstants.EXP_INSERT_SQL_QUERY);
+
+ stmt.setString(1, subscription.getId());
+ stmt.setBinaryStream(2, new ByteArrayInputStream(subscription.getSubscribeXml().getBytes()), subscription
+ .getSubscribeXml().getBytes().length);
+ stmt.setInt(3, policyValue);
+ stmt.setString(4, subscription.getLocalTopic());
+ stmt.setString(5, subscription.getXpathString());
+ stmt.setString(6, address);
+ stmt.setBinaryStream(7, new ByteArrayInputStream(consumerReferenceParameters.getBytes()),
+ consumerReferenceParameters.getBytes().length);
+ stmt.setTimestamp(8, now);
+ result = db.executeUpdateAndClose(stmt);
+ db.commitAndFree(connection);
+
+ storeToDBCounter.addCounter();
+
+ } catch (SQLException ex) {
+ logger.error("sql exception occured", ex);
+ db.rollbackAndFree(connection);
+ }
+ return result;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.airavata.wsmg.commons.storage.SubscriptionStorage#delete(java .lang.String)
+ */
+ public int delete(String subscriptionId) {
+ int result = 0;
+ Connection connection = null;
+ try {
+ connection = db.connect();
+ PreparedStatement stmt = connection.prepareStatement(SubscriptionConstants.EXP_DELETE_SQL_QUERY);
+ stmt.setString(1, subscriptionId);
+ result = db.executeUpdateAndClose(stmt);
+ db.commitAndFree(connection);
+ } catch (SQLException sql) {
+ db.rollbackAndFree(connection);
+ logger.error("sql exception occured", sql);
+ }
+ return result;
+ }
+
+ public void cleanup() {
+ Connection conn = null;
+ Statement stmt = null;
+ try {
+ conn = db.connect();
+ stmt = conn.createStatement();
+ batchCleanDB(stmt, conn);
+ } catch (SQLException e) {
+ logger.error(e.getMessage(), e);
+ } finally {
+ if (db.isAutoCommit()) {
+ try {
+ conn.setAutoCommit(true);
+ } catch (SQLException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ db.quietlyClose(conn, stmt);
+ }
+ }
+
+ public Object blockingDequeue() throws InterruptedException {
+ while (true) {
+ try {
+ return retrive();
+ } catch (SQLException e) {
+ logger.error(e.getMessage(), e);
+ e.printStackTrace();
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void enqueue(Object object, String trackId) {
+
+ // Get the Max ID cache and update and unlock the table
+ Connection connection = null;
+ PreparedStatement stmt = null;
+ PreparedStatement stmt2 = null;
+ PreparedStatement stmt3 = null;
+ try {
+ int nextkey;
+
+ connection = db.connect();
+
+ lockMaxMinTables(connection);
+
+ stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+
+ ResultSet result = stmt.executeQuery();
+
+ if (result.next()) {
+ nextkey = result.getInt(1);
+
+ stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INCREMENT + (nextkey));
+ stmt2.executeUpdate();
+ } else {
+ throw new RuntimeException("MAX_ID Table is not init, redeploy the service !!!");
+ }
+
+ /**
+ * Before executing the SQL_INSERT_STATEMENT query, we need to unlock
+ * MaxIDTable and MinIDTable since we are going to insert data to another
+ * table, disQ. If we do not unlock tables, insert query fails in MySQL. But
+ * in Derby, this will execute without any issues even without unlocking
+ * tables. Since it fails with MySQL, we need to unlock the tables
+ * before executing the insert query.
+ */
+ try{
+ unLockTables(connection);
+ }catch (SQLException sql) {
+ logger.error("Cannot Unlock Table", sql);
+ }
+
+
+ /*
+ * After update MAX_ID put data into queue table
+ */
+ stmt3 = connection.prepareStatement(QueueContants.SQL_INSERT_STATEMENT);
+ stmt3.setInt(1, nextkey);
+ stmt3.setString(2, trackId);
+
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(output);
+ out.writeObject(object);
+ byte[] buffer = output.toByteArray();
+ ByteArrayInputStream in = new ByteArrayInputStream(buffer);
+ stmt3.setBinaryStream(3, in, buffer.length);
+ stmt3.executeUpdate();
+ db.commit(connection);
+ } catch (SQLException sqlEx) {
+ db.rollback(connection);
+ logger.error("unable to enque the message in persistant storage", sqlEx);
+ } catch (IOException ioEx) {
+ db.rollback(connection);
+ logger.error("unable to enque the message in persistant storage", ioEx);
+ } finally {
+ try {
+ unLockTables(connection);
+ } catch (SQLException sql) {
+ logger.error("Cannot Unlock Table", sql);
+ }
+
+ db.quietlyClose(connection, stmt, stmt2, stmt3);
+ }
+ }
+
+ private void initMessageQueueStorage() throws SQLException {
+ Connection connection = null;
+ PreparedStatement stmt = null;
+ PreparedStatement stmt2 = null;
+ PreparedStatement stmt3 = null;
+ PreparedStatement stmt4 = null;
+ try {
+ connection = db.connect();
+
+ lockMaxMinTables(connection);
+
+ /*
+ * Get Max ID
+ */
+ stmt = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+ ResultSet result = stmt.executeQuery();
+ if (!result.next()) {
+ stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_INSERT);
+ stmt2.executeUpdate();
+ }
+
+ /*
+ * Get Min ID
+ */
+ stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
+ result = stmt3.executeQuery();
+ if (!result.next()) {
+ stmt4 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INSERT);
+ stmt4.executeUpdate();
+ }
+ db.commit(connection);
+ } catch (SQLException sqle) {
+ db.rollback(connection);
+ throw sqle;
+ } finally {
+ try {
+ unLockTables(connection);
+ } catch (SQLException sql) {
+ logger.error("Cannot Unlock Table", sql);
+ }
+
+ db.quietlyClose(connection, stmt, stmt2, stmt3, stmt4);
+ }
+ }
+
+ private Object retrive() throws SQLException, IOException, InterruptedException {
+ long wait = 1000;
+ int nextkey = -1;
+ int maxid = -2;
+ Connection connection = null;
+ PreparedStatement stmt = null;
+ PreparedStatement stmt2 = null;
+ PreparedStatement stmt3 = null;
+ ResultSet result = null;
+ while (true) {
+ try {
+ connection = db.connect();
+
+ lockMaxMinTables(connection);
+
+ /*
+ * Get Min ID
+ */
+ stmt = connection.prepareStatement(QueueContants.SQL_MIN_ID_SEPERATE_TABLE);
+ result = stmt.executeQuery();
+ if (result.next()) {
+ nextkey = result.getInt(1);
+ } else {
+ throw new RuntimeException("Queue init has failed earlier");
+ }
+
+ /*
+ * Get Max ID
+ */
+ stmt2 = connection.prepareStatement(QueueContants.SQL_MAX_ID_SEPERATE_TABLE);
+ result = stmt2.executeQuery();
+ if (result.next()) {
+ maxid = result.getInt(1);
+ } else {
+ throw new RuntimeException("Queue init has failed earlier");
+ }
+
+ /*
+ * Update value and exit the loop
+ */
+ if (maxid > nextkey) {
+ stmt3 = connection.prepareStatement(QueueContants.SQL_MIN_ID_INCREMENT + (nextkey));
+ stmt3.executeUpdate();
+ logger.debug("Update MIN ID by one");
+ db.commit(connection);
+ break;
+ }
+
+ db.commit(connection);
+ } catch (SQLException sql) {
+ db.rollback(connection);
+ throw sql;
+ } finally {
+ try {
+ unLockTables(connection);
+ } catch (SQLException sql) {
+ sql.printStackTrace();
+ logger.error("Cannot Unlock Table", sql);
+ }
+
+ db.quietlyClose(connection, stmt, stmt2, stmt3);
+ }
+
+ /*
+ * Sleep if there is nothing to do
+ */
+ try {
+ wait = Math.min((wait + 1000), QueueContants.FINAL_WAIT_IN_MILI);
+ logger.debug("Wait=" + wait);
+ Thread.sleep(wait);
+ } catch (InterruptedException e) {
+ logger.warn("Queue is interrupted to close");
+ throw e;
+ }
+ }
+
+ /*
+ * Create Subscription Object from MIN_ID and delete data in table
+ */
+ Object resultObj = null;
+ int key = -1;
+ try {
+ connection = db.connect();
+ stmt = connection.prepareStatement(QueueContants.SQL_SELECT_STATEMENT + nextkey);
+ result = stmt.executeQuery();
+ if (result.next()) {
+ key = result.getInt(1);
+ InputStream in = result.getAsciiStream(2);
+ ObjectInputStream s = new ObjectInputStream(in);
+ try {
+ resultObj = s.readObject();
+ } catch (ClassNotFoundException e) {
+ logger.error("Cannot Deserialize Object from Database, ClassNotFound. ", e);
+ }
+ } else {
+ throw new RuntimeException(
+ "MAX_ID and MIN_ID are inconsistent with subscription table, need to reset all data value");
+ }
+
+ try {
+ String query = QueueContants.SQL_DELETE_STATEMENT + key;
+ stmt2 = connection.prepareStatement(query);
+ stmt2.executeUpdate();
+ db.commit(connection);
+ } catch (SQLException sqle) {
+ db.rollback(connection);
+ throw sqle;
+ }
+ } finally {
+ db.quietlyClose(connection, stmt, stmt2);
+ }
+ return resultObj;
+ }
+
+ private void batchCleanDB(Statement stmt, Connection con) throws SQLException {
+ DatabaseType databaseType = DatabaseType.other;
+ int[] aiupdateCounts = new int[0];
+ boolean bError = false;
+ try {
+
+ con.setAutoCommit(false);
+
+ stmt.clearBatch();
+
+ int totalStatement = 0;
+
+ try {
+ databaseType = DatabaseCreator.getDatabaseType(con);
+ } catch (Exception e) {
+ logger.error("Error evaluating database type", e);
+ }
+ // add SQL statements
+ if (DatabaseType.mysql.equals(databaseType)) {
+ stmt.addBatch("lock tables disQ write, MaxIDTable write, MinIDTable write;");
+ totalStatement++;
+ } else if (DatabaseType.derby.equals(databaseType)) {
+ stmt.addBatch("lock table disQ in exclusive mode;");
+ totalStatement++;
+ stmt.addBatch("lock table MaxIDTable in exclusive mode;");
+ totalStatement++;
+ stmt.addBatch("lock table MinIDTable in exclusive mode;");
+ totalStatement++;
+ }
+ stmt.addBatch("Delete from disQ;");
+ totalStatement++;
+ stmt.addBatch("Delete from MaxIDTable;");
+ totalStatement++;
+ stmt.addBatch("Delete from MinIDTable;");
+ totalStatement++;
+
+ aiupdateCounts = new int[totalStatement];
+
+ // execute the statements
+ aiupdateCounts = stmt.executeBatch();
+
+ } catch (BatchUpdateException bue) {
+ bError = true;
+ aiupdateCounts = bue.getUpdateCounts();
+ logger.error("SQLException: " + bue.getMessage());
+ logger.error("SQLState: " + bue.getSQLState());
+ logger.error("Message: " + bue.getMessage());
+ logger.error("Vendor: " + bue.getErrorCode());
+ logger.info("Update counts: ");
+
+ for (int i = 0; i < aiupdateCounts.length; i++) {
+ logger.error(aiupdateCounts[i] + " ");
+ }
+
+ SQLException SQLe = bue;
+ while (SQLe != null) {
+ SQLe = SQLe.getNextException();
+ logger.error(SQLe.getMessage(), SQLe);
+ }
+ } catch (SQLException SQLe) {
+ bError = true;
+ throw SQLe;
+ } finally {
+ // determine operation result
+ for (int i = 0; !bError && i < aiupdateCounts.length; i++) {
+ int iProcessed = aiupdateCounts[i];
+ /**
+ * The int values that can be returned in the update counts array are: <br/>
+ * -3--Operation error. A driver has the option to stop at the first error and throw a
+ * BatchUpdateException or to report the error and continue. This value is only seen in the latter case. <br/>
+ * -2--The operation was successful, but the number of rows affected is unknown. <br/>
+ * Zero--DDL statement or no rows affected by the operation. Greater than zero--Operation was
+ * successful, number of rows affected by the operation.
+ */
+ if (iProcessed < 0 && iProcessed != -2) {
+ // error on statement
+ logger.info("Error batch." + iProcessed);
+ bError = true;
+ }
+ }
+
+ if (bError) {
+ con.rollback();
+ } else {
+ con.commit();
+ }
+
+ /*
+ * Unlock table after rollback and commit, since it is not automatic in MySql
+ */
+
+ if (DatabaseType.mysql.equals(databaseType)) {
+ PreparedStatement prepareStmt = con.prepareCall("unlock tables;");
+ db.executeUpdateAndClose(prepareStmt);
+ }
+ } // end finally
+ logger.info("Queue is cleaned.");
+ }
+
+ private void lockMaxMinTables(Connection connection) throws SQLException {
+ DatabaseType databaseType = DatabaseType.other;
+ try {
+ databaseType = DatabaseCreator.getDatabaseType(connection);
+ } catch (Exception e) {
+ logger.error("Error evaluating database type", e);
+ }
+
+ /*
+ * Must turn off auto commit
+ */
+ connection.setAutoCommit(false);
+ String sql = null;
+ Statement stmt = null;
+ try {
+ switch (databaseType) {
+ case derby:
+ sql = "LOCK TABLE " + QueueContants.TABLE_NAME_MAXID + " IN EXCLUSIVE MODE";
+ String sql2 = "LOCK TABLE " + QueueContants.TABLE_NAME_MINID + " IN EXCLUSIVE MODE";
+ stmt = connection.createStatement();
+ stmt.addBatch(sql);
+ stmt.addBatch(sql2);
+ stmt.executeBatch();
+ break;
+ case mysql:
+ sql = "lock tables " + QueueContants.TABLE_NAME_MAXID + " write" + "," + QueueContants.TABLE_NAME_MINID
+ + " write";
+ stmt = connection.createStatement();
+ stmt.executeQuery(sql);
+ break;
+ default:
+ return;
+ }
+
+ } finally {
+ if (stmt != null && !stmt.isClosed()) {
+ stmt.close();
+ }
+ }
+ }
+
+ private void unLockTables(Connection connection) throws SQLException {
+ DatabaseType databaseType = DatabaseType.other;
+ try {
+ databaseType = DatabaseCreator.getDatabaseType(connection);
+ } catch (Exception e) {
+ logger.error("Error evaluating database type", e);
+ }
+
+ try {
+ switch (databaseType) {
+ case derby:
+ /*
+ * Derby doesn't have explicit unlock SQL It uses commit or rollback as a unlock mechanism, so make sure
+ * that connection is always commited or rollbacked
+ */
+ break;
+ case mysql:
+ String sql = "unlock tables";
+ PreparedStatement stmt = null;
+ try {
+ stmt = connection.prepareStatement(sql);
+ stmt.executeQuery();
+ db.commit(connection);
+ } finally {
+ if (stmt != null) {
+ stmt.close();
+ }
+ }
+ break;
+ default:
+ return;
+ }
+ } finally {
+ /*
+ * Set auto commit when needed
+ */
+ if (db.isAutoCommit()) {
+ connection.setAutoCommit(true);
+ }
+ }
+ }
+
+ private static class SubscriptionConstants {
+
+ public static final String TABLE_NAME_EXPIRABLE_SUBCRIPTIONS = "subscription";
+
+ public static final String TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS = "specialSubscription";
+
+ public static final String EXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
+ + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
+ + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
+
+ public static final String EXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS
+ + " WHERE SubscriptionId= ?";
+
+ public static final String EXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_EXPIRABLE_SUBCRIPTIONS;
+
+ public static final String NONEXP_INSERT_SQL_QUERY = "INSERT INTO " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
+ + "(SubscriptionId, content, wsrm, Topics, XPath, ConsumerAddress, ReferenceProperties, CreationTime) "
+ + "VALUES( ? , ? , ? , ? , ? , ? , ? , ?)";
+
+ public static final String NONEXP_DELETE_SQL_QUERY = "DELETE FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS
+ + " WHERE SubscriptionId= ?";
+
+ public static final String NONEXP_SELECT_QUERY = "SELECT * FROM " + TABLE_NAME_NON_EXPIRABLE_SUBCRIPTIONS;
+ }
+
+ private static class QueueContants {
+ public static final int FINAL_WAIT_IN_MILI = 5000;
+
+ public static final String TABLE_NAME = "disQ";
+
+ public static final String TABLE_NAME_MAXID = "MaxIDTable";
+
+ public static final String TABLE_NAME_MINID = "MinIDTable";
+
+ public static final int STATUS_OPEN = 0;
+
+ public static final String SQL_INSERT_STATEMENT = "INSERT INTO " + TABLE_NAME
+ + " (id, trackId, message, status) " + "VALUES (?,?,?," + STATUS_OPEN + ")";
+
+ public static String SQL_DELETE_STATEMENT = "DELETE FROM " + TABLE_NAME + " WHERE id=";
+
+ public static String SQL_SELECT_STATEMENT = "SELECT id,message FROM " + TABLE_NAME + " WHERE id=";
+
+ public static String SQL_MAX_ID_SEPERATE_TABLE = "SELECT maxID FROM " + TABLE_NAME_MAXID;
+
+ public static String SQL_MIN_ID_SEPERATE_TABLE = "SELECT minID FROM " + TABLE_NAME_MINID;
+
+ public static String SQL_MAX_ID_INSERT = "INSERT INTO " + TABLE_NAME_MAXID + " (maxID) VALUES (1)";
+
+ public static String SQL_MIN_ID_INSERT = "INSERT INTO " + TABLE_NAME_MINID + " (minID) VALUES (1)";
+
+ public static String SQL_MAX_ID_INCREMENT = "UPDATE " + TABLE_NAME_MAXID + " SET maxID = maxID+1 WHERE maxID =";
+
+ public static String SQL_MIN_ID_INCREMENT = "UPDATE " + TABLE_NAME_MINID + " SET minID = minID+1 WHERE minID =";
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
new file mode 100644
index 0000000..5430b33
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgQueue.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.storage;
+
+public interface WsmgQueue {
+
+ void cleanup();
+
+ void enqueue(Object object, String trackId);
+
+ Object blockingDequeue() throws InterruptedException;
+
+ void dispose();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
new file mode 100644
index 0000000..2a1d1cb
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/storage/WsmgStorage.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.storage;
+
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionEntry;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+
+public interface WsmgStorage {
+
+ List<SubscriptionEntry> getAllSubscription();
+
+ int insert(SubscriptionState subscription);
+
+ int delete(String subscriptionId);
+
+ void dispose();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
new file mode 100644
index 0000000..d3be422
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/commons/util/OMElementComparator.java
@@ -0,0 +1,177 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.util.BrokerUtil;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMNamespace;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Compare two OMElement with its namespace, attributes, children, and text. Current implementation supports ignore
+ * namespace checking i.e. if the namespace is in the list, it is skipped and return as equals.
+ */
+public class OMElementComparator {
+
+ private static final Logger log = LoggerFactory.getLogger(OMElementComparator.class);
+
+ private static List<String> ignorableNamespaceList = new ArrayList<String>();
+
+ private OMElementComparator() {
+ }
+
+ public void addIgnorableNamespace(String nsURI) {
+ ignorableNamespaceList.add(nsURI);
+ }
+
+ public void clearIgnorableNamespaces() {
+ ignorableNamespaceList.clear();
+ }
+
+ public static boolean compare(OMElement elementOne, OMElement elementTwo) {
+
+ if (isIgnorable(elementOne) || isIgnorable(elementTwo)) {
+ // ignore if the elements belong to any of the ignorable namespaces
+ // list
+ return true;
+ } else if (elementOne == null && elementTwo == null) {
+ log.debug("Both Elements are null.");
+ return true;
+ } else if (elementOne == null || elementTwo == null) {
+ log.debug("One of item to compare is null");
+ return false;
+ }
+
+ return BrokerUtil.sameStringValue(elementOne.getLocalName(), elementTwo.getLocalName())
+ && compare(elementOne.getNamespace(), elementTwo.getNamespace())
+ && compareAttibutes(elementOne, elementTwo)
+ /*
+ * Trimming the value of the XMLElement is not correct since this compare method cannot be used to
+ * compare element contents with trailing and leading whitespaces BUT for the practical side of tests
+ * and to get the current tests working we have to trim() the contents
+ */
+ && BrokerUtil.sameStringValue(elementOne.getText().trim(), elementTwo.getText().trim())
+ && compareChildren(elementOne, elementTwo);
+ }
+
+ private static boolean isIgnorable(OMElement elt) {
+ if (elt != null) {
+ OMNamespace namespace = elt.getNamespace();
+ if (namespace != null) {
+ return ignorableNamespaceList.contains(namespace.getNamespaceURI());
+ } else {
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ private static boolean compareChildren(OMElement elementOne, OMElement elementTwo) {
+ HashMap<QName, OMElement> map = new HashMap<QName, OMElement>();
+ Iterator oneIter = elementOne.getChildElements();
+ while (oneIter.hasNext()) {
+ OMElement elementOneChild = (OMElement) oneIter.next();
+ OMElement elementTwoChild = elementTwo.getFirstChildWithName(elementOneChild.getQName());
+ if (!compare(elementOneChild, elementTwoChild)) {
+ return false;
+ }
+
+ /*
+ * Cache for later access
+ */
+ map.put(elementOneChild.getQName(), elementOneChild);
+ }
+
+ /*
+ * Case the second element has more elements than the first
+ */
+ Iterator twoIter = elementTwo.getChildElements();
+ while (twoIter.hasNext()) {
+ OMElement elementTwoChild = (OMElement) twoIter.next();
+ if (!isIgnorable(elementTwoChild) && !map.containsKey(elementTwoChild.getQName())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private static boolean compareAttibutes(OMElement elementOne, OMElement elementTwo) {
+ int elementOneAtribCount = 0;
+ int elementTwoAtribCount = 0;
+ Iterator oneIter = elementOne.getAllAttributes();
+ while (oneIter.hasNext()) {
+
+ /*
+ * This catches a case where the first one has more items than the second one (one.attributes.size >
+ * two.attributes.size) and a case where the first and the second have a different attributes.
+ * (one.attributes.size == two.attributes.size)
+ */
+ OMAttribute omAttribute = (OMAttribute) oneIter.next();
+ OMAttribute attr = elementTwo.getAttribute(omAttribute.getQName());
+ if (attr == null) {
+ log.debug("Attribute " + omAttribute + " is not found in both elements.");
+ return false;
+ }
+ /*
+ * Count attributes in the first item
+ */
+ elementOneAtribCount++;
+ }
+
+ /*
+ * Count attributes in the second item
+ */
+ Iterator elementTwoIter = elementTwo.getAllAttributes();
+ while (elementTwoIter.hasNext()) {
+ elementTwoIter.next();
+ elementTwoAtribCount++;
+ }
+
+ /*
+ * This catches a case where the second one has more items than the first one. (two.attributes.size >
+ * one.attributes.size)
+ */
+ log.debug("Number of Attributes are equal? : " + (elementOneAtribCount == elementTwoAtribCount));
+ return elementOneAtribCount == elementTwoAtribCount;
+ }
+
+ /*
+ * Compare only URI not prefix
+ */
+ private static boolean compare(OMNamespace x, OMNamespace y) {
+ log.debug("Compare namespace:" + x + " with " + y);
+ return (x == null && y == null)
+ || (x != null && y != null && BrokerUtil.sameStringValue(x.getNamespaceURI(), y.getNamespaceURI()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
new file mode 100644
index 0000000..06d435a
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WSMGParameter.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.config;
+
+import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
+
+public class WSMGParameter {
+
+ /**
+ * Global variable for the Out Going queue (contains message to send to subscribers)
+ */
+ public static WsmgQueue OUT_GOING_QUEUE = null; // default=null
+
+ public static final boolean testOutGoingQueueMaxiumLength = false; // default=false
+
+ // enable or disable the TimerThread that displays the message rate
+ public static final boolean measureMessageRate = false; // default=false
+
+ public static final boolean enableAutoCleanSubscriptions = false; // default=true
+
+ public static final boolean debugYFilter = false;
+
+ public static final boolean cleanQueueonStartUp = false; // default=true
+ public static final boolean requireSubscriptionRenew = true;
+ public static final long expirationTime = 1000 * 60 * 60 * 72; // 72 hours
+
+ public static final boolean showTrackId = false;
+ public static final String versionSetUpNote = "Added_Sub_Timeout";
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
new file mode 100644
index 0000000..c12f460
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/config/WsmgConfigurationContext.java
@@ -0,0 +1,118 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.config;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.airavata.wsmg.broker.NotificationProcessor;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionManager;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
+import org.apache.airavata.wsmg.commons.storage.WsmgQueue;
+import org.apache.airavata.wsmg.commons.storage.WsmgStorage;
+import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
+import org.apache.airavata.wsmg.matching.XPath.YFilterMessageMatcher;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+
+public class WsmgConfigurationContext {
+
+ private OutGoingQueue outgoingQueue = null;
+
+ private List<AbstractMessageMatcher> messageMatchers = new LinkedList<AbstractMessageMatcher>();
+
+ private ReentrantReadWriteLock messegeMatchersLock = new ReentrantReadWriteLock();
+
+ private ConfigurationManager configurationManager;
+
+ private SubscriptionManager subscriptionMan;
+
+ private NotificationProcessor notificationProcessor;
+
+ private WsmgStorage storage;
+
+ private WsmgQueue queue;
+
+ public WsmgConfigurationContext() {
+ outgoingQueue = new OutGoingQueue();
+ setDirectFilter();
+ }
+
+ private void setDirectFilter() {
+ messageMatchers.add(new YFilterMessageMatcher());
+ // messageMatchers.add(new DirectWsntMessageMatcher(subscriptions,
+ // publisherRegistrationDB));
+ }
+
+ public List<AbstractMessageMatcher> getMessageMatchers() {
+ return messageMatchers;
+ }
+
+ public OutGoingQueue getOutgoingQueue() {
+ return outgoingQueue;
+ }
+
+ public ConfigurationManager getConfigurationManager() {
+ return configurationManager;
+ }
+
+ public SubscriptionManager getSubscriptionManager() {
+ return subscriptionMan;
+ }
+
+ public NotificationProcessor getNotificationProcessor() {
+ return notificationProcessor;
+ }
+
+ public void setConfigurationManager(ConfigurationManager configMan) {
+ this.configurationManager = configMan;
+ }
+
+ public void setSubscriptionManager(SubscriptionManager subMan) {
+ this.subscriptionMan = subMan;
+ }
+
+ public void setNotificationProcessor(NotificationProcessor processor) {
+ this.notificationProcessor = processor;
+ }
+
+ public WsmgStorage getStorage() {
+ return storage;
+ }
+
+ public void setStorage(WsmgStorage s) {
+ storage = s;
+ }
+
+ public WsmgQueue getQueue() {
+ return queue;
+ }
+
+ public void setQueue(WsmgQueue s) {
+ queue = s;
+ }
+
+ public ReentrantReadWriteLock getMessegeMatcherLock() {
+ return messegeMatchersLock;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
new file mode 100644
index 0000000..dbd16f6
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/AbstractMessageMatcher.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+
+public abstract class AbstractMessageMatcher {
+
+ protected Map<String, String> currentMessageCache;
+
+ private ReentrantReadWriteLock consumerListLock = new ReentrantReadWriteLock();
+
+ // infer types of
+ // key and value
+ public AbstractMessageMatcher() {
+ this.currentMessageCache = new HashMap<String, String>();
+ }
+
+ public abstract void start(String carrierLocation);
+
+ // Message can be either String or XmlElement. Added XMLElement for
+ // performance consideration so that if not using queue,
+ // we don't need to serialize to String
+ // If we already serialized to String because of the using queue, we don't
+ // have to change back to XMLElement until the delivery to consumers
+
+ public abstract void populateMatches(String wsntMessageConverterClassName,
+ AdditionalMessageContent additionalMessageContent, String message, String topic,
+ List<ConsumerInfo> matchedConsumers);
+
+ public abstract int handleUnsubscribe(String subscriptionId);
+
+ public abstract void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId);
+
+ public String handleGetCurrentMessage(String topic) {
+ String currentMessage = currentMessageCache.get(topic);
+ return currentMessage;
+ }
+
+ public void readLockUnlockConsumers(boolean lock) {
+ ReadLock readlock = consumerListLock.readLock();
+ lockUnlock(readlock, lock);
+ }
+
+ public void writeLockUnlockConsumers(boolean lock) {
+ WriteLock writeLock = consumerListLock.writeLock();
+ lockUnlock(writeLock, lock);
+ }
+
+ private void lockUnlock(Lock l, boolean lock) {
+
+ if (lock) {
+ l.lock();
+ } else {
+ l.unlock();
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
new file mode 100644
index 0000000..1881968
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterInfo.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.broker.ConsumerList;
+import org.apache.airavata.wsmg.broker.ConsumerListManager;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.berkeley.cs.db.yfilter.filter.EXfilterBasic;
+import edu.berkeley.cs.db.yfilter.filter.SystemGlobals;
+import edu.berkeley.cs.db.yfilterplus.queryparser.Query;
+import edu.berkeley.cs.db.yfilterplus.queryparser.XPQuery;
+import edu.berkeley.cs.db.yfilterplus.xmltree.XMLTree;
+
+public class YFilterInfo {
+ private static final Logger logger = LoggerFactory.getLogger(YFilterInfo.class);
+
+ private EXfilterBasic yfilter = new EXfilterBasic();
+ private HashMap<Integer, String> yFilterIdToXPath = new HashMap<Integer, String>();
+ private HashMap<Integer, Query> yFilterIdToQuery = new HashMap<Integer, Query>();
+ private HashMap<String, Integer> xPathToYFilterId = new HashMap<String, Integer>();
+ private ConsumerListManager consumerListmanager = new ConsumerListManager();
+ private int index = 0;
+ private int counter = 0;
+
+ public EXfilterBasic getYfilter() {
+ return yfilter;
+ }
+
+ public void setYfilter(EXfilterBasic yfilter) {
+ this.yfilter = yfilter;
+ }
+
+ public HashMap<Integer, String> getYFilterIdToXPath() {
+ return yFilterIdToXPath;
+ }
+
+ public void setYFilterIdToXPath(HashMap<Integer, String> filterIdToXPath) {
+ yFilterIdToXPath = filterIdToXPath;
+ }
+
+ public void addXPathQuery(String xpathExpression, String subscriptionId, SubscriptionState subscribeRequest)
+ throws RuntimeException {
+ index++;
+ counter++;
+ if (WSMGParameter.debugYFilter)
+ logger.debug("QueryExp=" + xpathExpression);
+
+ Integer yFilterIdObj = xPathToYFilterId.get(xpathExpression);
+ int yFilterId = -1;
+ if (yFilterIdObj != null) {
+ yFilterId = yFilterIdObj.intValue();
+ } else {
+ Query query = XPQuery.parseQuery(xpathExpression, index);
+ if (query == null) {
+ throw new RuntimeException("Invalid XPath expression:" + xpathExpression);
+ }
+ if (WSMGParameter.debugYFilter)
+ logger.debug("addSubscription " + xpathExpression + " query :" + query);
+ yFilterId = yfilter.addQuery(query);
+ if (WSMGParameter.debugYFilter)
+ yfilter.printQueryIndex();
+ xPathToYFilterId.put(xpathExpression, Integer.valueOf(yFilterId));
+ yFilterIdToXPath.put(new Integer(yFilterId), xpathExpression);
+ yFilterIdToQuery.put(yFilterId, query);
+ }
+ if (WSMGParameter.debugYFilter)
+ logger.debug("YFilterId=" + yFilterId);
+
+ consumerListmanager.addToConsumerList(xpathExpression, subscribeRequest, subscriptionId);
+ }
+
+ public int removeSubscription(String subscriptionId) {
+
+ String xPath = consumerListmanager.getTokenBySubscriptionId(subscriptionId);
+ int result = consumerListmanager.removeFromConsumerList(subscriptionId, xPath);
+ if (result == 0) {
+ return 0;
+ }
+ int currentConsumerCount = consumerListmanager.getConsumerListByToken(xPath).size();
+ if (currentConsumerCount == 0) {
+ Integer yFilterId = xPathToYFilterId.get(xPath);
+ Query q = yFilterIdToQuery.get(yFilterId);
+ yfilter.deleteQuery(q, q.getQueryId());
+ yFilterIdToQuery.remove(yFilterId);
+ }
+ counter--;
+ return result;
+ }
+
+ public List<ConsumerInfo> getMatchingConsumerList(String messageString) {
+ List<ConsumerInfo> matchingConsumerList = new LinkedList<ConsumerInfo>();
+ XMLTree tree = new XMLTree(new java.io.StringReader(messageString));
+ if (WSMGParameter.debugYFilter)
+ tree.print();
+ yfilter.setEventSequence(tree.getEvents());
+ yfilter.startParsing();
+
+ // print the matched queries //
+ if (SystemGlobals.hasQueries) {
+ if (WSMGParameter.debugYFilter)
+
+ yfilter.printQueryResults(System.out);
+ } else {
+ System.out.println("no match");
+ return matchingConsumerList;
+ }
+
+ Iterator<Integer> it = (Iterator<Integer>) yfilter.getMatchedQueries().iterator();
+ while (it.hasNext()) {
+ Integer qid = it.next();
+
+ String xpath = yFilterIdToXPath.get(qid);
+ ConsumerList consumerList = consumerListmanager.getConsumerListByToken(xpath);
+
+ if (consumerList != null) {// has subscription to this topic
+ matchingConsumerList.addAll(consumerList.getConsumerList());
+ }
+ }
+ yfilter.clear();
+ return matchingConsumerList;
+ }
+
+ public int getCounter() {
+ return counter;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
new file mode 100644
index 0000000..0f59ec1
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/XPath/YFilterMessageMatcher.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.broker.ConsumerList;
+import org.apache.airavata.wsmg.broker.ConsumerListManager;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class YFilterMessageMatcher extends AbstractMessageMatcher {
+
+ private static final Logger logger = LoggerFactory.getLogger(YFilterMessageMatcher.class);
+
+ private OutGoingQueue outGoingQueue = null;
+
+ private HashMap<String, YFilterInfo> topicToYFilterInfo = new HashMap<String, YFilterInfo>();
+ private HashMap<String, String> subIdToTopic = new HashMap<String, String>();
+
+ // used for topic only subscription, so that we don't have to create a
+ // YFilter object
+ private ConsumerListManager consumerListmanager = new ConsumerListManager();
+
+ public YFilterMessageMatcher() {
+ super();
+ }
+
+ public void start(String carrierLocation) {
+ currentMessageCache = new Hashtable<String, String>();
+ }
+
+ @Override
+ public void populateMatches(String wsntMessageConverterClassName,
+ AdditionalMessageContent additionalMessageContent, String message, String topic,
+ List<ConsumerInfo> matchedConsumers) {
+
+ assert (matchedConsumers != null);
+
+ if (WSMGParameter.debugYFilter)
+ logger.info("Message In YFilterAdapter=" + message);
+
+ // Important Get a Read Lock....
+ readLockUnlockConsumers(true);
+ try {
+
+ // 1, Topic only
+ ConsumerList topicConsumerList = consumerListmanager.getConsumerListByToken(topic);
+ if (topicConsumerList != null) {// has subscription to this topic
+
+ ArrayList<ConsumerInfo> list = topicConsumerList.getConsumerList();
+ matchedConsumers.addAll(list);
+ }
+ // 2, wild card topic only
+ ConsumerList wildcardConsumerList = consumerListmanager
+ .getConsumerListByToken(WsmgCommonConstants.WILDCARD_TOPIC);
+ if (wildcardConsumerList != null) {// has wildcard subscriptions
+ List<ConsumerInfo> wildCardConsumerInfoList = wildcardConsumerList.getConsumerList();
+ if (wildCardConsumerInfoList != null) {
+ // System.out.println("ConsumerListSize2="+wildCardConsumerInfoList.size());
+ matchedConsumers.addAll(wildCardConsumerInfoList);
+ }
+ }
+ // 3, topic with Xpath
+ YFilterInfo yfilterInfo = topicToYFilterInfo.get(topic);
+ if (yfilterInfo != null) {
+ List<ConsumerInfo> topicAndXPathConsumerInfoList = yfilterInfo.getMatchingConsumerList(message);
+ if (topicAndXPathConsumerInfoList != null) {
+ // System.out.println("ConsumerListSize3="+topicAndXPathConsumerInfoList.size());
+ matchedConsumers.addAll(topicAndXPathConsumerInfoList);
+ }
+ }
+ // 4, wild card topic with Xpath (XPath only)
+ yfilterInfo = topicToYFilterInfo.get(WsmgCommonConstants.WILDCARD_TOPIC);
+ if (yfilterInfo != null) {
+ List<ConsumerInfo> wildcardTopicAndXPathConsumerInfoList = yfilterInfo.getMatchingConsumerList(message);
+ if (wildcardTopicAndXPathConsumerInfoList != null) {
+ // System.out.println("ConsumerListSize4="+wildcardTopicAndXPathConsumerInfoList.size());
+ matchedConsumers.addAll(wildcardTopicAndXPathConsumerInfoList);
+ }
+ }
+
+ } finally {
+
+ // Release the Read Lock...
+ readLockUnlockConsumers(false);
+ }
+
+ }
+
+ public int handleUnsubscribe(String subscriptionId) {
+
+ int ret = 1;
+
+ writeLockUnlockConsumers(true);
+ try {
+ String topicExpression = subIdToTopic.get(subscriptionId);
+ if (subscriptionId.startsWith("T")) { // Topic only
+ consumerListmanager.removeFromConsumerList(subscriptionId, topicExpression);
+ } else {
+ YFilterInfo yfilterInfo = topicToYFilterInfo.get(topicExpression);
+ if (yfilterInfo != null) {
+ yfilterInfo.removeSubscription(subscriptionId);
+ if (yfilterInfo.getCounter() == 0) {
+ yfilterInfo = null;
+ topicToYFilterInfo.remove(topicExpression);
+ }
+ } else {
+ System.out.println("ERROR: Cannot find subscription with the subId=" + subscriptionId);
+ ret = 0;
+ }
+ }
+ } finally {
+ writeLockUnlockConsumers(false);
+ }
+
+ return ret;
+ }
+
+ public void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId) {
+
+ // Get the write lock
+ writeLockUnlockConsumers(true);
+ try {
+
+ String topicExpression = subscribeRequest.getLocalTopic();
+ subIdToTopic.put(subscriptionId, topicExpression);
+
+ String xpathExpression = subscribeRequest.getXpathString();
+ if (xpathExpression == null || xpathExpression.length() == 0) { // Topic
+ // only
+ consumerListmanager.addToConsumerList(topicExpression, subscribeRequest, subscriptionId);
+ } else {
+ YFilterInfo yfilterInfo = topicToYFilterInfo.get(topicExpression);
+ if (yfilterInfo == null) {
+ yfilterInfo = new YFilterInfo();
+ topicToYFilterInfo.put(topicExpression, yfilterInfo);
+ }
+ yfilterInfo.addXPathQuery(xpathExpression, subscriptionId, subscribeRequest);
+ }
+
+ if (outGoingQueue == null) {
+ outGoingQueue = subscribeRequest.getOutGoingQueue();
+ }
+
+ } finally {
+ // release the write lock
+ writeLockUnlockConsumers(false);
+ }
+
+ return;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
new file mode 100644
index 0000000..ba50e45
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/matching/simpleTopic/DirectWsntMessageMatcher.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.simpleTopic;
+
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.broker.ConsumerList;
+import org.apache.airavata.wsmg.broker.ConsumerListManager;
+import org.apache.airavata.wsmg.broker.subscription.SubscriptionState;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
+import org.apache.airavata.wsmg.messenger.OutGoingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DirectWsntMessageMatcher extends AbstractMessageMatcher {
+
+ private static final Logger logger = LoggerFactory.getLogger(DirectWsntMessageMatcher.class);
+
+ private ConsumerListManager consumerListmanager = new ConsumerListManager();
+
+ private OutGoingQueue outGoingQueue = null;
+
+ public DirectWsntMessageMatcher() {
+ super();
+ }
+
+ public void start(String carrierLocation) {
+ currentMessageCache = new Hashtable<String, String>();
+ }
+
+ public void handleSubscribe(SubscriptionState subscribeRequest, String subscriptionId) {
+
+ String topicExpression = subscribeRequest.getLocalTopic();
+ if (topicExpression == null || topicExpression.length() == 0) {
+ logger.error("ERROR:WsntAdapterConnection creation failed.");
+ return;
+ }
+
+ writeLockUnlockConsumers(true);
+
+ try {
+ consumerListmanager.addToConsumerList(topicExpression, subscribeRequest, subscriptionId);
+ if (outGoingQueue == null) {
+ outGoingQueue = subscribeRequest.getOutGoingQueue();
+ }
+ } finally {
+ writeLockUnlockConsumers(false);
+ }
+
+ return;
+
+ }
+
+ public int handleUnsubscribe(String subscriptionId) {
+
+ int ret = 0;
+
+ writeLockUnlockConsumers(true);
+ try {
+ ret = consumerListmanager.removeFromConsumerList(subscriptionId, null);
+ } finally {
+ writeLockUnlockConsumers(false);
+ }
+
+ return ret;
+ }
+
+ @Override
+ public void populateMatches(String wsntMessageConverterClassName,
+ AdditionalMessageContent additionalMessageContent, String message, String topic,
+ List<ConsumerInfo> matchedConsumers) {
+
+ assert (matchedConsumers != null);
+
+ readLockUnlockConsumers(true);
+
+ try {
+
+ ConsumerList topicConsumerList = consumerListmanager.getConsumerListByToken(topic);
+ ConsumerList wildcardConsumerList = consumerListmanager
+ .getConsumerListByToken(WsmgCommonConstants.WILDCARD_TOPIC);
+ if (topicConsumerList != null) {// has subscription to this topic
+
+ ArrayList<ConsumerInfo> list = topicConsumerList.getConsumerList();
+
+ matchedConsumers.addAll(list);
+ }
+ if (wildcardConsumerList != null) {// has wildcard subscriptions
+ List<ConsumerInfo> wildCardConsumerInfoList = wildcardConsumerList.getConsumerList();
+ if (wildCardConsumerInfoList != null) {
+ matchedConsumers.addAll(wildCardConsumerInfoList);
+ }
+ }
+
+ } finally {
+ readLockUnlockConsumers(false);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
new file mode 100644
index 0000000..c9b255f
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/ConsumerUrlManager.java
@@ -0,0 +1,195 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger;
+
+import java.net.NoRouteToHostException;
+import java.net.SocketTimeoutException;
+import java.util.Map.Entry;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.airavata.wsmg.commons.config.ConfigurationManager;
+import org.apache.airavata.wsmg.util.RunTimeStatistics;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumerUrlManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(ConsumerUrlManager.class);
+
+ private ConcurrentHashMap<String, FailedConsumerInfo> failedConsumerUrls = new ConcurrentHashMap<String, FailedConsumerInfo>();
+
+ private final int defaultMaxRetry;
+
+ private long expireTimeGap; // milliseconds
+
+ private Timer cleanupTimer;
+
+ public ConsumerUrlManager(ConfigurationManager config) {
+
+ defaultMaxRetry = config.getConfig(WsmgCommonConstants.CONFIG_MAX_MESSAGE_DELIVER_RETRIES, 2);
+
+ // time is in milliseconds
+ expireTimeGap = 1000 * 60 * config.getConfig(WsmgCommonConstants.CONFIG_CONSUMER_URL_EXPIRATION_TIME_GAP, 5l);
+
+ // let minimum time to be 1 minute
+ long timerThreadInterval = Math.max(expireTimeGap / 5, 1000 * 60);
+
+ cleanupTimer = new Timer("Failed consumer url handler", true);
+ cleanupTimer.scheduleAtFixedRate(new URLCleanUpTask(), 0, timerThreadInterval);
+ }
+
+ public void stop() {
+ logger.info("Stop ConsumerUrlManager");
+ if (this.cleanupTimer != null) {
+ this.cleanupTimer.cancel();
+ }
+ logger.info("ConsumerUrlManager Stopped");
+ }
+
+ public void onFailedDelivery(EndpointReference consumerEndpointReference, long timeFinished, long timeTaken,
+ AxisFault exception, AdditionalMessageContent headers) {
+ String url = consumerEndpointReference.getAddress();
+
+ RunTimeStatistics.addNewFailedDeliverTime(timeTaken);
+ RunTimeStatistics.addFailedConsumerURL(url);
+
+ if (isEligibleToBlackList(exception)) {
+
+ synchronized (failedConsumerUrls) {
+ FailedConsumerInfo info = failedConsumerUrls.get(url);
+ if (info == null) {
+ info = new FailedConsumerInfo();
+ failedConsumerUrls.put(url, info);
+ }
+ info.incrementNumberOfTimesTried(timeFinished + expireTimeGap);
+ }
+
+ } else {
+
+ String errorMsg = String.format("unable to deliver message: [%s] to consumer: [%s], " + "reason: [%s]",
+ headers.toString(), url, exception.getMessage());
+
+ logger.error(errorMsg);
+ }
+ }
+
+ public void onSucessfullDelivery(EndpointReference consumerEndpointReference, long timeTaken) {
+
+ RunTimeStatistics.addNewSuccessfulDeliverTime(timeTaken);
+ synchronized (failedConsumerUrls) {
+
+ FailedConsumerInfo info = failedConsumerUrls.remove(consumerEndpointReference.getAddress());
+
+ if (info != null) {
+ logger.debug(String.format("message was delivered to " + "previously %d times failed url : %s",
+ info.getNumberOfTimesTried(), consumerEndpointReference.getAddress()));
+ }
+ }
+ }
+
+ public boolean isUnavailable(String url) {
+ synchronized (failedConsumerUrls) {
+ FailedConsumerInfo info = failedConsumerUrls.get(url);
+ return (info != null && info.isMaxRetryCountReached());
+ }
+ }
+
+ private boolean isEligibleToBlackList(AxisFault f) {
+
+ Throwable cause = f.getCause();
+
+ if (cause == null) {
+ logger.error("unknown error occured", cause);
+ return false;
+ }
+
+ /*
+ * if timeout because of the set timeout in this class In windows, timeout cause ConnectException with
+ * "Connection timed out" message
+ */
+ if (cause instanceof SocketTimeoutException || cause.getMessage().indexOf("timed out") > 0
+ || cause instanceof NoRouteToHostException) {
+ return true;
+ }
+
+ return false;
+ }
+
+ class FailedConsumerInfo {
+
+ private int numberOfTimesTried;
+ private long expiryTime;
+
+ public void incrementNumberOfTimesTried(long expireTime) {
+ numberOfTimesTried++;
+ expiryTime = expireTime;
+ }
+
+ public void decrementNumberOfTimeTried() {
+ numberOfTimesTried--;
+ }
+
+ public int getNumberOfTimesTried() {
+ return numberOfTimesTried;
+ }
+
+ public boolean isMaxRetryCountReached() {
+ return numberOfTimesTried >= defaultMaxRetry;
+ }
+
+ public long getLastAtteptExpiryTime() {
+ return expiryTime;
+ }
+
+ }
+
+ class URLCleanUpTask extends TimerTask {
+
+ @Override
+ public void run() {
+
+ logger.debug("starting to clean up black listed consumer urls");
+ long currentTime = System.currentTimeMillis();
+
+ synchronized (failedConsumerUrls) {
+ for (Entry<String, FailedConsumerInfo> entry : failedConsumerUrls.entrySet()) {
+ FailedConsumerInfo info = entry.getValue();
+
+ if (info.isMaxRetryCountReached() && info.getLastAtteptExpiryTime() >= currentTime) {
+
+ info.decrementNumberOfTimeTried();
+ logger.info("decrementing number of times" + " tried for consumer url: " + entry.getKey());
+
+ }
+ }
+ }
+
+ logger.debug("finished cleaning black listed consumer urls");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
new file mode 100644
index 0000000..92b6cfe
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/Deliverable.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.wsmg.messenger;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.axiom.om.OMElement;
+
+public interface Deliverable {
+ void setProtocol(DeliveryProtocol protocol);
+
+ void send(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent);
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
new file mode 100644
index 0000000..7eb5b4f
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/DeliveryProcessor.java
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.wsmg.messenger;
+
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DeliveryProcessor {
+
+ private static final Logger logger = LoggerFactory.getLogger(DeliveryProcessor.class);
+
+ private SendingStrategy strategy;
+ private Deliverable deliverable;
+
+ private boolean running;
+ private Thread t;
+
+ public DeliveryProcessor(Deliverable deliverable, SendingStrategy strategy) {
+ this.strategy = strategy;
+ this.deliverable = deliverable;
+ }
+
+ public void start() {
+ this.running = true;
+ this.t = new Thread(new CheckingAndSending());
+ this.t.start();
+ }
+
+ public void stop() {
+ this.running = false;
+
+ if (this.t != null) {
+ this.t.interrupt();
+
+ try {
+ this.t.join();
+ } catch (InterruptedException ie) {
+ logger.error("Wait for sending thread to finish (join) is interrupted");
+ }
+ }
+
+ WSMGParameter.OUT_GOING_QUEUE.dispose();
+ }
+
+ private class CheckingAndSending implements Runnable {
+
+ public void run() {
+ strategy.init();
+ while (running) {
+ logger.debug("run - delivery thread");
+ try {
+
+ OutGoingMessage outGoingMessage = (OutGoingMessage) WSMGParameter.OUT_GOING_QUEUE.blockingDequeue();
+
+ if (WSMGParameter.showTrackId)
+ logger.debug(outGoingMessage.getAdditionalMessageContent().getTrackId()
+ + ": dequeued from outgoing queue");
+
+ strategy.addMessageToSend(outGoingMessage, deliverable);
+
+ } catch (Exception e) {
+ logger.warn("Unexpected_exception:");
+ }
+ }
+ logger.debug("Shutdown Strategy");
+ strategy.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
new file mode 100644
index 0000000..6764a42
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/OutGoingQueue.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger;
+
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.util.Counter;
+import org.apache.airavata.wsmg.util.TimerThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OutGoingQueue {
+
+ private static final Logger logger = LoggerFactory.getLogger(OutGoingQueue.class);
+
+ private Counter storeToOutQueueCounter;
+
+ public OutGoingQueue() {
+ if (WSMGParameter.measureMessageRate) {
+ storeToOutQueueCounter = new Counter();
+ TimerThread timerThread = new TimerThread(storeToOutQueueCounter, " StoreToOutQueueCounter");
+ new Thread(timerThread).start();
+ }
+ }
+
+ // need synchronized???
+ public void storeNotification(OutGoingMessage outGoingMessage, long messageId) {
+
+ boolean loop = false;
+ do {
+ // this outgoing Queue is created inside the messenger which is
+ // intended to send the notification message to the consumer.
+ WSMGParameter.OUT_GOING_QUEUE.enqueue(outGoingMessage, outGoingMessage.getAdditionalMessageContent()
+ .getTrackId());
+ if (WSMGParameter.measureMessageRate) {
+ storeToOutQueueCounter.addCounter();
+ }
+ if (WSMGParameter.testOutGoingQueueMaxiumLength && storeToOutQueueCounter.getCounterValue() < 1000000) {
+ loop = true;
+ }else{
+ loop = false;
+ }
+ } while (loop);
+
+ }
+}