You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:31:29 UTC
[87/90] [abbrv] AIRAVATA-1124
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java
new file mode 100644
index 0000000..a2eb271
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/MsgBoxServiceSkeleton.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox;
+
+import java.io.StringReader;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.airavata.wsmg.commons.MsgBoxQNameConstants;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.msgbox.Storage.MsgBoxStorage;
+import org.apache.airavata.wsmg.msgbox.util.ConfigKeys;
+import org.apache.airavata.wsmg.msgbox.util.MsgBoxCommonConstants;
+import org.apache.airavata.wsmg.msgbox.util.MsgBoxUtils;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ServiceContext;
+import org.apache.axis2.service.Lifecycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.beans.editors.LongEditor;
+
+/**
+ * Service class for the Message Box. Every operation delegates to the configured {@link MsgBoxStorage}
+ * and wraps the outcome in an Axiom response element. According to the original note, this class is
+ * invoked by MsgBoxServiceMessageReceiverInOut.
+ * <p>
+ * While the service is alive a background thread periodically asks the storage to drop old messages; the
+ * thread is started in {@link #init(ServiceContext)} and stopped in {@link #destroy(ServiceContext)}.
+ */
+public class MsgBoxServiceSkeleton implements Lifecycle {
+
+    private static final Logger logger = LoggerFactory.getLogger(MsgBoxServiceSkeleton.class);
+    private static final String TRUE = Boolean.toString(true);
+    private static final String FALSE = Boolean.toString(false);
+    private static final OMFactory factory = OMAbstractFactory.getOMFactory();
+    private MsgBoxStorage storage;
+    private Thread deletingThread;
+    // Written by destroy() and read by the cleanup thread, hence volatile.
+    private volatile boolean stop;
+
+    /**
+     * Looks up the storage backend in the configuration context and starts the cleanup thread.
+     *
+     * @param context
+     *            service context whose configuration context holds the storage and the cleanup interval
+     * @throws AxisFault
+     *             declared by the {@link Lifecycle} contract
+     */
+    public void init(ServiceContext context) throws AxisFault {
+        this.storage = (MsgBoxStorage) context.getConfigurationContext().getProperty(
+                MsgBoxCommonConstants.MSGBOX_STORAGE);
+        this.stop = false;
+        logger.debug("Starting cleanup thread for Message Box...");
+        deletingThread = new Thread(new DeleteOldMessageRunnable(context.getConfigurationContext().getProperty(
+                ConfigKeys.MSG_PRESV_INTERVAL)));
+        deletingThread.start();
+    }
+
+    /**
+     * Stops the cleanup thread, waits for it to finish and then releases the storage reference.
+     *
+     * @param context
+     *            service context (unused)
+     */
+    public void destroy(ServiceContext context) {
+        // Stop the cleanup thread first; the storage reference is only cleared once the thread is gone
+        // so the thread can never observe it disappearing in the middle of an iteration.
+        this.stop = true;
+        Thread cleaner = this.deletingThread;
+        if (cleaner != null) {
+            cleaner.interrupt();
+            try {
+                cleaner.join();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                logger.error("Cannot shutdown cleanup thread", e);
+            }
+        }
+
+        this.storage = null;
+    }
+
+    /**
+     * Creates a new message box.
+     *
+     * @return response element containing the id of the created box
+     * @throws Exception
+     *             an {@link AxisFault} with code "6000" wrapping any failure reported by the storage
+     */
+    public OMElement createMsgBox() throws Exception {
+        try {
+            String createdMsgBoxId = storage.createMsgBox();
+
+            logger.debug("MsgBox created:" + createdMsgBoxId);
+
+            /*
+             * Output response
+             */
+            OMElement dd = factory.createOMElement(MsgBoxQNameConstants.CREATE_MSGBOX_RESP_QNAME);
+            OMElement url = factory.createOMElement(MsgBoxQNameConstants.MSG_BOXID_QNAME);
+            url.setText(createdMsgBoxId);
+            dd.addChild(url);
+            return dd;
+        } catch (Exception e) {
+            logger.error("Error creating the message box", e);
+            AxisFault f = new AxisFault("Error creating the message box", "6000", e);
+            throw f;
+        }
+
+    }
+
+    /**
+     * Stores a message in the given message box.
+     *
+     * @param msgBoxAddr
+     *            id of the target message box
+     * @param messageID
+     *            id of the message
+     * @param soapAction
+     *            SOAP action of the message
+     * @param message
+     *            message payload
+     * @return response element whose status child is "true" on success and "false" when the storage
+     *         reported an {@link SQLException}
+     * @throws Exception
+     *             any other failure reported by the storage
+     */
+    public OMElement storeMessages(String msgBoxAddr, String messageID, String soapAction, OMElement message)
+            throws Exception {
+        OMElement resp = factory.createOMElement(MsgBoxQNameConstants.STOREMSG_RESP_QNAME);
+        OMElement status = factory.createOMElement(MsgBoxQNameConstants.MSGBOX_STATUS_QNAME);
+        try {
+            storage.putMessageIntoMsgBox(msgBoxAddr, messageID, soapAction, message);
+
+            logger.debug("Put Message to MsgBox:" + msgBoxAddr + " with messageID:" + messageID);
+
+            status.setText(TRUE);
+        } catch (SQLException e) {
+            logger.error("Error while storing message: " + message + " in msgbx: " + msgBoxAddr, e);
+            status.setText(FALSE);
+
+            // FIXME: Should we throw exception?? or client will read false
+            // status
+        }
+        resp.addChild(status);
+        resp.declareNamespace(NameSpaceConstants.MSG_BOX);
+        return resp;
+    }
+
+    /**
+     * Takes all messages out of the given message box.
+     *
+     * @param msgBoxAddr
+     *            id of the message box
+     * @return response element holding one child per message (none when the box is empty)
+     * @throws Exception
+     *             any failure reported by the storage or while parsing a stored message
+     */
+    public OMElement takeMessages(String msgBoxAddr) throws Exception {
+        try {
+            OMElement respEl = factory.createOMElement(MsgBoxQNameConstants.TAKE_MSGBOX_RESP_QNAME);
+            OMElement messageSet = factory.createOMElement(MsgBoxQNameConstants.MSGBOX_MESSAGE_QNAME);
+
+            List<String> list = storage.takeMessagesFromMsgBox(msgBoxAddr);
+            if (list != null && list.size() != 0) {
+                for (String string : list) {
+                    messageSet.addChild(MsgBoxUtils.reader2OMElement(new StringReader(string)));
+                }
+                logger.debug("Take all messages from MsgBox:" + msgBoxAddr);
+            } else {
+                logger.debug(" no messages.. ");
+            }
+            respEl.addChild(messageSet);
+            respEl.declareNamespace(NameSpaceConstants.MSG_BOX);
+            return respEl;
+        } catch (Exception e) {
+            logger.error("Error taking mesages of message box: " + msgBoxAddr, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Destroys the given message box.
+     *
+     * @param msgBoxAddr
+     *            id of the message box
+     * @return response element whose status child is "true" on success and "false" on any failure
+     * @throws Exception
+     *             declared for interface compatibility; storage failures are reported through the status
+     */
+    public OMElement destroyMsgBox(String msgBoxAddr) throws Exception {
+        OMElement respEl = factory.createOMElement(MsgBoxQNameConstants.DESTROY_MSGBOX_RESP_QNAME);
+        OMElement statusEl = factory.createOMElement(MsgBoxQNameConstants.MSGBOX_STATUS_QNAME);
+        try {
+            storage.destroyMsgBox(msgBoxAddr);
+            logger.debug("Destry MsgBox:" + msgBoxAddr);
+            statusEl.setText(TRUE);
+        } catch (Exception e) {
+            logger.warn("Error while delete msgbx: " + msgBoxAddr, e);
+            statusEl.setText(FALSE);
+
+            // FIXME: Should we throw exception?? or client will read false
+            // status
+        }
+        respEl.addChild(statusEl);
+        return respEl;
+    }
+
+    /**
+     * Background task that repeatedly asks the storage to remove old messages and then sleeps for the
+     * configured interval, until the enclosing service is destroyed.
+     */
+    class DeleteOldMessageRunnable implements Runnable {
+        // Default of one hour, used when no usable interval is configured.
+        long longInterval = 60 * 60 * 1000;
+
+        /**
+         * @param inveral
+         *            sleep interval in milliseconds as a {@link Number}; any other value (including
+         *            {@code null} or a non-positive number) keeps the one hour default
+         */
+        DeleteOldMessageRunnable(Object inveral) {
+            if (inveral instanceof Number && ((Number) inveral).longValue() > 0) {
+                longInterval = ((Number) inveral).longValue();
+            } else {
+                logger.warn("Invalid message cleanup interval '" + inveral + "', using default of "
+                        + longInterval + " ms");
+            }
+        }
+
+        public void run() {
+            while (!stop) {
+                // Read the field once so the null check and the call see the same reference.
+                MsgBoxStorage current = storage;
+                if (current != null) {
+                    try {
+                        // try to remove old message
+                        current.removeAncientMessages();
+                    } catch (Exception e) {
+                        // A failed cleanup must not kill the thread; the next round retries after the sleep.
+                        logger.warn("Error while removing old messages from the message box", e);
+                    }
+                }
+                try {
+                    Thread.sleep(longInterval);
+                } catch (InterruptedException e) {
+                    // destroy() sets the stop flag before interrupting, so the loop condition ends the thread.
+                    logger.warn("Msgbox cleanup thread is interrupted to close");
+                }
+            }
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java
new file mode 100644
index 0000000..e095e3c
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/MsgBoxStorage.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox.Storage;
+
+import java.util.List;
+
+import org.apache.axiom.om.OMElement;
+
+/**
+ * Storage backend contract for the Message Box service. Two implementations exist: one that keeps
+ * messages in memory and one that keeps them in a database.
+ */
+public interface MsgBoxStorage {
+
+    /**
+     * Creates a new message box.
+     *
+     * @return identifier of the created box
+     * @throws Exception
+     *             if the box cannot be created
+     */
+    String createMsgBox() throws Exception;
+
+    /**
+     * Stores a message in an existing message box.
+     *
+     * @param msgBoxID
+     *            identifier of the target box
+     * @param messageID
+     *            identifier of the message
+     * @param soapAction
+     *            SOAP action of the message
+     * @param message
+     *            message payload
+     * @throws Exception
+     *             if the message cannot be stored
+     */
+    void putMessageIntoMsgBox(String msgBoxID, String messageID, String soapAction, OMElement message)
+            throws Exception;
+
+    /**
+     * Retrieves the messages held in a message box.
+     * <p>
+     * IMPORTANT: the returned list is sorted by time in ascending order, i.e. the newest message is the
+     * last item in the list.
+     *
+     * @param key
+     *            identifier of the box
+     * @return the messages of the box
+     * @throws Exception
+     *             if the messages cannot be read
+     */
+    List<String> takeMessagesFromMsgBox(String key) throws Exception;
+
+    /**
+     * Removes a message box.
+     *
+     * @param key
+     *            identifier of the box
+     * @throws Exception
+     *             if the box cannot be removed
+     */
+    void destroyMsgBox(String key) throws Exception;
+
+    /**
+     * Removes messages that are considered too old. The original note states that the age threshold is
+     * defined in the db.config file.
+     *
+     * @throws Exception
+     *             if the cleanup fails
+     */
+    void removeAncientMessages() throws Exception;
+
+    /**
+     * Clean up method.
+     */
+    void dispose();
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
new file mode 100644
index 0000000..2cec984
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/DatabaseStorageImpl.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox.Storage.dbpool;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.SQLNonTransientConnectionException;
+import java.util.List;
+import java.util.UUID;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.commons.storage.DatabaseCreator;
+import org.apache.airavata.wsmg.commons.storage.JdbcStorage;
+import org.apache.airavata.wsmg.msgbox.Storage.MsgBoxStorage;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Database backed {@link MsgBoxStorage}. All operations are delegated to the {@link MessageBoxDB}
+ * singleton, which this class initializes in its constructor. According to the original note, this
+ * storage is used by MsgBoxServiceSkeleton when airavata-server.properties selects the database.
+ */
+public class DatabaseStorageImpl implements MsgBoxStorage {
+
+    private static final Logger logger = LoggerFactory.getLogger(DatabaseStorageImpl.class);
+
+    private static final String TABLE_NAME_TO_CHECK = "msgbox";
+
+    private JdbcStorage db;
+
+    /**
+     * Opens the JDBC storage, creates the message box tables when they are missing and initializes
+     * {@link MessageBoxDB}.
+     *
+     * @param jdbcUrl
+     *            JDBC URL of the database
+     * @param jdbcDriver
+     *            JDBC driver class name
+     * @param timeOfOldMessage
+     *            age threshold handed to {@link MessageBoxDB} for the old-message cleanup
+     * @throws RuntimeException
+     *             if the database cannot be reached or prepared; the original failure is kept as cause
+     */
+    public DatabaseStorageImpl(String jdbcUrl, String jdbcDriver, long timeOfOldMessage) {
+        // NOTE(review): 10 and 50 are presumably connection pool sizing parameters - confirm against JdbcStorage.
+        db = new JdbcStorage(10, 50, jdbcUrl, jdbcDriver, true);
+
+        Connection conn = null;
+        try {
+
+            /*
+             * Check database
+             */
+            conn = db.connect();
+            if (!DatabaseCreator.isDatabaseStructureCreated(TABLE_NAME_TO_CHECK, conn)) {
+                DatabaseCreator.createMsgBoxDatabase(conn);
+                logger.info("New Database created for Message Box");
+            } else {
+                logger.debug("Database already created for Message Box!");
+            }
+
+            MessageBoxDB.initialize(db, timeOfOldMessage);
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            // Keep the cause so callers can see why the database could not be prepared.
+            throw new RuntimeException("Database failure", e);
+        } finally {
+            db.closeConnection(conn);
+        }
+    }
+
+    /**
+     * Creates a message box identified by a random UUID.
+     *
+     * @return the id of the new box
+     */
+    public String createMsgBox() throws SQLException, IOException {
+        String uuid = UUID.randomUUID().toString();
+        MessageBoxDB.getInstance().createMsgBx(uuid);
+        return uuid;
+    }
+
+    /**
+     * Deletes the message box with the given key.
+     *
+     * @throws Exception
+     *             wrapping the {@link SQLException} reported by the database
+     */
+    public void destroyMsgBox(String key) throws Exception {
+        try {
+            MessageBoxDB.getInstance().deleteMessageBox(key);
+        } catch (SQLException e) {
+            throw new Exception("Could not destroy the message box with key " + key, e);
+        }
+    }
+
+    /**
+     * Reads and removes all messages of the given box.
+     *
+     * @return the messages; an empty list when the database connection was lost
+     * @throws Exception
+     *             wrapping SQL or I/O failures, or any other failure raised while reading the messages
+     */
+    public List<String> takeMessagesFromMsgBox(String key) throws Exception {
+        List<String> list = null;
+
+        try {
+            list = MessageBoxDB.getInstance().removeAllMessagesforClient(key);
+
+        } catch (SQLNonTransientConnectionException e) {
+            logger.warn("Database connection is interrupted", e);
+        } catch (SQLException e) {
+            throw new Exception("Error reading the message with the key " + key, e);
+        } catch (IOException e) {
+            throw new Exception("Error reading the message with the key " + key, e);
+        }
+
+        // Never hand null to callers; a lost connection simply yields no messages.
+        if (list == null) {
+            list = new java.util.ArrayList<String>();
+        }
+        return list;
+    }
+
+    /**
+     * Stores a message in the given box.
+     */
+    public void putMessageIntoMsgBox(String msgBoxID, String messageID, String soapAction, OMElement message)
+            throws SQLException, IOException, XMLStreamException {
+        MessageBoxDB.getInstance().addMessage(msgBoxID, messageID, soapAction, message);
+    }
+
+    /**
+     * Asks {@link MessageBoxDB} to delete old messages.
+     */
+    public void removeAncientMessages() throws Exception {
+        MessageBoxDB.getInstance().removeAncientMessages();
+    }
+
+    /**
+     * Closes all connections of the underlying JDBC storage.
+     */
+    public void dispose() {
+        if (db != null) {
+            db.closeAllConnections();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
new file mode 100644
index 0000000..9cf15c8
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/dbpool/MessageBoxDB.java
@@ -0,0 +1,265 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox.Storage.dbpool;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.commons.storage.JdbcStorage;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Core class used by DatabaseStorageImpl to perform all service operations against the database;
+ * DatabaseStorageImpl simply delegates to this class.
+ * <p>
+ * The class is a singleton that must be set up with {@link #initialize(JdbcStorage, long)} before
+ * {@link #getInstance()} is used. The ids of all known message boxes are cached in memory and kept in
+ * step with the msgBoxes table by the synchronized operations below.
+ */
+public class MessageBoxDB {
+
+    private static final String MSGBOXES_TABLENAME = "msgBoxes";
+    private static final String MSGBOX_TABLENAME = "msgbox";
+
+    private static final String MSGBOX_TABLE_ID = "MSGBOXID";
+
+    private static final Logger logger = LoggerFactory.getLogger(MessageBoxDB.class);
+
+    private static Set<String> msgBoxids;
+
+    public static final String SELECT_ALL_FROM_MSGBOXES = "SELECT * FROM " + MSGBOXES_TABLENAME;
+
+    public static final String SQL_CREATE_MSGBOXES_STATEMENT = "INSERT INTO " + MSGBOXES_TABLENAME
+            + " (msgboxid) VALUES (?)";
+
+    public static final String SQL_DELETE_MSGBOXES_STATEMENT = "DELETE FROM " + MSGBOXES_TABLENAME
+            + " WHERE msgboxid = ?";
+
+    public static final String SQL_STORE_MESSAGE_STATEMENT = "INSERT INTO " + MSGBOX_TABLENAME
+            + " (content, msgboxid, messageid,soapaction) VALUES (?,?,?,?)";
+
+    public static final String SQL_SELECT_MSGBOX_STATEMENT = "SELECT * FROM " + MSGBOX_TABLENAME
+            + " WHERE msgboxid = ? ORDER BY time";
+
+    public static final String SQL_DELETE_MSGBOX_STATEMENT = "DELETE FROM " + MSGBOX_TABLENAME + " WHERE msgboxid = ?";
+
+    // NOTE(review): the elapsed seconds are multiplied by 1000000 before being compared with the
+    // configured threshold - confirm the unit callers pass as "time".
+    public static final String SQL_DELETE_ANCIENT_STATEMENT = "DELETE FROM " + MSGBOX_TABLENAME
+            + " WHERE {fn TIMESTAMPDIFF(SQL_TSI_SECOND, time, CURRENT_TIMESTAMP) }*1000000 > ?";
+
+    private JdbcStorage db;
+
+    // volatile because getInstance() reads it without holding the class lock.
+    private static volatile MessageBoxDB instance;
+
+    private long time;
+
+    private MessageBoxDB(JdbcStorage db, long time) {
+        this.db = db;
+        this.time = time;
+    }
+
+    /**
+     * Creates the singleton on first use and loads the ids of all existing message boxes.
+     *
+     * @param db
+     *            JDBC storage used for all operations
+     * @param time
+     *            age threshold bound to the old-message delete statement
+     * @return the singleton (an already existing one is returned unchanged)
+     * @throws SQLException
+     *             if the existing message box ids cannot be loaded; no instance is published in that case
+     */
+    public static synchronized MessageBoxDB initialize(JdbcStorage db, long time) throws SQLException {
+        if (instance == null) {
+            // Load the ids first so a failure does not leave a half-initialized singleton behind.
+            setMsgBoxidList(db);
+            instance = new MessageBoxDB(db, time);
+        }
+        return instance;
+    }
+
+    /**
+     * @return the initialized singleton
+     * @throws RuntimeException
+     *             if {@link #initialize(JdbcStorage, long)} has not been called successfully
+     */
+    public static MessageBoxDB getInstance() {
+        if (instance == null) {
+            throw new RuntimeException("Please initialize this object first using initialize(JdbcStorage, long)");
+        }
+        return instance;
+    }
+
+    /**
+     * Registers a new message box.
+     *
+     * @param messageBoxId
+     *            id of the box to create
+     * @throws SQLException
+     *             if the insert fails (the transaction is rolled back)
+     * @throws IOException
+     *             if a box with this id already exists
+     */
+    public synchronized void createMsgBx(String messageBoxId) throws SQLException, IOException {
+        if (!msgBoxids.contains(messageBoxId)) {
+
+            Connection connection = null;
+            try {
+                logger.debug(MSGBOXES_TABLENAME + ":" + messageBoxId);
+
+                connection = db.connect();
+                PreparedStatement statement = connection.prepareStatement(SQL_CREATE_MSGBOXES_STATEMENT);
+                statement.setString(1, messageBoxId);
+                db.executeUpdateAndClose(statement);
+                db.commitAndFree(connection);
+
+                msgBoxids.add(messageBoxId);
+
+            } catch (SQLException sql) {
+                db.rollbackAndFree(connection);
+                throw sql;
+            }
+        } else {
+            throw new IOException("The message box ID requested already exists");
+        }
+    }
+
+    /**
+     * Stores a message in an existing message box. The message text is written as a Java-serialized
+     * String into the binary "content" column.
+     *
+     * @param msgBoxID
+     *            id of the target box
+     * @param messageID
+     *            id of the message
+     * @param soapAction
+     *            SOAP action of the message
+     * @param message
+     *            message payload; it is consumed by this call
+     * @throws SQLException
+     *             if the insert fails (the transaction is rolled back)
+     * @throws IOException
+     *             if the box does not exist or the message cannot be serialized
+     * @throws XMLStreamException
+     *             if the message cannot be rendered as text
+     */
+    public synchronized void addMessage(String msgBoxID, String messageID, String soapAction, OMElement message)
+            throws SQLException, IOException, XMLStreamException {
+        if (msgBoxids.contains(msgBoxID)) {
+
+            // Serialize before touching the database: a failure here must not leave a connection checked out.
+            ByteArrayOutputStream output = new ByteArrayOutputStream();
+            ObjectOutputStream out = new ObjectOutputStream(output);
+            try {
+                out.writeObject(message.toStringWithConsume());
+                out.flush();
+            } finally {
+                out.close();
+            }
+            byte[] buffer = output.toByteArray();
+
+            Connection connection = null;
+            try {
+                connection = db.connect();
+                PreparedStatement stmt = connection.prepareStatement(SQL_STORE_MESSAGE_STATEMENT);
+                ByteArrayInputStream in = new ByteArrayInputStream(buffer);
+                stmt.setBinaryStream(1, in, buffer.length);
+                stmt.setString(2, msgBoxID);
+                stmt.setString(3, messageID);
+                stmt.setString(4, soapAction);
+
+                db.executeUpdateAndClose(stmt);
+                db.commitAndFree(connection);
+
+            } catch (SQLException sql) {
+                db.rollbackAndFree(connection);
+                throw sql;
+            }
+        } else {
+            throw new IOException("Currently a messagebox is not available with given message box id :" + msgBoxID + message.toStringWithConsume());
+        }
+    }
+
+    /**
+     * Deletes a message box together with all of its messages. Unknown ids are ignored.
+     *
+     * @param msgBoxId
+     *            id of the box to delete
+     * @throws SQLException
+     *             if a delete fails (the transaction is rolled back)
+     */
+    public synchronized void deleteMessageBox(String msgBoxId) throws SQLException {
+
+        if (msgBoxids.contains(msgBoxId)) {
+
+            Connection connection = null;
+            try {
+                connection = db.connect();
+                PreparedStatement statement = connection.prepareStatement(SQL_DELETE_MSGBOXES_STATEMENT);
+                statement.setString(1, msgBoxId);
+                db.executeUpdateAndClose(statement);
+                statement = connection.prepareStatement(SQL_DELETE_MSGBOX_STATEMENT);
+                statement.setString(1, msgBoxId);
+                db.executeUpdateAndClose(statement);
+
+                // commit
+                db.commitAndFree(connection);
+
+                // remove from set
+                msgBoxids.remove(msgBoxId);
+
+            } catch (SQLException sql) {
+                db.rollbackAndFree(connection);
+                throw sql;
+            }
+        }
+    }
+
+    /**
+     * Reads all messages of a box ordered by time and deletes them in the same transaction.
+     *
+     * @param msgBoxId
+     *            id of the box
+     * @return the message texts; empty when the box is unknown. When the connection is lost the messages
+     *         read so far are returned and nothing is deleted.
+     * @throws SQLException
+     *             if the query or delete fails (the transaction is rolled back)
+     * @throws IOException
+     *             if a stored message cannot be read
+     * @throws ClassNotFoundException
+     *             if a stored message cannot be deserialized
+     * @throws XMLStreamException
+     *             declared for interface compatibility
+     */
+    public synchronized List<String> removeAllMessagesforClient(String msgBoxId) throws SQLException, IOException,
+            ClassNotFoundException, XMLStreamException {
+        ArrayList<String> list = new ArrayList<String>();
+        if (msgBoxids.contains(msgBoxId)) {
+
+            Connection connection = null;
+            PreparedStatement stmt = null;
+            PreparedStatement stmt2 = null;
+            try {
+                connection = db.connect();
+                stmt = connection.prepareStatement(SQL_SELECT_MSGBOX_STATEMENT);
+                stmt.setString(1, msgBoxId);
+                ResultSet resultSet = stmt.executeQuery();
+                while (resultSet.next()) {
+                    // The content was written with setBinaryStream, so it has to be read back as binary;
+                    // an ASCII stream is allowed to convert the bytes and can corrupt the serialized form.
+                    InputStream in = resultSet.getBinaryStream("content");
+                    // NOTE(review): native Java deserialization of database content - only safe while
+                    // the table is written exclusively by addMessage.
+                    ObjectInputStream s = new ObjectInputStream(in);
+                    String xmlString = (String) s.readObject();
+                    logger.debug(xmlString);
+                    list.add(xmlString);
+                }
+
+                /*
+                 * Delete all retrieved messages
+                 */
+                stmt2 = connection.prepareStatement(SQL_DELETE_MSGBOX_STATEMENT);
+                stmt2.setString(1, msgBoxId);
+                stmt2.executeUpdate();
+
+                // commit
+                db.commit(connection);
+            } catch (SQLNonTransientConnectionException e) {
+                // The connection is gone, so there is nothing to roll back.
+                logger.warn("Database connection is interrupted", e);
+            } catch (SQLException sql) {
+                db.rollback(connection);
+                throw sql;
+            } finally {
+
+                /*
+                 * Always release the connection and both statements, on success as well as on error
+                 */
+                db.quietlyClose(connection, stmt, stmt2);
+            }
+        }
+        return list;
+    }
+
+    /**
+     * Deletes messages older than the configured threshold. SQL failures are logged, not thrown.
+     */
+    public synchronized void removeAncientMessages() {
+        Connection connection = null;
+        try {
+            connection = db.connect();
+            PreparedStatement stmt = connection.prepareStatement(SQL_DELETE_ANCIENT_STATEMENT);
+            stmt.setLong(1, this.time);
+            db.executeUpdateAndCloseWithPrintLogMessages(stmt, false);
+            db.commitAndFree(connection);
+        } catch (SQLException sql) {
+            db.rollbackAndFree(connection);
+            logger.error("Caught exception while removing old entries from msgbox db table", sql);
+        }
+    }
+
+    /**
+     * Loads the ids of all message boxes from the database into the in-memory id set. The set is only
+     * replaced after the query has completed.
+     */
+    private static void setMsgBoxidList(JdbcStorage db) throws SQLException {
+        Set<String> loadedIds = new HashSet<String>();
+
+        Connection connection = null;
+        PreparedStatement stmt = null;
+        try {
+            connection = db.connect();
+            stmt = connection.prepareStatement(SELECT_ALL_FROM_MSGBOXES);
+            ResultSet resultSet = stmt.executeQuery();
+            while (resultSet.next()) {
+                loadedIds.add(resultSet.getString("msgboxid"));
+            }
+            db.commit(connection);
+        } catch (SQLException e) {
+            db.rollback(connection);
+            throw e;
+        } finally {
+            db.quietlyClose(connection, stmt);
+        }
+        msgBoxids = loadedIds;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/memory/InMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/memory/InMemoryImpl.java b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/memory/InMemoryImpl.java
new file mode 100644
index 0000000..7406ce4
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/Storage/memory/InMemoryImpl.java
@@ -0,0 +1,133 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox.Storage.memory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.airavata.wsmg.msgbox.Storage.MsgBoxStorage;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In-memory {@link MsgBoxStorage}. Message boxes live in a map from box id to the list of stored
+ * messages; every operation synchronizes on that map. According to the original note, this storage is
+ * used when airavata-server.properties is configured not to use the database implementation.
+ */
+public class InMemoryImpl implements MsgBoxStorage {
+    private static final Logger logger = LoggerFactory.getLogger(InMemoryImpl.class);
+
+    // Also serves as the lock object, so the reference must never change.
+    private final HashMap<String, List<Content>> map = new HashMap<String, List<Content>>();
+
+    private long time;
+
+    /**
+     * @param time
+     *            maximum age in milliseconds a message may reach before
+     *            {@link #removeAncientMessages()} drops it
+     */
+    public InMemoryImpl(long time) {
+        this.time = time;
+    }
+
+    /**
+     * Creates an empty message box identified by a random UUID.
+     *
+     * @return the id of the new box
+     * @throws Exception
+     *             if a box with the generated id already exists
+     */
+    public String createMsgBox() throws Exception {
+        synchronized (map) {
+            String clientid = UUID.randomUUID().toString();
+            if (map.containsKey(clientid))
+                throw new Exception("Message Box is existed with key:" + clientid);
+            map.put(clientid, new ArrayList<Content>());
+            return clientid;
+        }
+    }
+
+    /**
+     * Removes the box with the given key together with its messages; unknown keys are ignored.
+     */
+    public void destroyMsgBox(String key) throws Exception {
+        synchronized (map) {
+            map.remove(key);
+        }
+    }
+
+    /**
+     * Returns all messages of the box in insertion order and leaves the box empty.
+     *
+     * @param key
+     *            id of the box
+     * @return the message texts; an empty list when no box with this key exists
+     */
+    public List<String> takeMessagesFromMsgBox(String key) throws Exception {
+        synchronized (map) {
+            List<Content> stored = map.get(key);
+            if (stored == null) {
+                // Unknown (or already destroyed) box: report "no messages" instead of failing with a
+                // NullPointerException, and do not create the box as a side effect.
+                return new ArrayList<String>();
+            }
+            ArrayList<String> result = new ArrayList<String>(stored.size());
+            for (Content content : stored) {
+                result.add(content.getContent());
+            }
+            map.put(key, new ArrayList<Content>());
+            return result;
+        }
+    }
+
+    /**
+     * Appends a message, stamped with the current time, to the given box.
+     *
+     * @throws IllegalArgumentException
+     *             if no box with the given id exists
+     */
+    public void putMessageIntoMsgBox(String msgBoxID, String messageID, String soapAction, OMElement message)
+            throws Exception {
+        synchronized (map) {
+            if (!map.containsKey(msgBoxID)) {
+                throw new IllegalArgumentException("no message box with key " + msgBoxID + " to store the msg");
+            }
+            List<Content> list = map.get(msgBoxID);
+            list.add(new Content(message.toStringWithConsume(), System.currentTimeMillis()));
+            logger.debug("Message Stored in list with key " + msgBoxID);
+        }
+    }
+
+    /**
+     * Drops every message that was stored more than the configured number of milliseconds ago.
+     */
+    public void removeAncientMessages() {
+        /*
+         * Visits every message of every box.
+         */
+        synchronized (map) {
+            long currentTime = System.currentTimeMillis();
+            Iterator<List<Content>> it = map.values().iterator();
+            while (it.hasNext()) {
+                Iterator<Content> itToRemove = it.next().iterator();
+                while (itToRemove.hasNext()) {
+                    Content content = itToRemove.next();
+                    if (currentTime - this.time > content.getTime()) {
+                        itToRemove.remove();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Discards all boxes and messages.
+     */
+    public void dispose() {
+        synchronized (map) {
+            map.clear();
+        }
+    }
+
+    /**
+     * A stored message: its text and the time (milliseconds since the epoch) at which it was stored.
+     */
+    class Content {
+        private final String content;
+        private final long time;
+
+        public Content(String content, long time) {
+            this.content = content;
+            this.time = time;
+        }
+
+        public String getContent() {
+            return content;
+        }
+
+        public long getTime() {
+            return time;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/StoreMessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/StoreMessageHandler.java b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/StoreMessageHandler.java
new file mode 100644
index 0000000..51a60ae
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/StoreMessageHandler.java
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox;
+
+import java.util.List;
+
+import org.apache.airavata.wsmg.commons.MsgBoxQNameConstants;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.AddressingFaultsHelper;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.AxisOperation;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.dispatchers.AddressingBasedDispatcher;
+import org.apache.axis2.engine.Phase;
+import org.apache.axis2.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Dispatcher used to validate incoming messages. According to the original note it is added to the
+ * handler list in MsgBoxServiceLifeCycle.
+ * <p>
+ * It runs the addressing based dispatch with action validation switched off, maps otherwise unresolved
+ * messages for the "MsgBoxService" service to the store-message operation, and then performs the action
+ * validation itself.
+ */
+public class StoreMessageHandler extends AddressingBasedDispatcher {
+    private static final Logger logger = LoggerFactory.getLogger(StoreMessageHandler.class);
+    private static final String WSMG_MSGSTORE_SOAP_ACTION = MsgBoxQNameConstants.STOREMSG_QNAME.getNamespaceURI()
+            + MsgBoxQNameConstants.STOREMSG_QNAME.getLocalPart();
+    private static final String ADDRESSING_VALIDATE_ACTION = "addressing.validateAction";
+
+    private Phase addressingPhase;
+    private AxisOperation messageBoxOperation;
+
+    /**
+     * Dispatches the message when its service or operation has not been resolved yet.
+     *
+     * @param msgContext
+     *            context of the incoming message
+     * @return the response of the parent dispatcher, or {@code CONTINUE} when nothing had to be done
+     * @throws AxisFault
+     *             from the parent dispatcher, or when the action validation fails
+     */
+    public org.apache.axis2.engine.Handler.InvocationResponse invoke(MessageContext msgContext) throws AxisFault {
+
+        InvocationResponse response = InvocationResponse.CONTINUE;
+        if (msgContext.getAxisService() == null || msgContext.getAxisOperation() == null) {
+
+            // Remember the caller's setting and disable validation while the parent dispatcher runs,
+            // so this handler gets the chance to fall back to the store operation first.
+            boolean validateAction = JavaUtils.isTrue(msgContext.getProperty(ADDRESSING_VALIDATE_ACTION), true);
+            msgContext.setProperty(ADDRESSING_VALIDATE_ACTION, Boolean.valueOf(false));
+
+            response = super.invoke(msgContext);
+
+            if (isForMessageBoxService(msgContext))
+                validateMsgBoxStoreOperation(msgContext);
+            if (validateAction)
+                checkAction(msgContext);
+            msgContext.setProperty(ADDRESSING_VALIDATE_ACTION, Boolean.valueOf(validateAction));
+
+        }
+        return response;
+    }
+
+    /**
+     * Falls back to the store-message operation when no operation has been resolved for the message.
+     */
+    private void validateMsgBoxStoreOperation(MessageContext msgContext) {
+        if (msgContext.getAxisOperation() == null) {
+            AxisService service = msgContext.getAxisService();
+            AxisOperation storeMsgOperation = getMessageBoxOperation(service);
+
+            msgContext.setAxisOperation(storeMsgOperation);
+        }
+    }
+
+    /**
+     * @return {@code true} when the message has been dispatched to the service named "MsgBoxService"
+     */
+    private boolean isForMessageBoxService(MessageContext msgContext) {
+        return msgContext.getAxisService() != null && "MsgBoxService".equals(msgContext.getAxisService().getName());
+    }
+
+    /**
+     * Looks up the store-message operation by its SOAP action and caches it for later calls.
+     */
+    private AxisOperation getMessageBoxOperation(AxisService msgBoxService) {
+        if (messageBoxOperation == null)
+            messageBoxOperation = msgBoxService.getOperationBySOAPAction(WSMG_MSGSTORE_SOAP_ACTION);
+        return messageBoxOperation;
+    }
+
+    /**
+     * Triggers an "action not supported" fault when this is the last handler of the addressing phase
+     * and the service or operation is still unresolved.
+     */
+    private void checkAction(MessageContext msgContext) throws AxisFault {
+
+        Phase addPhase = getAddressingPhase(msgContext);
+
+        if (addPhase == null) {
+            // Without the phase the handler position cannot be determined; skip the check instead of
+            // failing with a NullPointerException.
+            logger.error("unable to locate addressing phase object");
+            return;
+        }
+        if (msgContext.getCurrentPhaseIndex() + 1 == addPhase.getHandlerCount()) {
+            if (msgContext.getAxisService() == null || msgContext.getAxisOperation() == null)
+                AddressingFaultsHelper.triggerActionNotSupportedFault(msgContext, msgContext.getWSAAction());
+        }
+    }
+
+    /**
+     * Finds the in-flow phase named "Addressing" (case-insensitive) and caches it.
+     *
+     * @return the phase, or {@code null} when the configuration has no such phase
+     */
+    private Phase getAddressingPhase(MessageContext context) {
+
+        if (addressingPhase == null) {
+
+            List<Phase> inFlowPhases = context.getConfigurationContext().getAxisConfiguration().getPhasesInfo()
+                    .getINPhases();
+
+            for (Phase p : inFlowPhases) {
+                if (p.getName().equalsIgnoreCase("Addressing")) {
+                    addressingPhase = p;
+                }
+            }
+
+        }
+
+        return addressingPhase;
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/ConfigKeys.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/ConfigKeys.java b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/ConfigKeys.java
new file mode 100644
index 0000000..7069f40
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/ConfigKeys.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox.util;
+
+/**
+ * String constants naming the configuration properties of the message box module.
+ */
+public class ConfigKeys {
+// public static final String CONFIG_FILE_NAME = "airavata-server.properties";
+
+    // Keys prefixed "messagePreservation".
+    public static final String MSG_PRESV_DAYS = "messagePreservationDays";
+    public static final String MSG_PRESV_HRS = "messagePreservationHours";
+    public static final String MSG_PRESV_MINS = "messagePreservationMinutes";
+
+    // Keys prefixed "msgBox.".
+    public static final String MSG_BOX_JDBC_DRIVER = "msgBox.jdbc.driver";
+    public static final String MSG_BOX_JDBC_URL = "msgBox.jdbc.url";
+    public static final String USE_DATABASE_STORAGE = "msgBox.usedatabase";
+
+    // Keys prefixed "messagePreservationInterval".
+    public static final String MSG_PRESV_INTERVAL_DAYS = "messagePreservationIntervalDays";
+    public static final String MSG_PRESV_INTERVAL_HRS = "messagePreservationIntervalHours";
+    public static final String MSG_PRESV_INTERVAL_MINS = "messagePreservationIntervalMinutes";
+    public static final String MSG_PRESV_INTERVAL = "messagePreservationInterval";
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxCommonConstants.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxCommonConstants.java b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxCommonConstants.java
new file mode 100644
index 0000000..cd63b83
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxCommonConstants.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox.util;
+
+/**
+ * Constants shared across the message box module.
+ */
+public class MsgBoxCommonConstants {
+
+    /** Name under which the message box storage is referenced. */
+    public static final String MSGBOX_STORAGE = "msgbox.storage";
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxOperations.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxOperations.java b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxOperations.java
new file mode 100644
index 0000000..6e74c2e
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxOperations.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox.util;
+
+/**
+ * The operations of the message box service, each carrying the operation name as a string.
+ */
+public enum MsgBoxOperations {
+    STORE_MSGS("storeMessages"),
+    DESTROY_MSGBOX("destroyMsgBox"),
+    TAKE_MSGS("takeMessages"),
+    CREATE_MSGBOX("createMsgBox");
+
+    private final String name;
+
+    private MsgBoxOperations(String n) {
+        name = n;
+    }
+
+    /**
+     * @return the operation name, e.g. {@code "storeMessages"} (not the enum constant name)
+     */
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    /**
+     * Case-sensitive comparison of the operation name with a string.
+     *
+     * <p>This is an overload, not an override of {@link Object#equals(Object)}; it is only chosen
+     * when the argument's static type is {@code String}.
+     *
+     * @param s string to compare with; {@code null} yields {@code false}
+     * @return {@code true} when {@code s} equals the operation name
+     */
+    public boolean equals(String s) {
+        return name.equals(s);
+    }
+
+    /**
+     * Resolves an operation from its name, ignoring case.
+     *
+     * @param s operation name such as {@code "takeMessages"}
+     * @return the matching constant
+     * @throws IllegalArgumentException when no operation has that name (including {@code null})
+     */
+    public static MsgBoxOperations valueFrom(String s) {
+        for (MsgBoxOperations operation : MsgBoxOperations.values()) {
+            if (operation.toString().equalsIgnoreCase(s)) {
+                return operation;
+            }
+        }
+
+        // The message used to name an unrelated type ("WsEventingOperation").
+        throw new IllegalArgumentException("invalid MsgBoxOperation:- " + s);
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxUtils.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxUtils.java b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxUtils.java
new file mode 100644
index 0000000..8b523a0
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/java/org/apache/airavata/wsmg/msgbox/util/MsgBoxUtils.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox.util;
+
+import java.io.Reader;
+
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.impl.builder.StAXSOAPModelBuilder;
+
+/**
+ * Static helpers for parsing XML from a reader and for building message box URLs.
+ */
+public class MsgBoxUtils {
+
+    /**
+     * Parses the reader's content with a StAX SOAP model builder.
+     *
+     * @param reader source of the SOAP document
+     * @return the SOAP envelope obtained from the builder
+     * @throws XMLStreamException when the XML stream reader cannot be created
+     */
+    public static SOAPEnvelope reader2SOAPEnvilope(Reader reader) throws XMLStreamException {
+        XMLStreamReader xmlStream = XMLInputFactory.newInstance().createXMLStreamReader(reader);
+        return new StAXSOAPModelBuilder(xmlStream).getSOAPEnvelope();
+    }
+
+    /**
+     * Parses the reader's content with a StAX OM builder.
+     *
+     * @param reader source of the XML document
+     * @return the document element obtained from the builder
+     * @throws XMLStreamException when the XML stream reader cannot be created
+     */
+    public static OMElement reader2OMElement(Reader reader) throws XMLStreamException {
+        XMLStreamReader xmlStream = XMLInputFactory.newInstance().createXMLStreamReader(reader);
+        return new StAXOMBuilder(xmlStream).getDocumentElement();
+    }
+
+    /**
+     * Appends {@code clientid/<msgboxId>} to the service URL, inserting a slash only when the
+     * service URL does not already end with one.
+     *
+     * @param msgBoxServiceUrl base URL of the message box service
+     * @param msgboxId         identifier appended after {@code clientid/}
+     * @return the combined URL
+     */
+    public static String formatMessageBoxUrl(String msgBoxServiceUrl, String msgboxId) {
+        if (msgBoxServiceUrl.endsWith("/")) {
+            return msgBoxServiceUrl + "clientid/" + msgboxId;
+        }
+        return msgBoxServiceUrl + "/clientid/" + msgboxId;
+    }
+
+    /**
+     * Prefixes the URL with {@code http://} when it contains no {@code //}.
+     *
+     * @param url URL text; must not be {@code null}
+     * @return the URL unchanged when it already contains {@code //}, otherwise with the prefix
+     * @throws IllegalArgumentException when {@code url} is {@code null}
+     */
+    public static String formatURLString(String url) {
+        if (url == null) {
+            throw new IllegalArgumentException("url can't be null");
+        }
+        // use default http
+        return url.contains("//") ? url : "http://" + url;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/resources/database_scripts/msgBox-derby.sql
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/resources/database_scripts/msgBox-derby.sql b/modules/ws-messenger/messagebox/src/main/resources/database_scripts/msgBox-derby.sql
new file mode 100644
index 0000000..d1ff141
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/resources/database_scripts/msgBox-derby.sql
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+-- Derby schema for the message box.
+
+-- MSGBOXES: one row per message box id.
+CREATE TABLE MSGBOXES (
+ MSGBOXID VARCHAR(100) NOT NULL DEFAULT '',
+ PRIMARY KEY (MSGBOXID)
+);
+
+-- MSGBOX: one row per message; ID is an identity column, CONTENT holds the message as a BLOB,
+-- MSGBOXID names the box the row belongs to (no foreign key is declared).
+CREATE TABLE MSGBOX (
+ ID INTEGER NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+ CONTENT BLOB,
+ MSGBOXID VARCHAR(100) NOT NULL DEFAULT '',
+ MESSAGEID VARCHAR(100) DEFAULT '',
+ SOAPACTION VARCHAR(100) DEFAULT '',
+ TIME TIMESTAMP ,
+ PRIMARY KEY (ID))
+;
+
+-- MESSAGE_TIME: after each insert into MSGBOX, sets TIME of the new row to the current timestamp.
+CREATE TRIGGER MESSAGE_TIME
+ AFTER INSERT ON MSGBOX
+ REFERENCING NEW AS NEW
+ FOR EACH ROW MODE DB2SQL
+ UPDATE MSGBOX SET TIME = CURRENT_TIMESTAMP WHERE ID = NEW.ID;
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/resources/database_scripts/msgBox-mysql.sql
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/resources/database_scripts/msgBox-mysql.sql b/modules/ws-messenger/messagebox/src/main/resources/database_scripts/msgBox-mysql.sql
new file mode 100755
index 0000000..a2f730b
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/resources/database_scripts/msgBox-mysql.sql
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+-- MySQL schema for the message box.
+-- NOTE(review): the database is created but never selected (there is no USE statement), so the
+-- tables below are created in whichever database the session has selected - confirm intended.
+CREATE DATABASE IF NOT EXISTS wsmg;
+
+-- Dropping first means re-running this script discards any existing rows.
+DROP TABLE IF EXISTS msgBoxes;
+
+-- msgBoxes: one row per message box id.
+CREATE TABLE `msgBoxes` (
+ `msgboxid` varchar(100) NOT NULL default '',
+ PRIMARY KEY (`msgboxid`)
+) ENGINE=MyISAM DEFAULT CHARSET=latin1;
+
+DROP TABLE IF EXISTS msgbox;
+
+-- msgbox: one row per message; `time` is set on insert and refreshed on every update of the row.
+-- NOTE(review): the defaults below are the two-character string "" rather than the empty string
+-- used for msgBoxes.msgboxid - confirm which is intended.
+-- NOTE(review): AUTO_INCREMENT=7665 looks like a leftover from a database dump - confirm.
+CREATE TABLE `msgbox` (
+ `id` int(11) NOT NULL auto_increment,
+ `content` longblob NOT NULL,
+ `msgboxid` varchar(100) NOT NULL default '""',
+ `messageid` varchar(100) default '""',
+ `soapaction` varchar(100) default '""',
+ `time` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
+ PRIMARY KEY (`id`),
+ KEY `MSGBOXID` (`msgboxid`)
+) ENGINE=MyISAM AUTO_INCREMENT=7665 DEFAULT CHARSET=latin1;
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/main/resources/services.xml
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/main/resources/services.xml b/modules/ws-messenger/messagebox/src/main/resources/services.xml
new file mode 100644
index 0000000..18dc22e
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/main/resources/services.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<serviceGroup>
+ <!-- Declares MsgBoxService with application scope; MsgBoxServiceLifeCycle is its service class attribute. -->
+ <service name="MsgBoxService" scope="application" class="org.apache.airavata.wsmg.msgbox.MsgBoxServiceLifeCycle">
+ <!-- Every in-out exchange is handled by MsgBoxServiceMessageReceiverInOut. -->
+ <messageReceivers>
+ <messageReceiver mep="http://www.w3.org/ns/wsdl/in-out" class="org.apache.airavata.wsmg.msgbox.MsgBoxServiceMessageReceiverInOut" />
+ </messageReceivers>
+ <parameter name="ServiceClass">org.apache.airavata.wsmg.msgbox.MsgBoxServiceSkeleton
+ </parameter>
+ <parameter name="useOriginalwsdl">false</parameter>
+ <parameter name="modifyUserWSDLPortAddress">true</parameter>
+ <!-- Four in-out operations follow; each maps one inbound action URI and names the action of its response. -->
+ <operation name="storeMessages" mep="http://www.w3.org/ns/wsdl/in-out" namespace="http://org.apache.airavata/ws-messenger/msgbox/2011/">
+ <actionMapping>http://org.apache.airavata/ws-messenger/msgbox/2011/storeMessages
+ </actionMapping>
+ <outputActionMapping>http://org.apache.airavata/ws-messenger/msgbox/2011/MsgBoxPT/storeMessagesResponse
+ </outputActionMapping>
+ </operation>
+ <operation name="destroyMsgBox" mep="http://www.w3.org/ns/wsdl/in-out" namespace="http://org.apache.airavata/ws-messenger/msgbox/2011/">
+ <actionMapping>http://org.apache.airavata/ws-messenger/msgbox/2011/destroyMsgBox
+ </actionMapping>
+ <outputActionMapping>http://org.apache.airavata/ws-messenger/msgbox/2011/MsgBoxPT/destroyMsgBoxResponse
+ </outputActionMapping>
+ </operation>
+ <operation name="takeMessages" mep="http://www.w3.org/ns/wsdl/in-out" namespace="http://org.apache.airavata/ws-messenger/msgbox/2011/">
+ <actionMapping>http://org.apache.airavata/ws-messenger/msgbox/2011/takeMessages
+ </actionMapping>
+ <outputActionMapping>http://org.apache.airavata/ws-messenger/msgbox/2011/MsgBoxPT/takeMessagesResponse
+ </outputActionMapping>
+ </operation>
+ <operation name="createMsgBox" mep="http://www.w3.org/ns/wsdl/in-out" namespace="http://org.apache.airavata/ws-messenger/msgbox/2011/">
+ <actionMapping>http://org.apache.airavata/ws-messenger/msgbox/2011/createMsgBox
+ </actionMapping>
+ <outputActionMapping>http://org.apache.airavata/ws-messenger/msgbox/2011/MsgBoxPT/createMsgBoxResponse
+ </outputActionMapping>
+ </operation>
+ </service>
+
+</serviceGroup>
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/InMemoryMessageBoxServer.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/InMemoryMessageBoxServer.java b/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/InMemoryMessageBoxServer.java
new file mode 100644
index 0000000..0ba8d43
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/InMemoryMessageBoxServer.java
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.ConfigurationContextFactory;
+import org.apache.axis2.description.AxisService;
+import org.apache.axis2.description.InOutAxisOperation;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.axis2.transport.http.SimpleHTTPServer;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+public class InMemoryMessageBoxServer {
+ private static int count = 0;
+
+ private static SimpleHTTPServer receiver;
+
+ public static int TESTING_PORT = 7630;
+
+ public static final String FAILURE_MESSAGE = "Intentional Failure";
+
+ public static synchronized void deployService(AxisService service) throws AxisFault {
+ receiver.getConfigurationContext().getAxisConfiguration().addService(service);
+ }
+
+ public static synchronized void unDeployService(QName service) throws AxisFault {
+ receiver.getConfigurationContext().getAxisConfiguration().removeService(service.getLocalPart());
+ }
+
+ public static synchronized void start(String repository, String axis2xml) throws Exception {
+ if (count == 0) {
+ ConfigurationContext er = getNewConfigurationContext(repository, axis2xml);
+ TESTING_PORT = getAvailablePort();
+ receiver = new SimpleHTTPServer(er, TESTING_PORT);
+
+ try {
+ receiver.start();
+ System.out.print("Server started on port " + TESTING_PORT + ".....");
+ } catch (Exception e) {
+ throw AxisFault.makeFault(e);
+ }
+
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e1) {
+ throw new AxisFault("Thread interuptted", e1);
+ }
+ startMessageBox();
+ }
+ count++;
+ }
+
+ public static void startMessageBox() throws Exception {
+
+ /*
+ * Imitate service.xml
+ */
+ AxisService axisService = new AxisService("MsgBoxService");
+ axisService.setServiceLifeCycle(new MsgBoxServiceLifeCycle());
+ axisService.addParameter("ServiceClass", "org.apache.airavata.wsmg.msgbox.MsgBoxServiceSkeleton");
+ createOperation(axisService, "storeMessages", new MsgBoxServiceMessageReceiverInOut(),
+ "http://org.apache.airavata/ws-messenger/msgbox/2011/storeMessages",
+ "http://org.apache.airavata/ws-messenger/msgbox/2011/MsgBoxPT/storeMessagesResponse");
+ createOperation(axisService, "destroyMsgBox", new MsgBoxServiceMessageReceiverInOut(),
+ "http://org.apache.airavata/ws-messenger/msgbox/2011/destroyMsgBox",
+ "http://org.apache.airavata/ws-messenger/msgbox/2011/MsgBoxPT/destroyMsgBoxResponse");
+ createOperation(axisService, "takeMessages", new MsgBoxServiceMessageReceiverInOut(),
+ "http://org.apache.airavata/ws-messenger/msgbox/2011/takeMessages",
+ "http://org.apache.airavata/ws-messenger/msgbox/2011/MsgBoxPT/takeMessagesResponse");
+ createOperation(axisService, "createMsgBox", new MsgBoxServiceMessageReceiverInOut(),
+ "http://org.apache.airavata/ws-messenger/msgbox/2011/createMsgBox",
+ "http://org.apache.airavata/ws-messenger/msgbox/2011/MsgBoxPT/createMsgBoxResponse");
+
+ InMemoryMessageBoxServer.deployService(axisService);
+
+ new MsgBoxServiceLifeCycle().startUp(InMemoryMessageBoxServer.getConfigurationContext(), axisService);
+
+ }
+
+ public static void createOperation(AxisService axisService, String name, MessageReceiver messageReceiver,
+ String inputAction, String outputAction) {
+ InOutAxisOperation operation1 = new InOutAxisOperation(new QName(name));
+ operation1.setMessageReceiver(messageReceiver);
+ operation1.setOutputAction(outputAction);
+ axisService.addOperation(operation1);
+ if (inputAction != null) {
+ axisService.mapActionToOperation(inputAction, operation1);
+ }
+ }
+
+ public static ConfigurationContext getNewConfigurationContext(String repository, String axis2xml) throws Exception {
+ return ConfigurationContextFactory.createConfigurationContextFromFileSystem(repository, axis2xml);
+ }
+
+ public static synchronized void stop() throws AxisFault {
+ if (count == 1) {
+ receiver.stop();
+ while (receiver.isRunning()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ // nothing to do here
+ }
+ }
+ count = 0;
+ // tp.doStop();
+ System.out.print("Server stopped .....");
+ } else {
+ count--;
+ }
+ receiver.getConfigurationContext().terminate();
+ }
+
+ public static ConfigurationContext getConfigurationContext() {
+ return receiver.getConfigurationContext();
+ }
+
+ public static String prefixBaseDirectory(String path) {
+ return path;
+ }
+
+ public static int getAvailablePort(){
+ ServerSocket serverSocket = null;
+ try {
+ serverSocket = new ServerSocket(0);
+ serverSocket.close();
+ } catch (IOException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ return serverSocket.getLocalPort();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MessageBoxStressTest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MessageBoxStressTest.java b/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MessageBoxStressTest.java
new file mode 100644
index 0000000..715b90c
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MessageBoxStressTest.java
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox;
+
+import java.io.StringReader;
+import java.util.Iterator;
+import java.util.UUID;
+
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.msgbox.client.MsgBoxClient;
+import org.apache.airavata.wsmg.msgbox.util.MsgBoxUtils;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.ServiceClient;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Exercises the message box through {@link MsgBoxClient} and through a plain Axis2
+ * {@link ServiceClient} that addresses the box by a URL containing its id.
+ */
+public class MessageBoxStressTest extends TestCase {
+    private int port = InMemoryMessageBoxServer.TESTING_PORT;
+    private long timeout = 5000L;
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+    }
+
+    /** Starts (or reuses) the in-memory message box server. */
+    @Before
+    public void setUp() throws Exception {
+        InMemoryMessageBoxServer.start(null, null);
+    }
+
+    // NOTE(review): the server started in setUp is never stopped here - confirm that is intended.
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Creates a box, stores and takes a message via the client API, then sends a message to the
+     * box's id-encoded URL and expects it to be retrievable.
+     */
+    @Test
+    public void testMessageBox() throws Exception {
+
+        MsgBoxClient user = new MsgBoxClient();
+        port = InMemoryMessageBoxServer.TESTING_PORT;
+        // test publish with Epr
+        EndpointReference msgBoxEpr = user.createMessageBox("http://localhost:" + port
+                + "/axis2/services/MsgBoxService", timeout);
+
+        System.out.println(msgBoxEpr.toString());
+        user.storeMessage(msgBoxEpr, timeout,
+                MsgBoxUtils.reader2OMElement(new StringReader("<test>A simple test message</test>")));
+
+        Iterator<OMElement> iterator = user.takeMessagesFromMsgBox(msgBoxEpr, timeout);
+        int i = 0;
+        if (iterator != null) {
+            while (iterator.hasNext()) {
+                i++;
+                System.out.println("Retrieved message :" + i);
+                System.out.println(iterator.next().toStringWithConsume());
+            }
+        }
+
+        // test invocations with id encoded in the Url
+        String msgBoxId = UUID.randomUUID().toString();
+        String address = msgBoxEpr.getAddress();
+        int biginIndex = address.indexOf("clientid");
+        if (biginIndex != -1) {
+            // skip "clientid" and the separator that follows it
+            msgBoxId = address.substring(biginIndex + "clientid".length() + 1);
+        }
+        System.out.println("MSGBOX ID:" + msgBoxId);
+
+        String mesgboxUrl = "http://localhost:" + port + "/axis2/services/MsgBoxService/clientid/" + msgBoxId;
+
+        OMElement request = OMAbstractFactory.getOMFactory().createOMElement(new QName("foo"));
+        request.setText("bar");
+        ServiceClient client = null;
+
+        try {
+            client = new ServiceClient();
+            System.out.println(mesgboxUrl);
+            client.getOptions().setTo(new EndpointReference(mesgboxUrl));
+            client.sendReceive(request);
+        } finally {
+            // Guarded so a failure while creating the client is not replaced by an NPE.
+            if (client != null) {
+                client.cleanupTransport();
+            }
+        }
+
+        iterator = user.takeMessagesFromMsgBox(new EndpointReference(mesgboxUrl), timeout);
+        assertTrue(iterator.hasNext());
+        while (iterator.hasNext()) {
+            i++;
+            System.out.println("Retrieved message :" + i);
+            System.out.println(iterator.next().toStringWithConsume());
+        }
+        assertFalse(iterator.hasNext());
+
+        System.out.println("Delete message box response : " + user.deleteMsgBox(msgBoxEpr, timeout));
+        System.out.println("All tests Done");
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java b/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java
new file mode 100644
index 0000000..b3a3f4c
--- /dev/null
+++ b/modules/ws-messenger/messagebox/src/test/java/org/apache/airavata/wsmg/msgbox/MsgBoxTest.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.msgbox;
+
+import java.io.StringReader;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.msgbox.client.MsgBoxClient;
+import org.apache.airavata.wsmg.msgbox.util.MsgBoxUtils;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.addressing.EndpointReference;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class MsgBoxTest extends TestCase {
+
+ private int port = InMemoryMessageBoxServer.TESTING_PORT;
+ private long timeout = 5000L;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ InMemoryMessageBoxServer.start(null, null);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testMessageBox() throws Exception {
+
+ MsgBoxClient user = new MsgBoxClient();
+ StringBuilder builder = new StringBuilder();
+ port = InMemoryMessageBoxServer.TESTING_PORT;
+ EndpointReference msgBoxEpr = user.createMessageBox("http://localhost:" + port
+ + "/axis2/services/MsgBoxService", timeout);
+
+ for (int i = 0; i < 10; i++) {
+
+ builder.delete(0, builder.capacity());
+ Random x = new Random();
+ for (int j = 0; j < x.nextInt(50); j++) {
+ builder.append("123456789");
+ }
+
+ String msg = String.format("<msg><seq>%d</seq><fill>%s</fill></msg>", i, builder.toString());
+
+ user.storeMessage(msgBoxEpr, timeout, MsgBoxUtils.reader2OMElement(new StringReader(msg)));
+
+ Thread.sleep(200L);
+ }
+
+ Iterator<OMElement> iterator = null;
+
+ try {
+ iterator = user.takeMessagesFromMsgBox(msgBoxEpr, timeout);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ if (iterator != null)
+ while (iterator.hasNext()) {
+ System.out.println(iterator.next().toStringWithConsume());
+ }
+
+ System.out.println("Delete message box response : " + user.deleteMsgBox(msgBoxEpr, 5000L));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/pom.xml
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/pom.xml b/modules/ws-messenger/messagebroker/pom.xml
new file mode 100644
index 0000000..df779a1
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
+ the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
+ obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
+ in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+ ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under
+ the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <!-- Maven module for the Airavata Message Broker; packaged as a jar and inheriting from the airavata-ws-messenger parent. -->
+ <parent>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-ws-messenger</artifactId>
+ <version>0.12-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>airavata-message-broker</artifactId>
+ <name>Airavata Message Broker</name>
+ <url>http://airavata.apache.org/</url>
+ <packaging>jar</packaging>
+
+ <build>
+ <plugins>
+ <!-- During prepare-package, copy services.xml from the build output directory into its META-INF/ sub-directory. -->
+ <!-- NOTE(review): presumably so the packaged jar carries the service descriptor under META-INF/ - confirm. -->
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>${antrun.version}</version>
+ <executions>
+ <execution>
+ <id>restore-persistence</id>
+ <phase>prepare-package</phase>
+ <configuration>
+ <tasks>
+ <copy file="${project.build.outputDirectory}/services.xml" tofile="${project.build.outputDirectory}/META-INF/services.xml" />
+ </tasks>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+
+ <!-- Other Airavata modules, requested at this module's own version. -->
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-messenger-commons</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-client-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-common-utils</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-messenger-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Third-party libraries; yfilter takes its version from a property, wsdl4j is pinned explicitly. -->
+ <dependency>
+ <groupId>org.ogce</groupId>
+ <artifactId>yfilter</artifactId>
+ <version>${yfilter.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>wsdl4j</groupId>
+ <artifactId>wsdl4j</artifactId>
+ <version>1.5.2</version>
+ </dependency>
+ <!-- The dependencies below declare no version; presumably it is supplied by dependencyManagement in a parent POM (confirm). -->
+ <dependency>
+ <groupId>org.apache.axis2</groupId>
+ <artifactId>axis2</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.axis2</groupId>
+ <artifactId>axis2-transport-http</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.axis2</groupId>
+ <artifactId>axis2-transport-local</artifactId>
+ </dependency>
+
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ <!-- Test -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-server-configuration</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <!-- Source files and generated reports are both treated as UTF-8. -->
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+ </properties>
+</project>
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java
new file mode 100644
index 0000000..9277dcc
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AbstractBrokerMsgReceiver.java
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker;
+
+import java.util.List;
+
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.receivers.AbstractMessageReceiver;
+import org.apache.axis2.util.MessageContextBuilder;
+
+public abstract class AbstractBrokerMsgReceiver extends AbstractMessageReceiver {
+
+ protected abstract MessageContext process(MessageContext inMsgContext, String operationName) throws AxisFault;
+
+ @Override
+ protected void invokeBusinessLogic(MessageContext inMsgContext) throws AxisFault {
+
+ String operationName = getOperationName(inMsgContext);
+ MessageContext outMsgContext = process(inMsgContext, operationName);
+
+ if (outMsgContext != null) {
+ outMsgContext.setTo(null);
+ super.replicateState(inMsgContext);
+ AxisEngine.send(outMsgContext);
+
+ }
+
+ }
+
+ protected String getOperationName(MessageContext inMsg) throws AxisFault {
+
+ org.apache.axis2.description.AxisOperation op = inMsg.getOperationContext().getAxisOperation();
+ if (op == null) {
+ throw new AxisFault(
+ "Operation is not located, if this is doclit style the SOAP-ACTION should specified via the SOAP Action to use the RawXMLProvider");
+ }
+
+ java.lang.String operationName = null;
+ if ((op.getName() == null)
+ || ((operationName = org.apache.axis2.util.JavaUtils.xmlNameToJava(op.getName().getLocalPart())) == null)) {
+ throw new AxisFault("invalid operation found");
+ }
+
+ return operationName;
+ }
+
+ protected MessageContext createOutputMessageContext(MessageContext inMsg, ProcessingContext processingContext)
+ throws AxisFault {
+
+ MessageContext outMsgContext = MessageContextBuilder.createOutMessageContext(inMsg);
+ outMsgContext.getOperationContext().addMessageContext(outMsgContext);
+
+ SOAPEnvelope outputEnvelope = getSOAPFactory(inMsg).getDefaultEnvelope();
+
+ if (processingContext.getRespMessage() != null) {
+
+ outputEnvelope.getBody().addChild(processingContext.getRespMessage());
+
+ if (processingContext.getResponseMsgNamespaces() != null) {
+ declareResponseMsgNamespace(outputEnvelope, processingContext.getResponseMsgNamespaces());
+ }
+ }
+
+ outMsgContext.setEnvelope(outputEnvelope);
+ return outMsgContext;
+ }
+
+ private void declareResponseMsgNamespace(SOAPEnvelope outputEnvelope, List<OMNamespace> namespaces) {
+
+ if (!namespaces.contains(NameSpaceConstants.WSA_NS)) {
+ namespaces.add(NameSpaceConstants.WSA_NS);// declare WSA by default
+ }
+
+ for (OMNamespace ns : namespaces) {
+ outputEnvelope.declareNamespace(ns);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java
new file mode 100644
index 0000000..acf8f0f
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/AdditionalMessageContent.java
@@ -0,0 +1,131 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker;
+
+import java.io.Serializable;
+
+/**
+ * Serializable value holder for the extra pieces of information that accompany a message:
+ * its action, message id, topic element, producer reference and track id.
+ */
+public class AdditionalMessageContent implements Serializable {
+
+    private static final long serialVersionUID = -5163025283681463108L;
+
+    String action;
+
+    String messageID;
+
+    String topicElement;
+
+    String producerReference;
+    String trackId;
+
+    /**
+     * Creates a holder with the two mandatory values; every other field starts out as
+     * <code>null</code>.
+     *
+     * @param action
+     *            the action of the message
+     * @param messageID
+     *            the id of the message
+     */
+    public AdditionalMessageContent(String action, String messageID) {
+        this.action = action;
+        this.messageID = messageID;
+    }
+
+    /**
+     * @return the action
+     */
+    public String getAction() {
+        return this.action;
+    }
+
+    /**
+     * @param action
+     *            the new action
+     */
+    public void setAction(String action) {
+        this.action = action;
+    }
+
+    /**
+     * @return the message id
+     */
+    public String getMessageID() {
+        return this.messageID;
+    }
+
+    /**
+     * @param messageID
+     *            the new message id
+     */
+    public void setMessageID(String messageID) {
+        this.messageID = messageID;
+    }
+
+    /**
+     * @return the producer reference
+     */
+    public String getProducerReference() {
+        return this.producerReference;
+    }
+
+    /**
+     * @param producerReference
+     *            the new producer reference
+     */
+    public void setProducerReference(String producerReference) {
+        this.producerReference = producerReference;
+    }
+
+    /**
+     * @return the topic element
+     */
+    public String getTopicElement() {
+        return this.topicElement;
+    }
+
+    /**
+     * @param topicElement
+     *            the new topic element
+     */
+    public void setTopicElement(String topicElement) {
+        this.topicElement = topicElement;
+    }
+
+    /**
+     * @return the track id
+     */
+    public String getTrackId() {
+        return this.trackId;
+    }
+
+    /**
+     * @param trackId
+     *            the new track id
+     */
+    public void setTrackId(String trackId) {
+        this.trackId = trackId;
+    }
+
+    /**
+     * @return a one-line summary made of the message id, track id and topic element
+     */
+    @Override
+    public String toString() {
+        return "msgId = " + this.messageID + ", trackId = " + this.trackId + ", topic = " + this.topicElement;
+    }
+
+}