You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:31:25 UTC

[83/90] [abbrv] AIRAVATA-1124

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
new file mode 100644
index 0000000..072cbb3
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
@@ -0,0 +1,161 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger;
+
+import java.io.StringReader;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.airavata.wsmg.messenger.protocol.SendingException;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axis2.addressing.EndpointReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sends one notification to one consumer through the configured {@link DeliveryProtocol} and reports the outcome
+ * (elapsed time, or the failure) to the {@link ConsumerUrlManager}.
+ * <p>
+ * This class is not thread safe. {@link #setProtocol(DeliveryProtocol)} has to be called before
+ * {@link #send(ConsumerInfo, OMElement, AdditionalMessageContent)}: {@code send} uses the protocol without a null
+ * check.
+ */
+public class SenderUtils implements Deliverable {
+
+    private static final Logger logger = LoggerFactory.getLogger(SenderUtils.class);
+
+    private static final OMFactory factory = OMAbstractFactory.getOMFactory();
+
+    /** Value of {@link ConsumerInfo#getType()} that selects the WS-Notification handling in this class. */
+    private static final String WSNT_TYPE = "wsnt";
+
+    private final ConsumerUrlManager urlManager;
+
+    private DeliveryProtocol protocol;
+
+    /**
+     * @param urlMan
+     *            manager that is asked whether a consumer URL is unavailable and that is notified about every
+     *            successful or failed delivery
+     */
+    public SenderUtils(ConsumerUrlManager urlMan) {
+        urlManager = urlMan;
+    }
+
+    /**
+     * @param protocol
+     *            protocol used by {@code send} to deliver messages
+     */
+    public void setProtocol(DeliveryProtocol protocol) {
+        this.protocol = protocol;
+    }
+
+    /**
+     * Delivers a notification to a consumer.
+     * <p>
+     * Nothing is sent when the consumer is paused, when the message body is {@code null}, or when the URL manager
+     * reports the consumer URL as unavailable. A {@link SendingException} raised by the protocol is not rethrown; it
+     * is handed to {@code urlManager.onFailedDelivery}.
+     *
+     * @param consumerInfo
+     *            consumer to deliver to
+     * @param notificationMessageBodyEl
+     *            raw notification body; may be {@code null}, in which case the call is ignored
+     * @param additionalMessageContent
+     *            per-message data (track id, topic element, producer reference) used for wrapping and logging
+     */
+    public void send(ConsumerInfo consumerInfo, OMElement notificationMessageBodyEl,
+            AdditionalMessageContent additionalMessageContent) {
+
+        if (consumerInfo.isPaused()) {
+            return;
+        }
+
+        if (notificationMessageBodyEl == null) {
+            logger.info("notification message is null, IGNORED");
+            return;
+        }
+
+        String consumerEpr = consumerInfo.getConsumerEprStr();
+        if (urlManager.isUnavailable(consumerEpr)) {
+            logger.info("consumer url is unavailable: {}", consumerEpr);
+            return;
+        }
+
+        EndpointReference consumerReference = new EndpointReference(consumerEpr);
+
+        /*
+         * Only "wsnt" consumers that asked for Notify wrapping get the wrapped form; every other consumer (wsnt
+         * without wrapping, and wse) receives the raw body.
+         */
+        OMElement message = notificationMessageBodyEl;
+        if (WSNT_TYPE.equals(consumerInfo.getType()) && consumerInfo.isUseNotify()) {
+            message = wrapRawMessageToWsntWrappedFormat(notificationMessageBodyEl, additionalMessageContent);
+        }
+
+        long startTime = System.currentTimeMillis();
+
+        try {
+
+            /*
+             * sending message out
+             */
+            protocol.deliver(consumerInfo, message, additionalMessageContent);
+
+            long timeElapsed = System.currentTimeMillis() - startTime;
+            if (WSMGParameter.showTrackId) {
+                logger.info(String.format("track id = %s : delivered to: %s in %d ms",
+                        additionalMessageContent.getTrackId(), consumerReference.getAddress(), timeElapsed));
+            }
+
+            urlManager.onSucessfullDelivery(consumerReference, timeElapsed);
+        } catch (SendingException ex) {
+
+            long finishTime = System.currentTimeMillis();
+            long timeElapsed = finishTime - startTime;
+
+            urlManager.onFailedDelivery(consumerReference, finishTime, timeElapsed, ex, additionalMessageContent);
+
+        }
+    }
+
+    /**
+     * Wraps a raw notification into a {@code Notify/NotificationMessage/Message} element tree in the WSNT namespace.
+     * <p>
+     * The topic element and the producer reference from {@code additionalMessageContent} are parsed and added as
+     * children of {@code NotificationMessage} when they are present. If one of them cannot be parsed, the error is
+     * logged and that child is left out; the remaining structure is still built.
+     *
+     * @param rawNotif
+     *            notification body, added as the child of the {@code Message} element
+     * @param additionalMessageContent
+     *            source of the optional topic element and producer reference (both XML strings)
+     * @return the enclosing {@code Notify} element
+     */
+    public OMElement wrapRawMessageToWsntWrappedFormat(OMElement rawNotif,
+            AdditionalMessageContent additionalMessageContent) {
+
+        OMElement fullNotif = factory.createOMElement("Notify", NameSpaceConstants.WSNT_NS);
+
+        OMElement notificationMessageEl = factory.createOMElement("NotificationMessage", NameSpaceConstants.WSNT_NS,
+                fullNotif);
+
+        addParsedChild(notificationMessageEl, additionalMessageContent.getTopicElement(),
+                "XMLStreamreader exception when setting topicEl");
+        addParsedChild(notificationMessageEl, additionalMessageContent.getProducerReference(),
+                "XMLStreamException at creating producerReferenceEl");
+
+        OMElement messageEl = factory.createOMElement("Message", NameSpaceConstants.WSNT_NS, notificationMessageEl);
+        messageEl.addChild(rawNotif);
+
+        return fullNotif;
+
+    }
+
+    /**
+     * Parses {@code xml} and appends the resulting element to {@code parent}. Does nothing when {@code xml} is
+     * {@code null}. A parse failure is logged with {@code parseFailureMessage} and no child is appended, so a null
+     * element is never passed to {@code addChild}.
+     */
+    private static void addParsedChild(OMElement parent, String xml, String parseFailureMessage) {
+        if (xml == null) {
+            return;
+        }
+        try {
+            OMElement child = CommonRoutines.reader2OMElement(new StringReader(xml));
+            if (child != null) {
+                parent.addChild(child);
+            }
+        } catch (XMLStreamException e) {
+            logger.error(parseFailureMessage, e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java
new file mode 100644
index 0000000..417cef7
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.axiom.om.OMElement;
+
+/**
+ * Transport used to push one notification message to one consumer.
+ */
+public interface DeliveryProtocol {
+
+    /**
+     * Delivers {@code message} to the consumer described by {@code consumerInfo}.
+     *
+     * @param consumerInfo
+     *            consumer to deliver to
+     * @param message
+     *            message to deliver
+     * @param additionalMessageContent
+     *            extra per-message data; the Axis2 implementation reads the action, the topic element and the
+     *            message id from it
+     * @throws SendingException
+     *             if the message could not be delivered
+     */
+    public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
+            throws SendingException;
+
+    /**
+     * Sets the timeout used for deliveries. The Axis2 implementation passes the value to
+     * {@code Options.setTimeOutInMilliSeconds}, i.e. it treats it as milliseconds.
+     *
+     * @param timeout
+     *            timeout value
+     */
+    public void setTimeout(long timeout);
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java
new file mode 100644
index 0000000..c4dd24a
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol;
+
+import org.apache.axis2.AxisFault;
+
+/**
+ * Thrown by {@link DeliveryProtocol#deliver} when a message cannot be delivered. It is an {@link AxisFault} that
+ * only carries the underlying cause.
+ */
+public class SendingException extends AxisFault {
+
+    /**
+     * Serialization version identifier.
+     */
+    private static final long serialVersionUID = 6250791562500752579L;
+
+    /**
+     * @param cause
+     *            the failure that prevented delivery; passed unchanged to the {@link AxisFault} constructor
+     */
+    public SendingException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java
new file mode 100644
index 0000000..7e2568a
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol.impl;
+
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.airavata.wsmg.messenger.protocol.SendingException;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.util.ElementHelper;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeaderBlock;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Axis2Protocol implements DeliveryProtocol {
+
+    private static final Logger logger = LoggerFactory.getLogger(Axis2Protocol.class);
+
+    private static SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
+
+    private ServiceClient nonThreadLocalServiceClient;
+
+    long tcpConnectionTimeout;
+
+    public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
+            throws SendingException {
+        EndpointReference consumerReference = new EndpointReference(consumerInfo.getConsumerEprStr());
+
+        /*
+         * Extract information
+         */
+        String actionString = null;
+        List<OMElement> soapHeaders = new LinkedList<OMElement>();
+        if (consumerInfo.getType().compareTo("wsnt") == 0) {
+            actionString = NameSpaceConstants.WSNT_NS.getNamespaceURI() + "/Notify";
+        } else { // wse
+            actionString = additionalMessageContent.getAction();
+            String topicElString = additionalMessageContent.getTopicElement();
+            if (topicElString != null) {
+                OMElement topicEl = null;
+                try {
+                    topicEl = CommonRoutines.reader2OMElement(new StringReader(topicElString));
+                    soapHeaders.add(topicEl);
+                } catch (XMLStreamException e) {
+                    logger.error("exception at topicEl xmlStreamException", e);
+                }
+            }
+        }
+
+        ServiceClient client = null;
+        try {
+
+            client = configureServiceClient(actionString, consumerReference, additionalMessageContent.getMessageID(),
+                    soapHeaders);
+
+            client.sendRobust(message);
+
+        } catch (AxisFault ex) {
+            throw new SendingException(ex.getCause());
+        } finally {
+            if (client != null) {
+                try {
+                    client.cleanup();
+                    client.cleanupTransport();
+                } catch (AxisFault ex) {
+                    logger.error(ex.getMessage(), ex);
+                }
+            }
+        }
+    }
+
+    public void setTimeout(long timeout) {
+        this.tcpConnectionTimeout = timeout;
+    }
+
+    private ServiceClient configureServiceClient(String action, EndpointReference consumerLocation, String msgId,
+            List<OMElement> soapHeaders) throws AxisFault {
+
+        // not engaging addressing modules
+        ServiceClient client = getServiceClient();
+
+        SOAPHeaderBlock msgIdEl = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
+        msgIdEl.setText(msgId);
+        SOAPHeaderBlock actionEl = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
+        actionEl.setText(action);
+
+        SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
+        to.setText(consumerLocation.getAddress());
+
+        client.addHeader(actionEl);
+        client.addHeader(msgIdEl);
+        client.addHeader(to);
+
+        for (OMElement omHeader : soapHeaders) {
+            try {
+                SOAPHeaderBlock headerBlock = ElementHelper.toSOAPHeaderBlock(omHeader, soapfactory);
+                client.addHeader(headerBlock);
+            } catch (Exception e) {
+                throw AxisFault.makeFault(e);
+            }
+        }
+
+        Options opts = new Options();
+        opts.setTimeOutInMilliSeconds(tcpConnectionTimeout);
+        opts.setMessageId(msgId);
+        opts.setTo(consumerLocation);
+        opts.setAction(action);
+        opts.setProperty(HTTPConstants.CHUNKED, Boolean.FALSE);
+        opts.setProperty(HTTPConstants.HTTP_PROTOCOL_VERSION, HTTPConstants.HEADER_PROTOCOL_10);
+        client.setOptions(opts);
+
+        return client;
+    }
+
+    private ServiceClient getServiceClient() throws AxisFault {
+        if (nonThreadLocalServiceClient == null) {
+            nonThreadLocalServiceClient = new ServiceClient();
+        }
+        nonThreadLocalServiceClient.removeHeaders();
+        return nonThreadLocalServiceClient;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
new file mode 100644
index 0000000..9eb50cc
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy;
+
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+
+/**
+ * Strategy that decides how and when outgoing messages are handed to a {@link Deliverable}.
+ */
+public interface SendingStrategy {
+    /**
+     * Prepares the strategy for use. {@code FixedParallelSender}, for example, starts its scheduling thread here.
+     */
+    void init();
+
+    /**
+     * Accepts a message for sending.
+     *
+     * @param outMessage
+     *            message to send; {@code FixedParallelSender} queues it once per consumer in the message's
+     *            consumer list
+     * @param deliverable
+     *            object that performs the actual send
+     */
+    void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable);
+
+    /**
+     * Stops the strategy. {@code FixedParallelSender} waits here for its scheduling thread and then for its thread
+     * pool to terminate.
+     */
+    void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
new file mode 100644
index 0000000..5236f47
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConsumerHandler implements Runnable {
+
+    private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);
+
+    protected LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
+
+    private String consumerUrl;
+
+    private Deliverable deliverable;
+
+    public ConsumerHandler(String url, Deliverable deliverable) {
+        this.consumerUrl = url;
+        this.deliverable = deliverable;
+    }
+
+    public String getConsumerUrl() {
+        return consumerUrl;
+    }
+
+    public void submitMessage(LightweightMsg msg) {
+        try {
+            queue.put(msg);
+        } catch (InterruptedException e) {
+            log.error("Interrupted when trying to add message");
+        }
+    }
+
+    protected void send(List<LightweightMsg> list) {
+        for (LightweightMsg m : list) {
+            try {
+                OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
+                deliverable.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
new file mode 100644
index 0000000..7d21fdb
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link SendingStrategy} that sends with a fixed-size thread pool.
+ * <p>
+ * Every consumer URL gets one {@link ConsumerHandler} holding that consumer's queued messages. A scheduling thread
+ * started by {@link #init()} scans the handlers once per second and submits to the pool every handler that is not
+ * currently submitted. One run of a handler sends at most {@code batchSize} messages; a handler whose queue is empty
+ * after its run is removed.
+ */
+public class FixedParallelSender implements SendingStrategy {
+
+    private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);
+
+    private static final long TIME_TO_WAIT_FOR_SHUTDOWN_SECOND = 30;
+
+    /*
+     * Both maps are only accessed while holding the monitor of activeConsumerHandlers. They always contain the same
+     * keys.
+     */
+    private final HashMap<String, ConsumerHandler> activeConsumerHandlers = new HashMap<String, ConsumerHandler>();
+    private final HashMap<String, Boolean> submittedConsumerHandlers = new HashMap<String, Boolean>();
+
+    private final int batchSize;
+
+    private final ExecutorService threadPool;
+
+    /* Written by shutdown() and read by the scheduling thread, hence volatile. */
+    private volatile boolean stop;
+
+    private Thread t;
+
+    /**
+     * @param poolsize
+     *            number of threads in the sending pool
+     * @param batchsize
+     *            maximum number of messages one handler sends per run
+     */
+    public FixedParallelSender(int poolsize, int batchsize) {
+        this.threadPool = Executors.newFixedThreadPool(poolsize);
+        this.batchSize = batchsize;
+    }
+
+    /**
+     * Starts the scheduling thread.
+     */
+    public void init() {
+        this.t = new Thread(new ChooseHandlerToSubmit());
+        this.t.start();
+    }
+
+    /**
+     * Queues {@code outMessage} once for every consumer in its consumer list.
+     */
+    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+        List<ConsumerInfo> consumerInfoList = outMessage.getConsumerInfoList();
+        for (ConsumerInfo consumer : consumerInfoList) {
+            sendToConsumerHandler(consumer, outMessage, deliverable);
+        }
+    }
+
+    /**
+     * Signals the scheduling thread to stop, waits for it to finish (it keeps running until no handler is left),
+     * then shuts the pool down and waits up to {@value #TIME_TO_WAIT_FOR_SHUTDOWN_SECOND} seconds for it to
+     * terminate. If the calling thread is interrupted while waiting, its interrupt status is restored before this
+     * method returns. Safe to call even if {@link #init()} was never called.
+     */
+    public void shutdown() {
+        log.debug("Shutting down");
+        this.stop = true;
+
+        // Remember interrupts and restore the flag at the end, so that an interrupted join() does not make the
+        // awaitTermination() below fail immediately.
+        boolean interrupted = false;
+
+        if (this.t != null) {
+            try {
+                this.t.join();
+            } catch (InterruptedException ie) {
+                interrupted = true;
+                log.error("Wait for ChooseHandlerToSubmit thread to finish (join) is interrupted");
+            }
+        }
+
+        threadPool.shutdown();
+        try {
+            threadPool.awaitTermination(TIME_TO_WAIT_FOR_SHUTDOWN_SECOND, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            interrupted = true;
+            log.error("Interrupted while waiting thread pool to shutdown");
+        }
+
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+
+        log.debug("Shut down");
+    }
+
+    /**
+     * Adds the message to the handler of the consumer's URL, creating and registering the handler (as not yet
+     * submitted) when the URL has none.
+     */
+    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
+
+        String consumerUrl = consumer.getConsumerEprStr();
+
+        LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
+                message.getAdditionalMessageContent());
+
+        synchronized (activeConsumerHandlers) {
+            ConsumerHandler handler = activeConsumerHandlers.get(consumerUrl);
+            if (handler == null) {
+                handler = new FixedParallelConsumerHandler(consumerUrl, deliverable);
+                activeConsumerHandlers.put(consumerUrl, handler);
+                submittedConsumerHandlers.put(consumerUrl, Boolean.FALSE);
+            }
+            handler.submitMessage(lwm);
+        }
+    }
+
+    /**
+     * @return {@code true} while at least one consumer handler is registered
+     */
+    private boolean hasActiveHandlers() {
+        synchronized (activeConsumerHandlers) {
+            return !activeConsumerHandlers.isEmpty();
+        }
+    }
+
+    /**
+     * Scheduling loop: periodically submits every handler that is not already submitted to the thread pool.
+     */
+    class ChooseHandlerToSubmit implements Runnable {
+        private static final int SLEEP_TIME_SECONDS = 1;
+
+        public void run() {
+            /*
+             * If stop is true, we will not get any message to send from addMessageToSend() method. So,
+             * activeConsumerHandlers size will not increase but decrease only. When shutdown() is invoked, we will have
+             * to send out all messages in our queue.
+             */
+            while (!stop || hasActiveHandlers()) {
+
+                synchronized (activeConsumerHandlers) {
+                    for (String key : activeConsumerHandlers.keySet()) {
+
+                        /*
+                         * If consumer handlers is not scheduled to run, submit it to thread pool.
+                         */
+                        if (!Boolean.TRUE.equals(submittedConsumerHandlers.get(key))) {
+                            threadPool.submit(activeConsumerHandlers.get(key));
+                            submittedConsumerHandlers.put(key, Boolean.TRUE);
+                        }
+                    }
+                }
+
+                try {
+                    TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
+                } catch (InterruptedException e) {
+                    // The interrupt flag is deliberately not restored: the loop has to keep running until all
+                    // handlers are drained, and a set flag would make every following sleep fail immediately.
+                    log.error("interrupted while waiting", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Handler whose run sends one batch of at most {@code batchSize} queued messages and then marks itself as not
+     * submitted, or unregisters itself when its queue is empty.
+     */
+    class FixedParallelConsumerHandler extends ConsumerHandler {
+
+        public FixedParallelConsumerHandler(String url, Deliverable deliverable) {
+            super(url, deliverable);
+        }
+
+        public void run() {
+
+            log.debug(String.format("FixedParallelConsumerHandler starting: %s", getConsumerUrl()));
+
+            ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
+
+            queue.drainTo(localList, batchSize);
+
+            send(localList);
+            localList.clear();
+
+            /*
+             * Remove handler if and only if there is no message
+             */
+            synchronized (activeConsumerHandlers) {
+
+                /*
+                 * all message is sent or not, we will set it as not submitted. So, it can be put back to thread pool.
+                 */
+                submittedConsumerHandlers.put(getConsumerUrl(), Boolean.FALSE);
+
+                if (queue.size() == 0) {
+                    submittedConsumerHandlers.remove(getConsumerUrl());
+                    activeConsumerHandlers.remove(getConsumerUrl());
+
+                    log.debug(String.format("Consumer handler is already removed: %s", getConsumerUrl()));
+                }
+            }
+
+            log.debug(String.format("FixedParallelConsumerHandler done: %s,", getConsumerUrl()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java
new file mode 100644
index 0000000..ca56c58
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+
+class LightweightMsg {
+    private ConsumerInfo consumerInfo;
+    private String payload;
+    private AdditionalMessageContent header;
+
+    public LightweightMsg(ConsumerInfo c, String pld, AdditionalMessageContent h) {
+        consumerInfo = c;
+        payload = pld;
+        header = h;
+    }
+
+    public String getPayLoad() {
+        return payload;
+    }
+
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+
+    public AdditionalMessageContent getHeader() {
+        return header;
+    }
+
+    public String toString() {
+        return String.format("header: %s, consumer: %s, pld: %s", header, consumerInfo.getConsumerEprStr(), payload);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
new file mode 100644
index 0000000..cede65d
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sending strategy in which each subscriber (consumer endpoint URL) is served by its own handler
+ * task running on a cached thread pool.
+ * <p>
+ * A handler is created and scheduled when the first message for a URL arrives; later messages for
+ * the same URL are appended to that handler's queue. A handler unregisters itself and ends once
+ * {@code MAX_UNSUCCESSFUL_DRAINS} consecutive polls found nothing and its queue is still empty.
+ */
+public class ParallelSender implements SendingStrategy {
+
+    private static final Logger log = LoggerFactory.getLogger(ParallelSender.class);
+
+    private static final long TIME_TO_WAIT_FOR_SHUTDOWN_SECOND = 30;
+
+    /** Live handlers keyed by consumer URL; every access happens while holding the map's monitor. */
+    private final HashMap<String, ConsumerHandler> activeConsumerHandlers = new HashMap<String, ConsumerHandler>();
+
+    private ExecutorService threadPool;
+
+    /**
+     * Creates the thread pool. Must be called before any message is added.
+     */
+    public void init() {
+        this.threadPool = Executors.newCachedThreadPool();
+    }
+
+    /**
+     * Queues the message once for every consumer listed in it.
+     *
+     * @param outMessage
+     *            message together with the consumers it has to reach
+     * @param deliverable
+     *            passed on to any handler that has to be created
+     */
+    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+        List<ConsumerInfo> consumerInfoList = outMessage.getConsumerInfoList();
+        for (ConsumerInfo consumer : consumerInfoList) {
+            sendToConsumerHandler(consumer, outMessage, deliverable);
+        }
+    }
+
+    /**
+     * Stops accepting new handler tasks and waits up to
+     * {@code TIME_TO_WAIT_FOR_SHUTDOWN_SECOND} seconds for the running ones to finish.
+     */
+    public void shutdown() {
+        log.debug("Shutting down");
+
+        threadPool.shutdown();
+        try {
+            if (!threadPool.awaitTermination(TIME_TO_WAIT_FOR_SHUTDOWN_SECOND, TimeUnit.SECONDS)) {
+                log.warn("Thread pool did not terminate within {} seconds", TIME_TO_WAIT_FOR_SHUTDOWN_SECOND);
+            }
+        } catch (InterruptedException ie) {
+            log.error("Interrupted while waiting thread pool to shutdown", ie);
+            // Keep the interrupt visible to whoever called shutdown().
+            Thread.currentThread().interrupt();
+        }
+        log.debug("Shut down");
+    }
+
+    /**
+     * Hands the message to the handler registered for the consumer's URL, creating and scheduling
+     * a new handler when none is registered.
+     */
+    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
+        String consumerUrl = consumer.getConsumerEprStr();
+
+        LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
+                message.getAdditionalMessageContent());
+
+        // Same monitor as the handler's retirement check: a handler is either still registered
+        // when the message is queued, or a fresh one is created for it.
+        synchronized (activeConsumerHandlers) {
+            ConsumerHandler handler = activeConsumerHandlers.get(consumerUrl);
+            if (handler == null) {
+                handler = new ParallelConsumerHandler(consumerUrl, deliverable);
+                activeConsumerHandlers.put(consumerUrl, handler);
+                handler.submitMessage(lwm);
+                threadPool.submit(handler);
+            } else {
+                handler.submitMessage(lwm);
+            }
+        }
+    }
+
+    /**
+     * Handler for a single consumer URL. It repeatedly drains its queue and sends what it found,
+     * sleeping {@code SLEEP_TIME_SECONDS} after every empty poll.
+     */
+    class ParallelConsumerHandler extends ConsumerHandler {
+
+        private static final int MAX_UNSUCCESSFUL_DRAINS = 3;
+        private static final int SLEEP_TIME_SECONDS = 1;
+        private int numberOfUnsuccessfulDrain = 0;
+
+        public ParallelConsumerHandler(String url, Deliverable deliverable) {
+            super(url, deliverable);
+        }
+
+        public void run() {
+            log.debug("ParallelConsumerHandler starting: {}", getConsumerUrl());
+
+            ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
+            while (true) {
+
+                /*
+                 * Try to find more messages to send out
+                 */
+                if (queue.drainTo(localList) <= 0) {
+                    numberOfUnsuccessfulDrain++;
+                } else {
+                    numberOfUnsuccessfulDrain = 0;
+                }
+
+                /*
+                 * No new message for some time
+                 */
+                if (numberOfUnsuccessfulDrain >= MAX_UNSUCCESSFUL_DRAINS) {
+                    /*
+                     * Stop this thread if and only if there is no message
+                     */
+                    synchronized (activeConsumerHandlers) {
+                        if (queue.size() == 0) {
+                            // remove() returns null only when no mapping existed, i.e. the handler
+                            // had already been unregistered by someone else.
+                            if (activeConsumerHandlers.remove(getConsumerUrl()) == null) {
+                                log.debug("Consumer handler is already removed: {}", getConsumerUrl());
+                            }
+                            log.debug("ParallelConsumerHandler done: {},", getConsumerUrl());
+                            break;
+                        }
+                    }
+                }
+
+                send(localList);
+                localList.clear();
+
+                if (numberOfUnsuccessfulDrain > 0) {
+                    waitForMessages();
+                }
+            }
+        }
+
+        /**
+         * Pauses between polls of an empty queue.
+         */
+        private void waitForMessages() {
+            try {
+                TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
+                log.debug("finished - waiting for messages");
+            } catch (InterruptedException e) {
+                // NOTE(review): the interrupt is only logged, not restored; restoring it would make
+                // every later sleep return immediately - confirm the intended shutdown behaviour
+                // before changing this.
+                log.error("interrupted while waiting for messages", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
new file mode 100644
index 0000000..380e559
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.List;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sending strategy that hands a message to the {@link Deliverable} directly from the calling
+ * thread, one consumer after another.
+ */
+public class SerialSender implements SendingStrategy {
+
+    private static final Logger log = LoggerFactory.getLogger(SerialSender.class);
+
+    /** Nothing to set up for this strategy. */
+    public void init() {
+    }
+
+    /** Nothing to release for this strategy. */
+    public void shutdown() {
+    }
+
+    /**
+     * Sends the message right away; see {@link #sendNotification(OutGoingMessage, Deliverable)}.
+     */
+    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+        sendNotification(outMessage, deliverable);
+    }
+
+    /**
+     * Parses the message text once and passes the resulting element, together with the additional
+     * message content, to {@code deliverable} for every consumer of the message. A {@code null}
+     * message and an {@link XMLStreamException} are logged as errors and otherwise ignored.
+     *
+     * @param outGoingMessage
+     *            message to send, with its consumers
+     * @param deliverable
+     *            receives one {@code send} call per consumer
+     */
+    public void sendNotification(OutGoingMessage outGoingMessage, Deliverable deliverable) {
+        if (outGoingMessage != null) {
+            String text = outGoingMessage.getTextMessage();
+            List<ConsumerInfo> consumers = outGoingMessage.getConsumerInfoList();
+            AdditionalMessageContent additionalContent = outGoingMessage.getAdditionalMessageContent();
+
+            try {
+                // Parse a single time; the same element is reused for all consumers.
+                OMElement message2Send = CommonRoutines.reader2OMElement(new StringReader(text));
+                for (ConsumerInfo consumer : consumers) {
+                    deliverable.send(consumer, message2Send, additionalContent);
+                }
+            } catch (XMLStreamException e) {
+                log.error(e.getMessage(), e);
+            }
+        } else {
+            log.error("Got a null outgoing message");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java
new file mode 100644
index 0000000..6400f63
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+
+/**
+ * Static string helpers for the message broker: null-tolerant comparison, topic extraction and
+ * reading the XPath expression out of a filter element.
+ */
+public class BrokerUtil {
+
+    /**
+     * Null-tolerant string comparison.
+     * 
+     * @param x
+     *            first string, may be {@code null}
+     * @param y
+     *            second string, may be {@code null}
+     * @return {@code true} when both arguments are {@code null} or both hold the same sequence of characters,
+     *         {@code false} otherwise
+     */
+    public static boolean sameStringValue(String x, String y) {
+        if (x == null) {
+            return y == null;
+        }
+        return x.equals(y);
+    }
+
+    /**
+     * Strips everything up to and including the first {@code ':'} from the filter text.
+     * 
+     * @param filterText
+     *            filter text, must not be {@code null}
+     * @return the part after the first colon, or the whole text when it contains no colon
+     * @throws IllegalArgumentException
+     *             if {@code filterText} is {@code null}
+     */
+    public static String getTopicLocalString(String filterText) {
+        if (filterText == null) {
+            throw new IllegalArgumentException("filter text can't be null");
+        }
+
+        int colon = filterText.indexOf(':');
+        return (colon == -1) ? filterText : filterText.substring(colon + 1);
+    }
+
+    /**
+     * Returns the text of the element after checking that it declares the XPath dialect.
+     * 
+     * @param xpathEl
+     *            element carrying a {@code Dialect} (or {@code DIALECT}) attribute, must not be {@code null}
+     * @return the text content of {@code xpathEl}
+     * @throws AxisFault
+     *             if the dialect attribute is missing or differs from {@code WsmgCommonConstants.XPATH_DIALECT}
+     * @throws IllegalArgumentException
+     *             if {@code xpathEl} is {@code null}
+     */
+    public static String getXPathString(OMElement xpathEl) throws AxisFault {
+        if (xpathEl == null) {
+            throw new IllegalArgumentException("xpath element can't be null");
+        }
+
+        // Both spellings of the attribute name are accepted, "Dialect" first.
+        OMAttribute dialect = xpathEl.getAttribute(new QName("Dialect"));
+        if (dialect == null) {
+            dialect = xpathEl.getAttribute(new QName("DIALECT"));
+            if (dialect == null) {
+                throw new AxisFault("dialect is required for subscribe");
+            }
+        }
+
+        String dialectValue = dialect.getAttributeValue();
+        if (dialectValue.equals(WsmgCommonConstants.XPATH_DIALECT)) {
+            return xpathEl.getText();
+        }
+        throw new AxisFault("Unkown dialect: " + dialectValue);
+    }
+
+    /**
+     * Extracts what follows the first occurrence of {@code WsmgCommonConstants.TOPIC_PREFIX} in a request path. A
+     * single leading {@code '/'} is dropped before searching.
+     * 
+     * @param topicPath
+     *            request path, may be {@code null}
+     * @return the text after the prefix, or {@code null} when the path is {@code null} or empty, the prefix is not
+     *         found, or nothing follows it
+     */
+    public static String getTopicFromRequestPath(String topicPath) {
+        if (topicPath == null || topicPath.length() == 0) {
+            return null;
+        }
+
+        String path = topicPath.startsWith("/") ? topicPath.substring(1) : topicPath;
+        if (path.length() == 0) {
+            return null;
+        }
+
+        int prefixAt = path.indexOf(WsmgCommonConstants.TOPIC_PREFIX);
+        if (prefixAt < 0) {
+            return null;
+        }
+
+        String topic = path.substring(prefixAt + WsmgCommonConstants.TOPIC_PREFIX.length());
+        return (topic.length() == 0) ? null : topic;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java
new file mode 100644
index 0000000..bcd00a4
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Counter backed by an {@link AtomicLong}, with an optional string value kept next to it. Used for
+ * stress tests, together with TimerThread.
+ */
+public class Counter {
+
+    private final AtomicLong counter = new AtomicLong();
+
+    private final AtomicReference<String> otherStringValue = new AtomicReference<String>(null);
+
+    /**
+     * Adds one to the counter.
+     */
+    public void addCounter() {
+        counter.incrementAndGet();
+    }
+
+    /**
+     * Adds one to the counter and replaces the stored string value.
+     * 
+     * @param otherValue
+     *            The new string value.
+     */
+    public synchronized void addCounter(String otherValue) {
+        counter.incrementAndGet();
+        otherStringValue.set(otherValue);
+    }
+
+    /**
+     * @return Returns the current counter value.
+     */
+    public long getCounterValue() {
+        return counter.get();
+    }
+
+    /**
+     * @param counterValue
+     *            The value the counter is overwritten with.
+     */
+    public void setCounterValue(long counterValue) {
+        counter.set(counterValue);
+    }
+
+    /**
+     * @return Returns the stored string value, {@code null} if none was ever set.
+     */
+    public String getOtherValueString() {
+        return otherStringValue.get();
+    }
+
+    /**
+     * @param otherValueString
+     *            The string value to store.
+     */
+    public void setOtherValueString(String otherValueString) {
+        otherStringValue.set(otherValueString);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
new file mode 100644
index 0000000..a2c0bfc
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeSet;
+
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.WsmgVersion;
+
+/**
+ * Process-wide runtime counters of the broker and an HTML rendering of them.
+ * <p>
+ * The {@code add*} methods, {@link #setStartUpTime()} and {@link #getHtmlString()} all synchronize
+ * on this class, so a report is built from a consistent snapshot of whatever was recorded through
+ * them. The counters are public static fields; code that writes them directly bypasses that lock.
+ * The {@code min*} fields start at {@code Long.MAX_VALUE} and are reported as such until the first
+ * sample arrives.
+ */
+public class RunTimeStatistics {
+    public static long totalMessageSize = 0;
+    public static long totalReceivedNotification = 0;
+    public static long totalSentOutNotification = 0;
+    public static long totalFailedNotification = 0;
+    public static long totalSubscriptions = 0;
+    public static long totalSubscriptionsAtStartUp = 0;
+    public static long totalUnSubscriptions = 0;
+    public static long minMessageSize = Long.MAX_VALUE;
+    public static long maxMessageSize = 0;
+    public static String startUpTime = "";
+    public static long totalSuccessfulDeliveryTime = 0;
+    public static long totalFailedDeliveryTime = 0;
+    public static long minSuccessfulDeliveryTime = Long.MAX_VALUE;
+    public static long maxSuccessfulDeliveryTime = 0;
+    public static long minFailedDeliveryTime = Long.MAX_VALUE;
+    public static long maxFailedDeliveryTime = 0;
+    /** Number of recorded failures per consumer URL. */
+    public static final HashMap<String, Integer> failConsumerList = new HashMap<String, Integer>();
+
+    private static long startUpTimeInMillis;
+
+    /**
+     * Records one received notification of the given size.
+     *
+     * @param size
+     *            message size in bytes
+     */
+    public static synchronized void addNewNotificationMessageSize(int size) {
+        if (size < minMessageSize) {
+            minMessageSize = size;
+        }
+        if (size > maxMessageSize) {
+            maxMessageSize = size;
+        }
+        totalMessageSize += size;
+        totalReceivedNotification++;
+    }
+
+    /**
+     * Records one successful delivery.
+     *
+     * @param deliveryTime
+     *            time the delivery took, reported in milliseconds by {@link #getHtmlString()}
+     */
+    public static synchronized void addNewSuccessfulDeliverTime(long deliveryTime) {
+        if (deliveryTime < minSuccessfulDeliveryTime) {
+            minSuccessfulDeliveryTime = deliveryTime;
+        }
+        if (deliveryTime > maxSuccessfulDeliveryTime) {
+            maxSuccessfulDeliveryTime = deliveryTime;
+        }
+        totalSuccessfulDeliveryTime += deliveryTime;
+        totalSentOutNotification++;
+    }
+
+    /**
+     * Records one failed delivery.
+     *
+     * @param deliveryTime
+     *            time spent on the failed attempt, reported in milliseconds by {@link #getHtmlString()}
+     */
+    public static synchronized void addNewFailedDeliverTime(long deliveryTime) {
+        if (deliveryTime < minFailedDeliveryTime) {
+            minFailedDeliveryTime = deliveryTime;
+        }
+        if (deliveryTime > maxFailedDeliveryTime) {
+            maxFailedDeliveryTime = deliveryTime;
+        }
+        totalFailedDeliveryTime += deliveryTime;
+        totalFailedNotification++;
+    }
+
+    /**
+     * Increments the failure count kept for the given consumer URL.
+     *
+     * @param url
+     *            consumer URL that could not be reached
+     */
+    public static synchronized void addFailedConsumerURL(String url) {
+        Integer previousCount = failConsumerList.get(url);
+        failConsumerList.put(url, previousCount == null ? 1 : previousCount + 1);
+    }
+
+    /**
+     * Remembers the current time as the start-up time, both formatted and as epoch milliseconds.
+     */
+    public static synchronized void setStartUpTime() {
+        Date currentDate = new Date(); // Current date
+        startUpTime = CommonRoutines.getXsdDateTime(currentDate);
+        startUpTimeInMillis = currentDate.getTime();
+    }
+
+    /**
+     * Renders all statistics as an HTML fragment.
+     *
+     * @return HTML paragraphs with the counters, averages, start-up time, version and the failure
+     *         count of every unreachable consumer URL in sorted order
+     */
+    public static synchronized String getHtmlString() {
+        StringBuilder html = new StringBuilder();
+
+        html.append("<p>Total incoming message number: <span class=\"xml-requests-count\">")
+                .append(totalReceivedNotification).append("</span><br />\n");
+        html.append("Total successful outgoing message number: ").append(totalSentOutNotification).append("<br>\n");
+        html.append("Total unreachable outgoing message number: ").append(totalFailedNotification).append("<br>\n");
+        html.append("Total subscriptions requested: ").append(totalSubscriptions).append("(+")
+                .append(totalSubscriptionsAtStartUp).append(" startUp)<br>\n");
+        html.append("Total Unsubscriptions requested: ").append(totalUnSubscriptions).append("<br>\n");
+        html.append("</p>\n");
+
+        html.append("<p>Average message size: ").append(average(totalMessageSize, totalReceivedNotification))
+                .append(" bytes<br>\n");
+        html.append("Max message size: ").append(maxMessageSize).append(" bytes<br>\n");
+        html.append("Min message size: ").append(minMessageSize).append(" bytes<br>\n");
+        html.append("</p>\n");
+
+        html.append("<p>Average Successful Delivery Time: ")
+                .append(average(totalSuccessfulDeliveryTime, totalSentOutNotification)).append(" ms<br>\n");
+        html.append("Max Successful Delivery Time: ").append(maxSuccessfulDeliveryTime).append(" ms<br>\n");
+        html.append("Min Successful Delivery Time: ").append(minSuccessfulDeliveryTime).append(" ms<br>\n");
+        html.append("</p>\n");
+
+        html.append("<p>Average Unreachable Delivery Time: ")
+                .append(average(totalFailedDeliveryTime, totalFailedNotification)).append(" ms<br>\n");
+        html.append("Max Unreachable Delivery Time: ").append(maxFailedDeliveryTime).append(" ms<br>\n");
+        html.append("Min Unreachable Delivery Time: ").append(minFailedDeliveryTime).append(" ms<br>\n");
+        html.append("</p>\n");
+
+        // The span is labelled as seconds, so convert the stored milliseconds before printing.
+        html.append("<p>Service started at: ").append(startUpTime).append(" (<span class=\"starttime-seconds\">")
+                .append(startUpTimeInMillis / 1000).append("</span> [seconds] since UNIX epoch)<br />\n");
+
+        html.append("Version: <span class=\"service-name\">").append(WsmgVersion.getImplementationVersion())
+                .append("</span></p>\n");
+
+        html.append("<p>Total unreachable consumerUrl: ").append(failConsumerList.size()).append(" <br>\n");
+        // Copy the keys into a TreeSet to list the URLs in sorted order.
+        TreeSet<String> consumerUrlList = new TreeSet<String>(failConsumerList.keySet());
+        for (String url : consumerUrlList) {
+            html.append("  ").append(url).append(" -->").append(failConsumerList.get(url)).append(" <br>\n");
+        }
+        html.append("</p>\n");
+        return html.toString();
+    }
+
+    /** Integer average of {@code total} over {@code count}, or 0 when nothing has been counted. */
+    private static long average(long total, long count) {
+        return count == 0 ? 0 : total / count;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java
new file mode 100644
index 0000000..77f7c57
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+/**
+ * Stress-test helper that prints, once per second, how many increments a {@link Counter} received
+ * since the previous tick. Output goes to standard out; after three consecutive ticks without any
+ * increment the printing pauses until the counter moves again. The task runs until its thread is
+ * interrupted.
+ */
+public class TimerThread implements Runnable {
+    Counter counter;
+
+    long counterValue = 0;
+
+    long seqNum = 0;
+
+    String comment = "";
+
+    /**
+     * @param counter
+     *            counter to sample
+     * @param comment
+     *            text inserted into every printed line
+     */
+    public TimerThread(Counter counter, String comment) {
+        this.counter = counter;
+        this.comment = comment;
+    }
+
+    public void run() {
+        final long interval = 1000;
+        long lastCounter = 0;
+        long idleCount = 0;
+        // Start on a whole-second boundary between one and two seconds from now, so that other
+        // threads launched around the same time tick together.
+        long currentTime = System.currentTimeMillis();
+        long launchTime = ((currentTime + 2000) / 1000) * 1000;
+        long sleepTime = launchTime - currentTime;
+        System.out.println("launchTime=" + launchTime + " SleepTime=" + sleepTime);
+        if (!sleepUnlessInterrupted(sleepTime)) {
+            return;
+        }
+        while (true) {
+            currentTime = System.currentTimeMillis();
+            counterValue = counter.getCounterValue();
+            long receivedCount = counterValue - lastCounter;
+            lastCounter = counterValue;
+            if (receivedCount == 0) {
+                idleCount++;
+            } else {
+                idleCount = 0;
+            }
+            if (receivedCount > 0 || (receivedCount == 0 && idleCount < 3)) {
+                System.out.println(seqNum + " " + counter.getCounterValue() + " " + receivedCount + comment
+                        + counter.getOtherValueString());
+            }
+            seqNum++;
+            // Schedule against the fixed launch time rather than "now" so ticks do not drift.
+            launchTime = launchTime + interval;
+            sleepTime = launchTime - currentTime;
+            if (sleepTime < 0) {
+                sleepTime = 0;
+            }
+            if (!sleepUnlessInterrupted(sleepTime)) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Sleeps for the given time.
+     *
+     * @return {@code false} if the thread was interrupted, in which case the interrupt flag is set
+     *         again for the caller; {@code true} otherwise
+     */
+    private static boolean sleepUnlessInterrupted(long millis) {
+        try {
+            Thread.sleep(millis);
+            return true;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java
new file mode 100644
index 0000000..931a5bf
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+/**
+ * Operations known for WS-Eventing requests, each tied to the operation name returned by
+ * {@link #toString()}.
+ */
+public enum WsEventingOperations {
+
+    RENEW("renew"),
+    PUBLISH("publish"),
+    GET_STATUS("getStatus"),
+    SUBSCRIPTION_END("subscriptionEnd"),
+    SUBSCRIBE("subscribe"),
+    UNSUBSCRIBE("unsubscribe");
+
+    private final String name;
+
+    private WsEventingOperations(String n) {
+        this.name = n;
+    }
+
+    /**
+     * @return the operation name
+     */
+    @Override
+    public String toString() {
+        return this.name;
+    }
+
+    /**
+     * Case-sensitive comparison of the operation name with a string.
+     * 
+     * @param s
+     *            string to compare with, may be {@code null}
+     * @return {@code true} if {@code s} is exactly the operation name
+     */
+    public boolean equals(String s) {
+        return this.name.equals(s);
+    }
+
+    /**
+     * Looks an operation up by name, ignoring case.
+     * 
+     * @param s
+     *            operation name
+     * @return the matching operation
+     * @throws RuntimeException
+     *             if no operation carries that name
+     */
+    public static WsEventingOperations valueFrom(String s) {
+        WsEventingOperations[] candidates = WsEventingOperations.values();
+        for (int i = 0; i < candidates.length; i++) {
+            if (candidates[i].name.equalsIgnoreCase(s)) {
+                return candidates[i];
+            }
+        }
+
+        throw new RuntimeException("invalid WsEventingOperation:- " + s);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java
new file mode 100644
index 0000000..c771134
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+/**
+ * WS-Notification operation names.
+ * <p>
+ * Each constant carries the operation name string that {@link #toString()} returns and that
+ * {@link #valueFrom(String)} matches against.
+ */
+public enum WsNotificationOperations {
+
+    NOTIFY("notify"),
+    SUBSCRIBE("subscribe"),
+    GET_CURRENT_MSG("getCurrentMessage"),
+    // Was "gause", which made valueFrom("pause") fail; the name mirrors RESUME_REQUEST("resume").
+    PAUSE_REQUEST("pause"),
+    RESUME_REQUEST("resume"),
+    PAUSE_SUBSCRIPTION("pauseSubscription"),
+    RESUME_SUBSCRIPTION("resumeSubscription"),
+    REGISTER_PUBLISHER("registerPublisher"),
+    UNSUBSCRIBE("unsubscribe");
+
+    /** Operation name associated with this constant. */
+    private final String name;
+
+    private WsNotificationOperations(String n) {
+        name = n;
+    }
+
+    /**
+     * @return the operation name of this constant (not the Java identifier)
+     */
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    /**
+     * Case-sensitive comparison of this constant's operation name with a string.
+     * <p>
+     * This is an overload taking a {@code String}; it does not override {@link Object#equals(Object)}.
+     *
+     * @param s the operation name to compare with; {@code null} yields {@code false}
+     * @return {@code true} if {@code s} is exactly this constant's operation name
+     */
+    public boolean equals(String s) {
+        return name.equals(s);
+    }
+
+    /**
+     * Looks up the constant whose operation name matches the given string, ignoring case.
+     *
+     * @param s the operation name to look up
+     * @return the matching constant
+     * @throws RuntimeException if no constant has that operation name (including when {@code s} is {@code null})
+     */
+    public static WsNotificationOperations valueFrom(String s) {
+        for (WsNotificationOperations operation : WsNotificationOperations.values()) {
+            if (operation.toString().equalsIgnoreCase(s)) {
+                return operation;
+            }
+        }
+
+        throw new RuntimeException("invalid Ws notification Operation:- " + s);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql
new file mode 100755
index 0000000..5663ebf
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+/*
+ * Removes all rows from the tables listed below; the tables themselves are kept.
+ */
+DELETE FROM disQ;
+DELETE FROM MaxIDTable;
+DELETE FROM MinIDTable;
+DELETE FROM specialSubscription;
+DELETE FROM subscription;
+DELETE FROM msgbox;

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-derby.sql
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-derby.sql b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-derby.sql
new file mode 100644
index 0000000..80b51a4
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-derby.sql
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+/*
+ * Derby schema: SUBSCRIPTION and SPECIALSUBSCRIPTION share the same column layout,
+ * DISQ holds messages keyed by a generated ID, and MAXIDTABLE / MINIDTABLE each hold
+ * a single INTEGER column.
+ */
+
+/*
+ * SUBSCRIPTION: one row per subscription. No primary key or unique constraint is declared.
+ */
+CREATE TABLE SUBSCRIPTION (
+                SUBSCRIPTIONID VARCHAR(200) NOT NULL DEFAULT '',          
+                TOPICS VARCHAR(255) DEFAULT '',                               
+                XPATH VARCHAR(200) DEFAULT '',                       
+                CONSUMERADDRESS VARCHAR(255) DEFAULT '',                      
+                REFERENCEPROPERTIES BLOB,
+                CONTENT BLOB,                                                     
+                WSRM INTEGER NOT NULL DEFAULT 0 ,                                  
+                CREATIONTIME TIMESTAMP NOT NULL  
+              );
+/*
+ * SPECIALSUBSCRIPTION: same columns as SUBSCRIPTION.
+ * NOTE(review): what makes a subscription "special" is not visible in this script - confirm against the broker code.
+ */
+CREATE TABLE SPECIALSUBSCRIPTION (                              
+                       SUBSCRIPTIONID VARCHAR(200) NOT NULL DEFAULT '',              
+                       TOPICS VARCHAR(255) DEFAULT '',                               
+                       XPATH VARCHAR(200) DEFAULT '',                                
+                       CONSUMERADDRESS VARCHAR(255) DEFAULT '',                      
+                       REFERENCEPROPERTIES BLOB,                                     
+                       CONTENT BLOB,                                                     
+                       WSRM INTEGER NOT NULL DEFAULT 0,                         
+                       CREATIONTIME TIMESTAMP NOT NULL 
+                     );               
+
+	
+/*
+ * DISQ: stored messages. ID is an identity column (generated unless supplied) and the primary key.
+ */
+CREATE TABLE DISQ (                       
+          ID BIGINT GENERATED BY DEFAULT AS IDENTITY,
+          TRACKID VARCHAR(100) DEFAULT NULL,      
+          MESSAGE BLOB,                           
+          STATUS INTEGER  DEFAULT NULL,    
+          TOPIC VARCHAR(255) DEFAULT '',     
+          PRIMARY KEY  (ID)                       
+        );	
+	
+/*
+ * MAXIDTABLE / MINIDTABLE: single-column INTEGER tables.
+ * NOTE(review): presumably they track the highest / lowest DISQ id in use - confirm against the broker code.
+ */
+CREATE TABLE MAXIDTABLE(
+	MAXID INTEGER
+	);
+	
+CREATE TABLE MINIDTABLE(
+	MINID INTEGER
+	);
+	

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-mysql.sql
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-mysql.sql b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-mysql.sql
new file mode 100755
index 0000000..cb506ef
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-mysql.sql
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ 
+/*
+ * MySQL schema: subscription and specialSubscription share the same column layout,
+ * disQ holds messages keyed by an auto-increment id, and MaxIDTable / MinIDTable each
+ * hold a single integer column.
+ *
+ * NOTE(review): the database wsmg is created below but no USE wsmg statement follows, so the
+ * tables appear to be created in whichever database the session already has selected - confirm.
+ */
+CREATE DATABASE IF NOT EXISTS wsmg;
+/*
+ * subscription: one row per subscription. No primary key or unique constraint is declared.
+ * NOTE(review): the '0000-00-00 00:00:00' default may be rejected by servers running with
+ * NO_ZERO_DATE / strict SQL mode - confirm against the targeted MySQL versions.
+ */
+CREATE TABLE `subscription` (
+                `SubscriptionId` varchar(200) NOT NULL default '',
+                `Topics` varchar(255) default '',
+                `XPath` varchar(200) default '',
+                `ConsumerAddress` varchar(255) default '',
+                `ReferenceProperties` blob,
+                `content` blob,
+                `wsrm` tinyint(1) NOT NULL default '0',
+                `CreationTime` datetime NOT NULL default '0000-00-00 00:00:00'
+              );
+/*
+ * specialSubscription: same columns as subscription.
+ * NOTE(review): what makes a subscription "special" is not visible in this script - confirm against the broker code.
+ */
+CREATE TABLE `specialSubscription` (
+                       `SubscriptionId` varchar(200) NOT NULL default '',
+                       `Topics` varchar(255) default '',
+                       `XPath` varchar(200) default '',
+                       `ConsumerAddress` varchar(255) default '',
+                       `ReferenceProperties` blob,
+                       `content` blob,
+                       `wsrm` tinyint(1) NOT NULL default '0',
+                       `CreationTime` datetime NOT NULL default '0000-00-00 00:00:00'
+                     );
+
+
+/*
+ * disQ: stored messages. id is auto-incremented and is the primary key.
+ */
+CREATE TABLE `disQ` (
+          `id` bigint(11) NOT NULL auto_increment,
+          `trackId` varchar(100) default NULL,
+          `message` longblob,
+          `status` int(11) default NULL,
+          `topic` varchar(255) default '',
+          PRIMARY KEY  (`id`)
+        );
+
+/*
+ * MaxIDTable / MinIDTable: single-column integer tables.
+ * NOTE(review): presumably they track the highest / lowest disQ id in use - confirm against the broker code.
+ */
+CREATE TABLE MaxIDTable(
+       maxID integer
+       );
+
+CREATE TABLE MinIDTable(
+       minID integer
+       );
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/resources/services.xml
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/resources/services.xml b/modules/ws-messenger/messagebroker/src/main/resources/services.xml
new file mode 100644
index 0000000..229262c
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/resources/services.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<serviceGroup>
+    <!--
+      EventingService: each operation below pairs an action URI (actionMapping) with the message
+      receiver class that handles it. Operations that declare an outputActionMapping also name the
+      action URI of their response; subscriptionEnd and publish declare none.
+      All operations use WSEventingMsgReceiver except publish, which uses WSEventingPublishMsgReceiver.
+    -->
+    <service name="EventingService" class="org.apache.airavata.wsmg.broker.BrokerServiceLifeCycle">
+
+        <operation name="renew">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+            <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew
+            </actionMapping>
+            <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/RenewResponse
+            </outputActionMapping>
+        </operation>
+
+        <operation name="getStatus">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+            <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus
+            </actionMapping>
+            <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse
+            </outputActionMapping>
+        </operation>
+
+        <operation name="subscriptionEnd">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+            <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscriptionEnd
+            </actionMapping>
+        </operation>
+
+        <operation name="subscribe">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+            <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe
+            </actionMapping>
+            <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscribeResponse
+            </outputActionMapping>
+        </operation>
+
+        <operation name="unsubscribe">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+            <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Unsubscribe
+            </actionMapping>
+            <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/UnsubscribeResponse
+            </outputActionMapping>
+        </operation>
+
+        <!-- publish uses a project-specific action URI rather than one under the ws/2004/08/eventing namespace. -->
+        <operation name="publish">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingPublishMsgReceiver" />
+            <actionMapping>http://org.apache.airavata/WseNotification
+            </actionMapping>
+        </operation>
+
+    </service>
+
+    <!--
+      NotificationService: each operation below pairs a WS-BaseNotification action URI (actionMapping)
+      with WSNotificationMsgReceiver and names the action URI of its response (outputActionMapping).
+
+      NOTE(review): the request action URIs for pauseSubscription, resumeSubscription and unsubscribe
+      are spelled "Subsription" / "Unsubsribe" (no "c"), while the matching response URIs are spelled
+      "Subscription" / "Unsubscribe". Confirm whether existing clients send the misspelled form before
+      changing either side.
+    -->
+    <service name="NotificationService" class="org.apache.airavata.wsmg.broker.BrokerServiceLifeCycle">
+
+        <operation name="notify">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+            <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/Notify
+            </actionMapping>
+            <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/NotifyResponse
+            </outputActionMapping>
+        </operation>
+        
+        <operation name="subscribe">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+            <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/SubscribeRequest
+            </actionMapping>
+            <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/SubscribeRequestResponse
+            </outputActionMapping>
+        </operation>
+
+        <operation name="getCurrentMessage">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+            <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/GetCurrentMessageRequest
+            </actionMapping>
+            <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/GetCurrentMessageResponse
+            </outputActionMapping>
+        </operation>
+        
+        <operation name="pauseSubscription">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+            <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/PauseSubsriptionRequest
+            </actionMapping>
+            <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/PauseSubscriptionResponse
+            </outputActionMapping>
+        </operation>
+        
+        <operation name="resumeSubscription">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+            <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/ResumeSubsriptionRequest
+            </actionMapping>
+            <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/ResumeSubscriptionResponse
+            </outputActionMapping>
+        </operation>
+
+        <operation name="unsubscribe">
+            <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+            <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/UnsubsribeRequest
+            </actionMapping>
+            <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/UnsubscribeResponse
+            </outputActionMapping>
+        </operation>
+
+    </service>
+
+    <parameter name="configuration.file.name" locked="false">airavata-server.properties</parameter>
+
+</serviceGroup>

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java
new file mode 100644
index 0000000..4867ba7
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.airavata.wsmg.util.TestUtilServer;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.junit.Test;
+
+/**
+ * Round-trip test against the broker's {@code EventingService} endpoint: starts a local consumer,
+ * subscribes it to a topic, publishes two messages and unsubscribes again.
+ * <p>
+ * The test fails if any broker call throws; it does not assert that the published messages were
+ * actually delivered to {@link #handleNotification(SOAPEnvelope)}.
+ */
+public class BrokerWSETest extends TestCase implements ConsumerNotificationHandler {
+
+    static Properties configs = new Properties();
+
+    /**
+     * Prints each notification envelope handed to this consumer.
+     *
+     * @param msgEnvelope the received SOAP envelope
+     */
+    public void handleNotification(SOAPEnvelope msgEnvelope) {
+        System.out.println("Received " + msgEnvelope);
+    }
+
+    /** Starts the test server before each test. */
+    @Override
+    protected void setUp() throws Exception {
+        TestUtilServer.start(null, null);
+    }
+
+    /** Stops the test server after each test. */
+    @Override
+    protected void tearDown() throws Exception {
+        TestUtilServer.stop();
+    }
+
+    /**
+     * Subscribes, publishes a string message and an XML element, then unsubscribes.
+     *
+     * @throws InterruptedException if the pause between publishing and unsubscribing is interrupted
+     */
+    @Test
+    public void testRoundTrip() throws InterruptedException {
+
+        try {
+
+            String brokerEPR = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/EventingService";
+            long value = System.currentTimeMillis();
+            String msg = String.format("<msg> current time is : %d </msg>", value);
+
+            WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient();
+            wseMsgBrokerClient.init(brokerEPR);
+            int consumerPort = TestUtilServer.getAvailablePort();
+
+            String[] consumerEPRs = wseMsgBrokerClient.startConsumerService(consumerPort, this);
+
+            assertTrue(consumerEPRs.length > 0);
+
+            String topic = "WseRoundTripTestTopic";
+
+            String subscriptionID = wseMsgBrokerClient.subscribe(consumerEPRs[0], topic, null);
+            System.out.println("topic sub id = " + subscriptionID);
+
+            try {
+                wseMsgBrokerClient.publish(topic, msg);
+                wseMsgBrokerClient.publish(topic, AXIOMUtil.stringToOM("<foo><bar>Test</bar></foo>"));
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+
+            // Presumably gives asynchronous delivery time to reach the consumer before unsubscribing.
+            Thread.sleep(2000);
+
+            try {
+                wseMsgBrokerClient.unSubscribe(subscriptionID);
+            } catch (AxisFault e) {
+                e.printStackTrace();
+                fail(e.getMessage());
+            }
+            wseMsgBrokerClient.shutdownConsumerService();
+
+        } catch (AxisFault e) {
+            // Fail right away: waiting for console input here would hang unattended builds.
+            e.printStackTrace();
+            fail("unexpected exception occured");
+        }
+        System.out.println("Broker roundtrip done");
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java
new file mode 100644
index 0000000..baebd31
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WsntMsgBrokerClient;
+import org.apache.airavata.wsmg.util.TestUtilServer;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.junit.Test;
+
+/**
+ * Round-trip test against the broker's {@code NotificationService} endpoint: starts a local consumer,
+ * subscribes it to a topic, publishes two messages and unsubscribes again.
+ * <p>
+ * The test fails if any broker call throws; it does not assert that the published messages were
+ * actually delivered to {@link #handleNotification(SOAPEnvelope)}.
+ */
+public class BrokerWSNTTest extends TestCase implements ConsumerNotificationHandler {
+
+    static Properties configs = new Properties();
+
+    /**
+     * Prints each notification envelope handed to this consumer.
+     *
+     * @param msgEnvelope the received SOAP envelope
+     */
+    public void handleNotification(SOAPEnvelope msgEnvelope) {
+        System.out.println("Received " + msgEnvelope);
+    }
+
+    /** Starts the test server before each test. */
+    @Override
+    protected void setUp() throws Exception {
+        TestUtilServer.start(null, null);
+    }
+
+    /** Stops the test server after each test. */
+    @Override
+    protected void tearDown() throws Exception {
+        TestUtilServer.stop();
+    }
+
+    /**
+     * Subscribes, publishes a string message and an XML element, then unsubscribes.
+     *
+     * @throws InterruptedException if the pause between publishing and unsubscribing is interrupted
+     */
+    @Test
+    public void testRoundTrip() throws InterruptedException {
+
+        try {
+            long value = System.currentTimeMillis();
+            String msg = String.format("<msg> current time is : %d </msg>", value);
+
+            WsntMsgBrokerClient wsntMsgBrokerClient = new WsntMsgBrokerClient();
+
+            // Ask for a free port instead of a fixed one so the test does not collide with other listeners.
+            int consumerPort = TestUtilServer.getAvailablePort();
+
+            String brokerEPR = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/NotificationService";
+            wsntMsgBrokerClient.init(brokerEPR);
+            String[] consumerEPRs = wsntMsgBrokerClient.startConsumerService(consumerPort, this);
+
+            assertTrue(consumerEPRs.length > 0);
+
+            String topic = "/WsntRoundTripTestTopic";
+
+            String topicSubscriptionID = wsntMsgBrokerClient.subscribe(consumerEPRs[0], topic, null);
+            System.out.println("topic subscription id: " + topicSubscriptionID);
+
+            try {
+                wsntMsgBrokerClient.publish(topic, msg);
+                wsntMsgBrokerClient.publish(topic, AXIOMUtil.stringToOM("<foo><bar>Test</bar></foo>"));
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+
+            // Presumably gives asynchronous delivery time to reach the consumer before unsubscribing.
+            Thread.sleep(2000);
+
+            try {
+                wsntMsgBrokerClient.unSubscribe(topicSubscriptionID);
+            } catch (AxisFault e) {
+                e.printStackTrace();
+                fail(e.getMessage());
+            }
+
+            wsntMsgBrokerClient.shutdownConsumerService();
+
+        } catch (AxisFault e) {
+            // Fail right away: waiting for console input here would hang unattended builds.
+            e.printStackTrace();
+            fail("unexpected exception occured");
+        }
+        System.out.println("Broker roundtrip done");
+
+    }
+}