You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:31:24 UTC

[82/90] [abbrv] AIRAVATA-1124

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/commons/TestCommonRoutines.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/commons/TestCommonRoutines.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/commons/TestCommonRoutines.java
new file mode 100644
index 0000000..098dd49
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/commons/TestCommonRoutines.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons;
+
+import java.util.Date;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Smoke test for {@link CommonRoutines}.
+ * <p>
+ * The class extends {@link TestCase}, so its fixture and test methods follow the JUnit 3 naming convention
+ * ({@code setUp}, {@code tearDown}, {@code test*}) in addition to carrying JUnit 4 annotations.
+ */
+public class TestCommonRoutines extends TestCase {
+
+    /**
+     * Prepares nothing; this test needs no fixture.
+     *
+     * @throws java.lang.Exception
+     *             never thrown by this implementation
+     */
+    @Before
+    public void setUp() throws Exception {
+        // no fixture to build
+    }
+
+    /**
+     * Releases nothing; this test holds no resources.
+     *
+     * @throws java.lang.Exception
+     *             never thrown by this implementation
+     */
+    @After
+    public void tearDown() throws Exception {
+        // no fixture to release
+    }
+
+    /**
+     * Checks that {@link org.apache.airavata.wsmg.commons.CommonRoutines#getXsdDateTime(java.util.Date)} yields a
+     * non-null value for the current time.
+     */
+    @Test
+    public void testGetXsdDateTime() {
+        final Date now = new Date();
+        final Object xsdDateTime = CommonRoutines.getXsdDateTime(now);
+        assertNotNull(xsdDateTime);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java
new file mode 100644
index 0000000..6f1ecbb
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath;
+
+import java.net.URL;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.xml.stream.XMLStreamException;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.airavata.wsmg.util.ConfigKeys;
+import org.apache.airavata.wsmg.util.TestUtilServer;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAddtionalWseXpathAndTopicScenarios extends TestCase {
+
+    static Properties configs = new Properties();
+
+    /**
+     * Notification handler that records every envelope it is handed so the test can inspect what was delivered.
+     * <p>
+     * Declared {@code static} because it never touches the enclosing test instance; the fields are {@code final}
+     * because they are assigned exactly once.
+     */
+    static class NotificationReciever implements ConsumerNotificationHandler {
+
+        /** Envelopes received so far, oldest first. */
+        private final BlockingQueue<SOAPEnvelope> queue = new LinkedBlockingQueue<SOAPEnvelope>();
+
+        /** Label used to tell receivers apart in the console output. */
+        private final String id;
+
+        /**
+         * @param id
+         *            label printed alongside every received message
+         */
+        public NotificationReciever(String id) {
+            this.id = id;
+        }
+
+        /**
+         * Stores the envelope and echoes it to standard output together with this receiver's label.
+         *
+         * @param msgEnvelope
+         *            the delivered notification
+         */
+        public void handleNotification(SOAPEnvelope msgEnvelope) {
+            queue.add(msgEnvelope);
+            System.out.println(String.format("[reciever id: %s] %s", id, msgEnvelope));
+        }
+
+        /**
+         * @return the live queue of received envelopes (not a copy)
+         */
+        public BlockingQueue<SOAPEnvelope> getMsgQueue() {
+            return queue;
+        }
+    }
+
+    /**
+     * Loads the broker configuration from the classpath into {@link #configs} and then calls
+     * {@code TestUtilServer.start(null, null)}.
+     *
+     * @throws Exception
+     *             if the configuration cannot be read or {@code TestUtilServer.start} throws
+     */
+    @Before
+    public void setUp() throws Exception {
+        URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+        // getSystemResource returns null when the resource is absent; report that by name rather than as a bare NPE.
+        assertNotNull("configuration file not found on classpath: " + ConfigKeys.CONFIG_FILE_NAME, configURL);
+
+        // Properties.load leaves the stream open, so it has to be closed here.
+        java.io.InputStream configStream = configURL.openStream();
+        try {
+            configs.load(configStream);
+        } finally {
+            configStream.close();
+        }
+
+        TestUtilServer.start(null, null);
+    }
+
+    /**
+     * Currently a no-op: nothing is released here, including whatever {@code TestUtilServer.start} set up in
+     * {@link #setUp()}. NOTE(review): confirm whether TestUtilServer needs an explicit stop between tests.
+     */
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Publishes one message that satisfies the XPath filter and one that does not, both on the same topic, and
+     * checks that a topic-only subscriber receives both while a topic-plus-XPath subscriber receives only the
+     * matching one.
+     */
+    @Test
+    public final void testXpathAndTopicOnlyRoundTrip() {
+        try {
+            runXpathAndTopicRoundTrip();
+        } catch (XMLStreamException x) {
+            fail("Error while creating OMElement");
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag for whoever is running the test before reporting the failure.
+            Thread.currentThread().interrupt();
+            fail("interrupted while waiting for message");
+        } catch (AxisFault e) {
+            e.printStackTrace();
+            fail("unexpected exception occured");
+        }
+    }
+
+    /**
+     * Body of the round-trip scenario. Both consumers are unsubscribed and shut down in a {@code finally} block so
+     * that a failed assertion does not leave their services running.
+     */
+    private void runXpathAndTopicRoundTrip() throws AxisFault, XMLStreamException, InterruptedException {
+
+        String topic = "RoundTripTestXpathAndTopicWse";
+        String xpathExpression = "/c/b/a[text()=1]";
+        String msgFormat = "<c><b><a>%d</a></b></c>";
+
+        long value = 1;
+        String matchingMsg = String.format(msgFormat, value);
+        String unmatchingMsg = String.format(msgFormat, value + 1);
+
+        int consumerPort = TestUtilServer.getAvailablePort();
+        String brokerEpr = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/EventingService";
+
+        WseMsgBrokerClient topicOnlyReceiverApi = new WseMsgBrokerClient();
+        WseMsgBrokerClient xpathAndTopicReceiverApi = new WseMsgBrokerClient();
+        NotificationReciever topicOnlyMsgReceiver = new NotificationReciever("Topic Only");
+        NotificationReciever topicAndXpathMsgReceiver = new NotificationReciever("Topic And Xpath");
+
+        // Track how far set-up got so the finally block only undoes what actually happened.
+        boolean topicOnlyConsumerStarted = false;
+        boolean topicAndXpathConsumerStarted = false;
+        String topicOnlySubId = null;
+        String topicAndXpathSubId = null;
+
+        try {
+            topicOnlyReceiverApi.init(brokerEpr);
+            String[] topicConsumerEPRs = topicOnlyReceiverApi.startConsumerService(consumerPort, topicOnlyMsgReceiver);
+            topicOnlyConsumerStarted = true;
+            assertTrue("invalid consumer eprs returned", topicConsumerEPRs.length > 0);
+            topicOnlySubId = topicOnlyReceiverApi.subscribe(topicConsumerEPRs[0], topic, null);
+            System.out.println("Topic only subscription ID: " + topicOnlySubId);
+
+            xpathAndTopicReceiverApi.init(brokerEpr);
+            String[] topicAndXpathConsumerEPRs = xpathAndTopicReceiverApi.startConsumerService(consumerPort + 1,
+                    topicAndXpathMsgReceiver);
+            topicAndXpathConsumerStarted = true;
+            assertTrue("invalid consumer eprs returned", topicAndXpathConsumerEPRs.length > 0);
+            topicAndXpathSubId = xpathAndTopicReceiverApi.subscribe(topicAndXpathConsumerEPRs[0], topic,
+                    xpathExpression);
+            System.out.println("Xpath and Topic subscription ID: " + topicAndXpathSubId);
+
+            WseMsgBrokerClient senderApi = new WseMsgBrokerClient();
+            senderApi.init(brokerEpr);
+
+            senderApi.publish(topic, AXIOMUtil.stringToOM(matchingMsg));
+            senderApi.publish(topic, AXIOMUtil.stringToOM(unmatchingMsg));
+
+            // Give the broker time to deliver before counting what arrived.
+            Thread.sleep(5000);
+
+            assertEquals("topic only reciever should get all messages", 2, topicOnlyMsgReceiver.getMsgQueue().size());
+            assertEquals("xpath and topic reciever should only get one message", 1, topicAndXpathMsgReceiver
+                    .getMsgQueue().size());
+        } finally {
+            try {
+                releaseConsumer(topicOnlyReceiverApi, topicOnlySubId, topicOnlyConsumerStarted);
+            } finally {
+                releaseConsumer(xpathAndTopicReceiverApi, topicAndXpathSubId, topicAndXpathConsumerStarted);
+            }
+        }
+    }
+
+    /**
+     * Unsubscribes (when a subscription id was obtained) and shuts the consumer service down (when it was started).
+     */
+    private static void releaseConsumer(WseMsgBrokerClient api, String subscriptionId, boolean consumerStarted)
+            throws AxisFault {
+        try {
+            if (subscriptionId != null) {
+                api.unSubscribe(subscriptionId);
+            }
+        } finally {
+            if (consumerStarted) {
+                api.shutdownConsumerService();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java
new file mode 100644
index 0000000..024b5d3
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath;
+
+import java.net.URL;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.xml.stream.XMLStreamException;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.airavata.wsmg.util.ConfigKeys;
+import org.apache.airavata.wsmg.util.TestUtilServer;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestWseXpathAndTopicSubscription extends TestCase implements ConsumerNotificationHandler {
+
+    static Properties configs = new Properties();
+
+    BlockingQueue<SOAPEnvelope> queue = new LinkedBlockingQueue<SOAPEnvelope>();
+
+    /**
+     * {@link ConsumerNotificationHandler} implementation: appends the envelope to this test's queue and prints it
+     * to standard output.
+     *
+     * @param msgEnvelope
+     *            the envelope handed to this handler
+     */
+    public void handleNotification(SOAPEnvelope msgEnvelope) {
+        this.queue.add(msgEnvelope);
+        System.out.println(msgEnvelope);
+    }
+
+    /**
+     * @return the live queue that {@code handleNotification} appends to (not a copy)
+     */
+    BlockingQueue<SOAPEnvelope> getMsgQueue() {
+        return this.queue;
+    }
+
+    /**
+     * Loads the broker configuration from the classpath into {@link #configs} and then calls
+     * {@code TestUtilServer.start(null, null)}.
+     *
+     * @throws Exception
+     *             if the configuration cannot be read or {@code TestUtilServer.start} throws
+     */
+    @Before
+    public void setUp() throws Exception {
+        URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+        // getSystemResource returns null when the resource is absent; report that by name rather than as a bare NPE.
+        assertNotNull("configuration file not found on classpath: " + ConfigKeys.CONFIG_FILE_NAME, configURL);
+
+        // Properties.load leaves the stream open, so it has to be closed here.
+        java.io.InputStream configStream = configURL.openStream();
+        try {
+            configs.load(configStream);
+        } finally {
+            configStream.close();
+        }
+
+        TestUtilServer.start(null, null);
+    }
+
+    /**
+     * Currently a no-op: nothing is released here, including whatever {@code TestUtilServer.start} set up in
+     * {@link #setUp()}. NOTE(review): confirm whether TestUtilServer needs an explicit stop between tests.
+     */
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Subscribes with an XPath filter and no topic (the topic argument is {@code null}), publishes one message that
+     * matches {@code /c/b/a} and one that does not, and checks that only the matching message is delivered.
+     */
+    @Test
+    public final void testXpathOnlyRoundTrip() {
+        try {
+            runXpathOnlyRoundTrip();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag for whoever is running the test before reporting the failure.
+            Thread.currentThread().interrupt();
+            fail("interrupted while waiting for message");
+        } catch (XMLStreamException e) {
+            e.printStackTrace();
+            fail("invalid xml recieved: " + e.getMessage());
+        } catch (AxisFault e) {
+            e.printStackTrace();
+            fail("unexpected exception occured");
+        }
+    }
+
+    /**
+     * Body of the round-trip scenario. The consumer is unsubscribed and shut down in a {@code finally} block so that
+     * a failed assertion does not leave its service running.
+     */
+    private void runXpathOnlyRoundTrip() throws AxisFault, XMLStreamException, InterruptedException {
+
+        String validMsgFormat = "<c><b><a> %d </a></b></c>";
+        String invalidMsgFormat = "<a><b><c> %d </c></b></a>";
+
+        // A time-based payload lets the assertion below recognise the message published by this run.
+        long value = System.currentTimeMillis();
+        String validMsg = String.format(validMsgFormat, value);
+        String invalidMsg = String.format(invalidMsgFormat, value);
+
+        int consumerPort = TestUtilServer.getAvailablePort();
+
+        String brokerEPR = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/EventingService";
+        WseMsgBrokerClient msgBrokerClient = new WseMsgBrokerClient();
+        msgBrokerClient.init(brokerEPR);
+
+        String[] consumerEPRs = msgBrokerClient.startConsumerService(consumerPort, this);
+        String subscriptionID = null;
+
+        try {
+            assertTrue(consumerEPRs.length > 0);
+
+            String xpathExpression = "/c/b/a";
+            subscriptionID = msgBrokerClient.subscribe(consumerEPRs[0], null, xpathExpression);
+
+            msgBrokerClient.publish(null, AXIOMUtil.stringToOM(validMsg));
+            msgBrokerClient.publish(null, AXIOMUtil.stringToOM(invalidMsg));
+
+            // Bounded wait: an unbounded take() would hang the whole build if nothing is ever delivered.
+            SOAPEnvelope env = getMsgQueue().poll(30, java.util.concurrent.TimeUnit.SECONDS);
+            assertNotNull("no notification received within 30 seconds", env);
+
+            assertNotNull(env.getBody());
+            assertNotNull(env.getBody().getChildrenWithLocalName("c"));
+
+            OMElement element = (OMElement) env.getBody().getChildrenWithLocalName("c").next();
+            String text = element.toStringWithConsume();
+
+            assertTrue("round trip of message failed" + " - due to invalid messege content",
+                    text.indexOf(Long.toString(value)) > 0);
+
+            // Leave time for a wrongly delivered second message to show up before checking the queue is empty.
+            Thread.sleep(5000);
+
+            assertTrue("unexpected msg recieved", getMsgQueue().isEmpty());
+        } finally {
+            try {
+                if (subscriptionID != null) {
+                    msgBrokerClient.unSubscribe(subscriptionID);
+                }
+            } finally {
+                msgBrokerClient.shutdownConsumerService();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathSubscription.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathSubscription.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathSubscription.java
new file mode 100644
index 0000000..097e6b7
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathSubscription.java
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath;
+
+import java.net.URL;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.xml.stream.XMLStreamException;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.airavata.wsmg.util.ConfigKeys;
+import org.apache.airavata.wsmg.util.TestUtilServer;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestWseXpathSubscription extends TestCase implements ConsumerNotificationHandler {
+
+    static Properties configs = new Properties();
+
+    BlockingQueue<SOAPEnvelope> queue = new LinkedBlockingQueue<SOAPEnvelope>();
+
+    /**
+     * {@link ConsumerNotificationHandler} implementation: appends the envelope to this test's queue and prints it
+     * to standard output.
+     *
+     * @param msgEnvelope
+     *            the envelope handed to this handler
+     */
+    public void handleNotification(SOAPEnvelope msgEnvelope) {
+        this.queue.add(msgEnvelope);
+        System.out.println(msgEnvelope);
+    }
+
+    /**
+     * @return the live queue that {@code handleNotification} appends to (not a copy)
+     */
+    BlockingQueue<SOAPEnvelope> getMsgQueue() {
+        return this.queue;
+    }
+
+    /**
+     * Loads the broker configuration from the classpath into {@link #configs} and then calls
+     * {@code TestUtilServer.start(null, null)}.
+     *
+     * @throws Exception
+     *             if the configuration cannot be read or {@code TestUtilServer.start} throws
+     */
+    @Before
+    public void setUp() throws Exception {
+        URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+        // getSystemResource returns null when the resource is absent; report that by name rather than as a bare NPE.
+        assertNotNull("configuration file not found on classpath: " + ConfigKeys.CONFIG_FILE_NAME, configURL);
+
+        // Properties.load leaves the stream open, so it has to be closed here.
+        java.io.InputStream configStream = configURL.openStream();
+        try {
+            configs.load(configStream);
+        } finally {
+            configStream.close();
+        }
+
+        TestUtilServer.start(null, null);
+    }
+
+    /**
+     * Currently a no-op: nothing is released here, including whatever {@code TestUtilServer.start} set up in
+     * {@link #setUp()}. NOTE(review): confirm whether TestUtilServer needs an explicit stop between tests.
+     */
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Subscribes with both a topic ({@code XpathAndTopicTestWse}) and an XPath filter ({@code /c/b/a}), publishes on
+     * that topic one message that matches the filter and one that does not, and checks that only the matching
+     * message is delivered.
+     */
+    @Test
+    public final void testSimpleXpathTopicRoundTrip() {
+        try {
+            runSimpleXpathTopicRoundTrip();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag for whoever is running the test before reporting the failure.
+            Thread.currentThread().interrupt();
+            fail("interrupted while waiting for message");
+        } catch (XMLStreamException e) {
+            e.printStackTrace();
+            fail("invalid xml recieved: " + e.getMessage());
+        } catch (AxisFault e) {
+            e.printStackTrace();
+            fail("unexpected exception occured");
+        }
+    }
+
+    /**
+     * Body of the round-trip scenario. The consumer is unsubscribed and shut down in a {@code finally} block so that
+     * a failed assertion does not leave its service running.
+     */
+    private void runSimpleXpathTopicRoundTrip() throws AxisFault, XMLStreamException, InterruptedException {
+
+        String validMsgFormat = "<c><b><a> %d </a></b></c>";
+        String invalidMsgFormat = "<a><b><c> %d </c></b></a>";
+
+        // A time-based payload lets the assertion below recognise the message published by this run.
+        long value = System.currentTimeMillis();
+        String validMsg = String.format(validMsgFormat, value);
+        String invalidMsg = String.format(invalidMsgFormat, value);
+
+        int consumerPort = TestUtilServer.getAvailablePort();
+
+        String brokerEPR = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/EventingService";
+
+        WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient();
+        wseMsgBrokerClient.init(brokerEPR);
+
+        String[] consumerEPRs = wseMsgBrokerClient.startConsumerService(consumerPort, this);
+        String subscriptionID = null;
+
+        try {
+            assertTrue(consumerEPRs.length > 0);
+
+            String xpathExpression = "/c/b/a";
+            String topicExpression = "XpathAndTopicTestWse";
+
+            subscriptionID = wseMsgBrokerClient.subscribe(consumerEPRs[0], topicExpression, xpathExpression);
+
+            wseMsgBrokerClient.publish(topicExpression, AXIOMUtil.stringToOM(validMsg));
+            wseMsgBrokerClient.publish(topicExpression, AXIOMUtil.stringToOM(invalidMsg));
+
+            // Bounded wait: an unbounded take() would hang the whole build if nothing is ever delivered.
+            SOAPEnvelope env = getMsgQueue().poll(30, java.util.concurrent.TimeUnit.SECONDS);
+            assertNotNull("no notification received within 30 seconds", env);
+
+            assertNotNull(env.getBody());
+            assertNotNull(env.getBody().getChildrenWithLocalName("c"));
+
+            OMElement element = (OMElement) env.getBody().getChildrenWithLocalName("c").next();
+            String text = element.toStringWithConsume();
+
+            assertTrue("round trip of message failed" + " - due to invalid messege content",
+                    text.indexOf(Long.toString(value)) > 0);
+
+            // Leave time for a wrongly delivered second message to show up before checking the queue is empty.
+            Thread.sleep(5000);
+
+            assertTrue("unexpected msg recieved", getMsgQueue().isEmpty());
+        } finally {
+            try {
+                if (subscriptionID != null) {
+                    wseMsgBrokerClient.unSubscribe(subscriptionID);
+                }
+            } finally {
+                wseMsgBrokerClient.shutdownConsumerService();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/performance/XppXPath.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/performance/XppXPath.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/performance/XppXPath.java
new file mode 100644
index 0000000..da95a00
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/performance/XppXPath.java
@@ -0,0 +1,269 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath.performance;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.Vector;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import edu.berkeley.cs.db.yfilterplus.queryparser.QueryParser;
+import edu.berkeley.cs.db.yfilterplus.queryparser.XPQueryParser;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMXMLParserWrapper;
+import org.apache.axiom.om.impl.llom.factory.OMXMLBuilderFactory;
+import org.apache.axiom.soap.SOAPEnvelope;
+
+public class XppXPath {
+
+    private Vector xPathExpressions = new Vector();
+    protected BufferedReader m_in = null;
+    private static final boolean DEBUG = false;
+    long total = 0;
+
+    /**
+     * Placeholder: performs no matching and always returns {@code null}, whatever {@code message} is.
+     *
+     * @param message
+     *            ignored
+     * @return always {@code null}
+     */
+    public int[] getMatchedLinks(String message) {
+
+        return null;
+    }
+
+    /**
+     * Finds every stored XPath expression that equals the given query string, by scanning the stored expressions in
+     * order.
+     *
+     * @param query
+     *            expression to look for; compared with {@code equals}, must not be {@code null}
+     * @return a {@code Vector} of {@code Integer} positions, ascending, at which {@code query} occurs among the
+     *         stored expressions; empty when there is no match
+     */
+    public Vector checkQueries(String query) {
+        Vector result = new Vector();
+        int size = xPathExpressions.size();
+        for (int i = 0; i < size; i++) {
+            if (query.equals(xPathExpressions.get(i))) {
+                // valueOf can reuse cached boxes instead of allocating a new Integer per match
+                result.add(Integer.valueOf(i));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Reports whether any element of {@code queries} equals one of the stored XPath expressions, scanning the stored
+     * expressions linearly for each query.
+     *
+     * @param queries
+     *            candidate expressions; elements are compared with {@code equals}, so a {@code null} element causes
+     *            a {@code NullPointerException} once at least one expression is stored
+     * @return {@code true} as soon as one query matches a stored expression, {@code false} if none does
+     */
+    public boolean checkQueriesVectorToSet(Set queries) {
+        int size = xPathExpressions.size();
+        for (Iterator iter = queries.iterator(); iter.hasNext();) {
+            Object query = iter.next();
+            for (int i = 0; i < size; i++) {
+                if (query.equals(xPathExpressions.get(i))) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Reports whether any element of {@code queries} is one of the stored XPath expressions, using hash lookups.
+     * <p>
+     * A fresh {@code HashSet} is built from the stored expressions on every call.
+     *
+     * @param queries
+     *            candidate expressions
+     * @return {@code true} as soon as one query is found among the stored expressions, {@code false} if none is
+     */
+    public boolean checkQueriesBySet(Set queries) {
+        Set xPathExpressionsSet = new HashSet(xPathExpressions);
+        for (Iterator iter = queries.iterator(); iter.hasNext();) {
+            if (xPathExpressionsSet.contains(iter.next())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Appends one expression to the end of the stored XPath expressions; duplicates are kept.
+     *
+     * @param xPathExpression
+     *            the expression to store
+     */
+    public void addXPathExpressions(String xPathExpression) {
+        this.xPathExpressions.add(xPathExpression);
+    }
+
+    /**
+     * Pulls query strings from an {@code XPQueryParser} opened on {@code queryFile} and stores each one via
+     * {@link #addXPathExpressions(String)}, stopping when the parser returns {@code null} (or after
+     * {@code Integer.MAX_VALUE} queries). Each query is also printed when {@code DEBUG} is on.
+     *
+     * @param queryFile
+     *            value handed to the {@code XPQueryParser} constructor
+     */
+    public void readQueriesFromFile(String queryFile) {
+        final int limit = Integer.MAX_VALUE;
+        QueryParser parser = new XPQueryParser(queryFile);
+        for (int read = 0; read < limit; read++) {
+            String next = parser.readNextQueryString();
+            if (next == null) {
+                break;
+            }
+            if (DEBUG) {
+                System.out.println(next);
+            }
+            addXPathExpressions(next);
+        }
+    }
+
+    /**
+     * Picks one of the stored XPath expressions uniformly at random using {@code Math.random()}.
+     *
+     * @return a stored expression
+     * @throws IllegalStateException
+     *             if no expression has been stored yet
+     */
+    public String getARandomQuery() {
+        int count = xPathExpressions.size();
+        if (count == 0) {
+            // Fail with a message that names the cause instead of an index error from Vector.get(0).
+            throw new IllegalStateException("no XPath expressions have been added");
+        }
+        // Math.random() is in [0, 1), so the truncated product is always a valid index.
+        int index = (int) (count * Math.random());
+        return (String) xPathExpressions.get(index);
+    }
+
+    /**
+     * Reads a text file into a single string, terminating every line (including the last) with the platform line
+     * separator. The file is opened through {@link FileReader}, so it is decoded with the platform default charset.
+     * <p>
+     * Adapted from http://www.rgagnon.com/javadetails/java-0052.html
+     *
+     * @param filename
+     *            path of the file to read
+     * @return the file content with normalised line endings; the empty string for an empty file
+     * @throws IOException
+     *             if the file cannot be opened or read
+     */
+    public static String readFile(String filename) throws IOException {
+        String lineSep = System.getProperty("line.separator");
+        StringBuilder sb = new StringBuilder();
+        BufferedReader br = new BufferedReader(new FileReader(filename));
+        try {
+            String nextLine;
+            while ((nextLine = br.readLine()) != null) {
+                // BufferedReader strips the EOL character, so put one back.
+                sb.append(nextLine).append(lineSep);
+            }
+        } finally {
+            // Release the file handle even when a read fails.
+            br.close();
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Micro-benchmark driver; all results are printed to standard output.
+     * <p>
+     * It loads the XPath queries, times the splitting of the {@code ;}-separated XPath list that precedes the first
+     * {@code <} in the message file, times {@link #checkQueriesBySet(Set)} over that list for a fixed number of
+     * rounds, and finally times parsing a SOAP file with AXIOM and reading its {@code MessageID} header.
+     *
+     * @param args
+     *            optional overrides, in order: query file, message file (XPath list followed by XML), SOAP-only
+     *            file. Entries that are not supplied fall back to the built-in Windows paths.
+     * @throws IOException
+     *             if one of the input files cannot be read
+     * @throws FactoryConfigurationError
+     *             if no StAX implementation can be instantiated
+     * @throws XMLStreamException
+     *             if the SOAP file is not well-formed XML
+     * @throws IllegalArgumentException
+     *             if the message file contains no {@code <} character
+     */
+    public static void main(String[] args) throws IOException, XMLStreamException, FactoryConfigurationError {
+        String queryFile = args.length > 0 ? args[0] : "C:\\YiFile\\yfilter-1.0\\yfilter-1.0\\queries2.txt";
+        String messageFile = args.length > 1 ? args[1] : "c:\\YiFile\\testdata\\soap2.txt";
+        String soapFile = args.length > 2 ? args[2] : "c:\\YiFile\\testdata\\soap_only.txt";
+        final int round = 100;
+
+        XppXPath xppXPath = new XppXPath();
+        xppXPath.readQueriesFromFile(queryFile);
+
+        String message = readFile(messageFile);
+        int messageStartPoint = message.indexOf('<');
+        if (messageStartPoint < 0) {
+            // substring(0, -1) would fail with an index error that does not say which file is at fault.
+            throw new IllegalArgumentException("no XML content found in " + messageFile);
+        }
+        String xpathList = message.substring(0, messageStartPoint);
+        System.out.println("XpathList=" + xpathList);
+        System.out.println("*****************************************");
+
+        // Phase 1: tokenising the XPath list.
+        long start0 = System.nanoTime();
+        StringTokenizer parser0 = new StringTokenizer(xpathList, ";");
+        Set xpathTokens = new HashSet();
+        while (parser0.hasMoreTokens()) {
+            xpathTokens.add(parser0.nextToken());
+        }
+        long total0 = System.nanoTime() - start0;
+        System.out.println("Avg Time to token=" + (total0));
+        System.out.println("Total token=" + xpathTokens.size());
+
+        // Phase 2: set-based matching, averaged over all rounds.
+        long total = 0;
+        boolean result = false;
+        for (int i = 0; i < round; i++) {
+            long start = System.nanoTime();
+            result = xppXPath.checkQueriesBySet(xpathTokens);
+            total += System.nanoTime() - start;
+        }
+        System.out.println("Match result=" + result);
+        System.out.println("Avg Time for Checking=" + (total / round));
+
+        // Phase 3: AXIOM parse plus header lookup. The file is read before the clock starts so that only parsing
+        // and navigation are timed.
+        String message1 = readFile(soapFile);
+        long start = System.nanoTime();
+        // getBytes() uses the platform default charset, the same one readFile decoded the file with.
+        XMLStreamReader parser = XMLInputFactory.newInstance().createXMLStreamReader(
+                new ByteArrayInputStream(message1.getBytes()));
+
+        OMXMLParserWrapper builder = OMXMLBuilderFactory.createStAXSOAPModelBuilder(
+                OMAbstractFactory.getSOAP11Factory(), parser);
+        // The document element is the SOAP envelope.
+        SOAPEnvelope envelope = (SOAPEnvelope) builder.getDocumentElement();
+
+        // The value is not used; the lookup itself is the work being timed.
+        envelope.getHeader().getFirstChildWithName(new QName(null, "MessageID")).getText();
+        total = System.nanoTime() - start;
+        System.out.println("Avg Time for Axiom=" + (total));
+
+        envelope.getHeader().getFirstChildWithName(new QName(null, "MessageID")).serialize(System.out);
+        System.out.println();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/NotificationManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/NotificationManager.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/NotificationManager.java
new file mode 100644
index 0000000..c168a5f
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/NotificationManager.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.wsmg.client.*;
+import org.apache.axiom.soap.SOAPEnvelope;
+
+public class NotificationManager implements ConsumerNotificationHandler {
+
+    private MessageBrokerClient client = null;
+    private String[] eprs = null;
+    private String brokerLocation = null;
+    private String protocol = null;
+    private int consumerServerPort = 0;
+    private LinkedList<String> subscriptionIds;
+    private int numberOfTopicSubscribed = 0;
+    private int numMultiThreadSupportPerSub = 0;
+    private int multipleThreadSupportIndex = 1;
+
+    public NotificationManager(String brokerLocationIn, int consumerServerPortIn, String protocolIn,
+            int numMultiThreadSupportPerSub) throws MsgBrokerClientException {
+
+        this.brokerLocation = brokerLocationIn;
+        this.consumerServerPort = consumerServerPortIn;
+        this.protocol = protocolIn;
+        this.numMultiThreadSupportPerSub = numMultiThreadSupportPerSub;
+
+        if (client == null) {
+            if (protocol.equalsIgnoreCase("wse")) {
+                WsntMsgBrokerClient wseClient = new WsntMsgBrokerClient();
+                wseClient.init(this.brokerLocation);
+                wseClient.setTimeoutInMilliSeconds(200000000);
+                eprs = wseClient.startConsumerService(consumerServerPort, this);
+                client = wseClient;
+            } else {
+                WsntMsgBrokerClient wsntClient = new WsntMsgBrokerClient();
+                wsntClient.init(this.brokerLocation);
+                wsntClient.setTimeoutInMilliSeconds(200000000);
+                eprs = wsntClient.startConsumerService(consumerServerPort, this);
+                client = wsntClient;
+            }
+        }
+
+        subscriptionIds = new LinkedList<String>();
+
+    }
+
+    public Subscription createTopicSubscription(String topic) throws Exception {
+
+        if (multipleThreadSupportIndex > numMultiThreadSupportPerSub) {
+            multipleThreadSupportIndex = 1;
+        }
+
+        String subscriptionId = client
+                .subscribe(brokerLocation, eprs[0] + "user" + multipleThreadSupportIndex++, topic);
+        subscriptionIds.add(subscriptionId);
+        Subscription subscription = new Subscription(client, subscriptionId, topic, this, brokerLocation, protocol);
+        return subscription;
+    }
+
+    public Subscription createXpathSubscription(String topicExpression, String xpathExpression) throws Exception {
+        if (multipleThreadSupportIndex > numMultiThreadSupportPerSub) {
+            multipleThreadSupportIndex = 1;
+        }
+
+        String subscriptionId = client.subscribe(eprs[0] + "user" + multipleThreadSupportIndex++, topicExpression,
+                xpathExpression);
+        subscriptionIds.add(subscriptionId);
+        Subscription subscription = new Subscription(client, subscriptionId, topicExpression, xpathExpression, this,
+                brokerLocation, protocol);
+        return subscription;
+    }
+
+    public void cleanup() throws MsgBrokerClientException {
+
+        WseMsgBrokerClient wseClient = null;
+        WsntMsgBrokerClient wsntClient = null;
+
+        if ("wse".equalsIgnoreCase(this.protocol)) {
+            wseClient = (WseMsgBrokerClient) client;
+        } else {
+            wsntClient = (WsntMsgBrokerClient) client;
+        }
+
+        if (subscriptionIds != null) {
+            if (wseClient != null) {
+                while (!subscriptionIds.isEmpty()) {
+                    String subId = subscriptionIds.remove();
+                    wseClient.unSubscribe(subId);
+                }
+            } else {
+                while (!subscriptionIds.isEmpty()) {
+                    String subId = subscriptionIds.remove();
+                    wsntClient.unSubscribe(subId);
+                }
+
+            }
+        }
+
+        if (client != null) {
+            client.shutdownConsumerService();
+        }
+    }
+
+    private BlockingQueue<StatContainer> queue = new LinkedBlockingQueue<StatContainer>();
+    private int numMsgsReceived = 0;
+
+    public void handleNotification(SOAPEnvelope msgEnvelope) {
+        queue.add(new StatContainer(msgEnvelope));
+        numMsgsReceived += 1;
+    }
+
+    public BlockingQueue<StatContainer> getQueue() {
+        return queue;
+    }
+
+    public int getNumberOfMsgsReceived() {
+        return numMsgsReceived;
+    }
+
+    public synchronized void incNoTopicsSubscribed() {
+        numberOfTopicSubscribed++;
+    }
+
+    public synchronized int getNoTopicsSubscribed() {
+        return numberOfTopicSubscribed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PerformanceTest.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PerformanceTest.java
new file mode 100644
index 0000000..4e90ec0
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PerformanceTest.java
@@ -0,0 +1,399 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.airavata.wsmg.performance_evaluator.rtt.util.ConfigKeys;
+import org.apache.airavata.wsmg.performance_evaluator.rtt.util.LoadMsgPayload;
+import org.apache.airavata.wsmg.performance_evaluator.rtt.util.LoadXpath;
+
+/** A single labelled value shown in the performance report. */
+class Stat {
+    /** Label printed in front of the value. */
+    String name;
+    /** Value printed via its {@code toString()}. */
+    Object value;
+
+    /**
+     * @param k
+     *            label of the statistic
+     * @param v
+     *            value of the statistic
+     */
+    public Stat(String k, Object v) {
+        this.name = k;
+        this.value = v;
+    }
+
+}
+
+public class PerformanceTest {
+
+    public static int NOTIFICATIONS_PUBLISHED_PER_TOPIC = 0;
+    static String payload = null;
+    static LinkedList<String> xpathList = null;
+    public static long totalRoundTripTime = 0;
+    private static long avgRountTripTime = 0;
+    public static BufferedWriter out = null;
+    public static Properties configurations = null;
+    public static long avgPublishRTTime = 0l;
+    private static long totalPublishRTT = 0l;
+    private static int notifPerTopic = 0;
+    private static int noTopicsPublished = 0;
+    private static String protocol = "";
+    private static int payLoadMultiplier = 1;
+    private static int consumerPort = 3345;
+    private static long testExpirationTime = 0l;
+    private static int numberOfSubscriber = 0;
+    private static int numMultiThreadsSupportPerSub = 0;
+    private static String topicPrefix = "";
+
+    /**
+     * Command-line entry point: loads the configuration (file values over built-in defaults)
+     * and then runs the performance test.
+     *
+     * @param args
+     *            ignored
+     * @throws Exception
+     *             whatever {@link #testPerformance()} throws
+     */
+    public static void main(String[] args) throws Exception {
+        PerformanceTest.loadConfigurationsFromFile();
+        PerformanceTest.testPerformance();
+    }
+
+    public static void testPerformance() throws Exception {
+
+        setConfigurationValues();
+        File outfile = new File("performance.log");
+        CountDownLatch publiserhStartSignal = new CountDownLatch(1);
+        CountDownLatch publisherDoneSignal = new CountDownLatch(noTopicsPublished);
+        NotificationManager notifManagerArray[] = new NotificationManager[numberOfSubscriber];
+        StatCalculatorThread statCalcThread[] = new StatCalculatorThread[numberOfSubscriber];
+        setPayload(payLoadMultiplier);
+
+        for (int j = 0; j < numberOfSubscriber; j++) {
+            notifManagerArray[j] = new NotificationManager(configurations.getProperty(ConfigKeys.BROKER_URL),
+                    consumerPort + j, protocol, numMultiThreadsSupportPerSub);
+
+        }
+
+        // thread to calculate stats for notification manager
+        // set the subscriptions depending on the topic or xpath based
+        int arrayIndex = 0;
+        int totalReceivers = 0;
+        createSubscriberArray(noTopicsPublished, numberOfSubscriber, notifManagerArray, arrayIndex);
+        System.out.println("subscribing to topics completed, creating publisher threads");
+
+        // start publishers
+        PublisherThread[] publisher = new PublisherThread[noTopicsPublished];
+        createPublishers(noTopicsPublished, protocol, publiserhStartSignal, publisherDoneSignal, publisher);
+        System.out.println("sending signal to start publishing...");
+        long publisherStartTime = System.currentTimeMillis();
+        long startTime = System.currentTimeMillis();
+        publiserhStartSignal.countDown(); // let all threads proceed
+
+        for (int j = 0; j < numberOfSubscriber; j++) {
+            statCalcThread[j] = new StatCalculatorThread(notifManagerArray[j], testExpirationTime);
+            statCalcThread[j].start();
+        }
+
+        publisherDoneSignal.await(); // wait for all to finish
+
+        for (int j = 0; j < noTopicsPublished; j++) {
+            totalPublishRTT += publisher[j].getAvgPubTime();
+        }
+
+        avgPublishRTTime = totalPublishRTT / noTopicsPublished;
+        long publishersRunningTime = System.currentTimeMillis() - publisherStartTime;
+        System.out.println("finished publishing messgaes.");
+
+        for (StatCalculatorThread stats : statCalcThread) {
+            stats.join();
+        }
+
+        long stopTime = 0l;
+        long totNumberOfMessagesReceived = 0;
+
+        for (StatCalculatorThread stats : statCalcThread) {
+            stopTime = stopTime < stats.getLastMsgReceivedTime() ? stats.getLastMsgReceivedTime() : stopTime;
+            totalRoundTripTime += stats.getTotalTime();
+            totNumberOfMessagesReceived += stats.getNumberOfMsgReceived();
+        }
+
+        for (NotificationManager notifMngr : notifManagerArray) {
+            totalReceivers += notifMngr.getNoTopicsSubscribed();
+        }
+
+        avgRountTripTime = totalRoundTripTime / totNumberOfMessagesReceived;
+        long executionTime = stopTime - startTime;
+        double throughtput = (totNumberOfMessagesReceived * 1000) / (executionTime);
+
+        List<Stat> statistics = new ArrayList<Stat>();
+
+        statistics.add(new Stat("Payload size (bytes)", payload.getBytes("US-ASCII").length));
+        statistics.add(new Stat("Protocol", protocol));
+        statistics.add(new Stat("# total expected Msgs", totalReceivers * notifPerTopic));
+        statistics.add(new Stat("# total msgs received", totNumberOfMessagesReceived));
+        setStatList(notifPerTopic, noTopicsPublished, publishersRunningTime, executionTime, throughtput, statistics);
+        printStatistics(statistics, outfile);
+
+        for (NotificationManager notifMngr : notifManagerArray) {
+            notifMngr.cleanup();
+        }
+
+        System.out.println("end of test");
+        System.exit(0);
+    }
+
+    private static void setConfigurationValues() {
+        notifPerTopic = Integer.parseInt(configurations.getProperty(ConfigKeys.NOTIFICATIONS_PUBLISHED_PER_TOPIC));
+        noTopicsPublished = Integer.parseInt(configurations.getProperty(ConfigKeys.NUMBER_OF_TOPICS_PUBLISHED));
+        protocol = configurations.getProperty(ConfigKeys.PROTOCOL);
+        payLoadMultiplier = Integer.parseInt(configurations.getProperty(ConfigKeys.PAYLOAD_MULTIPLYER));
+        consumerPort = Integer.parseInt(configurations.getProperty(ConfigKeys.CONSUMER_PORT));
+        testExpirationTime = Math.max(20000,
+                Long.parseLong(configurations.getProperty(ConfigKeys.PERFORMANCE_TEST_TIMEOUT, "20000")));
+        numberOfSubscriber = Integer.parseInt(configurations.getProperty(ConfigKeys.NUMBER_OF_SUBSCRIBERS));
+        numMultiThreadsSupportPerSub = Integer.parseInt(configurations.getProperty(ConfigKeys.MULTI_THREAD_PER_SUB));
+        topicPrefix = "topic" + configurations.getProperty(ConfigKeys.TOPIC_SIMPLE);
+        NOTIFICATIONS_PUBLISHED_PER_TOPIC = notifPerTopic;
+    }
+
+    /**
+     * Appends the timing and throughput statistics of a finished run to {@code statistics}.
+     *
+     * @param notifPerTopic
+     *            notifications published per topic
+     * @param noTopicsPublished
+     *            number of topics that were published to
+     * @param publishersRunningTime
+     *            wall-clock time in milliseconds the publishers needed
+     * @param executionTime
+     *            milliseconds between the publish start and the last received message
+     * @param throughtput
+     *            received messages per second
+     * @param statistics
+     *            list the new entries are appended to
+     */
+    private static void setStatList(int notifPerTopic, int noTopicsPublished, long publishersRunningTime,
+            long executionTime, double throughtput, List<Stat> statistics) {
+
+        // Computed in long arithmetic so that topics * notifications * 1000 cannot overflow an
+        // int; a publisher phase shorter than one millisecond reports 0 instead of dividing by zero.
+        long publishedMessages = (long) noTopicsPublished * notifPerTopic;
+        long publisherThroughput = publishersRunningTime > 0 ? publishedMessages * 1000L / publishersRunningTime
+                : 0L;
+
+        statistics.add(new Stat("# topics published", noTopicsPublished));
+        statistics.add(new Stat("Total RTT (millis)", totalRoundTripTime));
+        statistics.add(new Stat("Average RTT (millis)", avgRountTripTime));
+        statistics.add(new Stat("Total published to receive time (millis)", executionTime));
+        statistics.add(new Stat("Throughput (messages per second)", throughtput));
+        statistics.add(new Stat("Total publish RTT (millis)", totalPublishRTT));
+        statistics.add(new Stat("Average publish RTT (millis)", avgPublishRTTime));
+        statistics.add(new Stat("publisher duration (millis)", publishersRunningTime));
+        statistics.add(new Stat("Publisher throughput (messages per second)", publisherThroughput));
+    }
+
+    /**
+     * Sets {@link #payload} to the content of {@code payload.txt} repeated
+     * {@code payLoadMultiplier} times. If the file cannot be read the stack trace is printed
+     * and the payload becomes the empty string.
+     *
+     * @param payLoadMultiplier
+     *            how many copies of the file content to concatenate; values below 1 yield an
+     *            empty payload
+     */
+    private static void setPayload(int payLoadMultiplier) {
+        String tempPayload = "";
+        try {
+            tempPayload = LoadMsgPayload.getInstance().getMessage("payload.txt");
+        } catch (FileNotFoundException e2) {
+            e2.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        if (tempPayload == null) {
+            tempPayload = "";
+        }
+
+        // Build the value from scratch: appending to the initially-null static field would
+        // prefix the payload with the literal text "null".
+        StringBuilder repeated = new StringBuilder(tempPayload.length() * Math.max(payLoadMultiplier, 0));
+        for (int i = 1; i <= payLoadMultiplier; i++) {
+            repeated.append(tempPayload);
+        }
+        payload = repeated.toString();
+    }
+
+    /**
+     * Creates and starts one publisher thread per topic. Thread ids are 1-based; the topic
+     * name of thread {@code j} is the topic prefix followed by the 0-based index {@code j}.
+     *
+     * @param noTopicsPublished
+     *            number of publisher threads to create
+     * @param protocol
+     *            protocol name handed to every publisher
+     * @param publiserhStartSignal
+     *            latch handed to every publisher as its start signal
+     * @param publisherDoneSignal
+     *            latch handed to every publisher as its done signal
+     * @param publisher
+     *            array that receives the created threads, indexed by topic
+     */
+    private static void createPublishers(int noTopicsPublished, String protocol, CountDownLatch publiserhStartSignal,
+            CountDownLatch publisherDoneSignal, PublisherThread[] publisher) {
+        for (int topicNo = 0; topicNo < noTopicsPublished; topicNo++) {
+            String brokerUrl = configurations.getProperty(ConfigKeys.BROKER_URL);
+            String topicName = topicPrefix + topicNo;
+            PublisherThread worker = new PublisherThread(protocol, brokerUrl, topicName, payload,
+                    publiserhStartSignal, publisherDoneSignal, topicNo + 1);
+            publisher[topicNo] = worker;
+            worker.start();
+        }
+    }
+
+    /**
+     * Distributes the subscriptions over the notification managers, using plain topic
+     * subscriptions when the XPath option is configured as "false" and XPath subscriptions
+     * (expressions taken in turn from {@code xpath.list}) otherwise.
+     * <ul>
+     * <li>With at most as many subscribers as topics, every topic is subscribed once and the
+     * managers are used round-robin, starting at {@code arrayIndex}.</li>
+     * <li>With more subscribers than topics, every manager gets exactly one subscription and
+     * the topics are used round-robin.</li>
+     * </ul>
+     *
+     * @param noTopicsPublished
+     *            number of topics that will be published
+     * @param numberOfSubscriber
+     *            number of usable entries in {@code notifManagerArray}
+     * @param notifManagerArray
+     *            managers that create the subscriptions
+     * @param arrayIndex
+     *            index of the manager that receives the first subscription in the round-robin case
+     * @throws Exception
+     *             if a subscription cannot be created
+     * @throws IOException
+     *             if the XPath list cannot be loaded
+     */
+    private static void createSubscriberArray(int noTopicsPublished, int numberOfSubscriber,
+            NotificationManager[] notifManagerArray, int arrayIndex) throws Exception, IOException {
+        if ("false".equalsIgnoreCase(configurations.getProperty(ConfigKeys.IS_XPATH_ENABLED))) {
+            if (numberOfSubscriber <= noTopicsPublished) {
+                for (int i = 0; i < noTopicsPublished; ++i) {
+                    notifManagerArray[arrayIndex].createTopicSubscription(topicPrefix + i);
+                    notifManagerArray[arrayIndex++].incNoTopicsSubscribed();
+                    if (arrayIndex >= numberOfSubscriber) {
+                        arrayIndex = 0;
+                    }
+                }
+            } else {
+                int topicIndex = 0;
+                for (int i = 0; i < numberOfSubscriber; ++i) {
+                    notifManagerArray[i].createTopicSubscription(topicPrefix + topicIndex++);
+                    notifManagerArray[i].incNoTopicsSubscribed();
+                    if (topicIndex >= noTopicsPublished) {
+                        topicIndex = 0;
+                    }
+                }
+            }
+        } else {
+            xpathList = LoadXpath.getInstace().getXpathList("xpath.list");
+            Iterator<String> ite = xpathList.iterator();
+            if (numberOfSubscriber <= noTopicsPublished) {
+                for (int i = 0; i < noTopicsPublished; ++i) {
+                    // start over once the XPath expressions are used up
+                    if (!ite.hasNext()) {
+                        ite = xpathList.iterator();
+                    }
+
+                    notifManagerArray[arrayIndex].createXpathSubscription(topicPrefix + i, ite.next());
+                    notifManagerArray[arrayIndex++].incNoTopicsSubscribed();
+                    if (arrayIndex >= numberOfSubscriber) {
+                        arrayIndex = 0;
+                    }
+                }
+            } else {
+                int topicIndex = 0;
+                for (int i = 0; i < numberOfSubscriber; ++i) {
+                    if (!ite.hasNext()) {
+                        ite = xpathList.iterator();
+                    }
+
+                    // Actually create the subscription that is counted below; previously this
+                    // branch only counted, so these managers never received any message.
+                    notifManagerArray[i].createXpathSubscription(topicPrefix + topicIndex++, ite.next());
+                    notifManagerArray[i].incNoTopicsSubscribed();
+                    if (topicIndex >= noTopicsPublished) {
+                        topicIndex = 0;
+                    }
+                }
+            }
+        }
+    }
+
+    private static Properties getDefaults() {
+        Properties defaults = new Properties();
+        defaults.setProperty(ConfigKeys.BROKER_URL, "http://localhost:8080/axis2/services/EventingService");
+        defaults.setProperty(ConfigKeys.TOPIC_SIMPLE, "simpleSampleTopic");
+        defaults.setProperty(ConfigKeys.CONSUMER_PORT, "6666");
+        defaults.setProperty(ConfigKeys.NOTIFICATIONS_PUBLISHED_PER_TOPIC, "5");
+        defaults.setProperty(ConfigKeys.NUMBER_OF_TOPICS_PUBLISHED, "5");
+        defaults.setProperty(ConfigKeys.IS_XPATH_ENABLED, "false");
+        defaults.setProperty(ConfigKeys.XPATH, "/c/b/a");
+        defaults.setProperty(ConfigKeys.PAYLOAD_MULTIPLYER, "1");
+        defaults.setProperty(ConfigKeys.PROTOCOL, "wse");
+        defaults.setProperty(ConfigKeys.PUBLISH_TIME_INTERVAL, "10000");
+        defaults.setProperty(ConfigKeys.PERFORMANCE_TEST_TIMEOUT, "5000000");
+        defaults.setProperty(ConfigKeys.NUMBER_OF_SUBSCRIBERS, "1");
+        defaults.setProperty(ConfigKeys.MULTI_THREAD_PER_SUB, "50");
+        return defaults;
+    }
+
+    /**
+     * Prints the statistics as aligned "name : value" lines followed by a dashed separator,
+     * both to standard output and appended to the given file.
+     *
+     * @param stats
+     *            statistics to print, in order
+     * @param aFile
+     *            file the report is appended to
+     * @throws IOException
+     *             if the file cannot be opened or written
+     */
+    private static void printStatistics(List<Stat> stats, File aFile) throws IOException {
+        int maxLen = 0;
+        for (Stat stat : stats) {
+            maxLen = Math.max(maxLen, stat.name.length());
+        }
+
+        Writer output = new BufferedWriter(new FileWriter(aFile, true));
+        try {
+            char[] fillchars = null;
+
+            for (Stat stat : stats) {
+                // pad every name to the longest one so that the colons line up
+                fillchars = new char[maxLen - stat.name.length() + 1];
+                Arrays.fill(fillchars, ' ');
+                String formattedStr = String.format("%s%s : %s", stat.name, new String(fillchars),
+                        stat.value.toString());
+                output.write(formattedStr + "\n");
+                System.out.println(formattedStr);
+            }
+
+            fillchars = new char[maxLen];
+            Arrays.fill(fillchars, '-');
+            String fillingString = new String(fillchars);
+            output.write(fillingString + "\n");
+            System.out.println(fillingString);
+        } finally {
+            // release the file handle even when a write fails part-way through
+            output.close();
+        }
+    }
+
+    /**
+     * Initialises {@link #configurations} with the built-in defaults and overlays the
+     * settings found in the configuration file on the class path. If the file is missing or
+     * unreadable a message is printed and only the defaults apply.
+     */
+    public static void loadConfigurationsFromFile() {
+        configurations = new Properties(getDefaults());
+
+        java.io.InputStream in = null;
+        try {
+            URL url = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+            if (url == null) {
+                throw new IOException("configuration file not found");
+            }
+            in = url.openStream();
+            configurations.load(in);
+        } catch (IOException ioe) {
+            System.out.println("unable to load configuration file, default settings will be used");
+        } finally {
+            // Properties.load does not close the stream it reads from
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException ignored) {
+                    // nothing useful can be done if closing a read-only stream fails
+                }
+            }
+        }
+    }
+
+    // Not used, If required to run as a test case call it from main
+    public static void loadConfigurationsFromSystemEnv() {
+
+        configurations = new Properties(getDefaults());
+
+        Properties envConfigs = System.getProperties();
+        String brokerUrl = envConfigs.getProperty(ConfigKeys.BROKER_URL, null);
+        String consumerUrl = envConfigs.getProperty(ConfigKeys.CONSUMER_EPR, null);
+        String consumerPort = envConfigs.getProperty(ConfigKeys.CONSUMER_PORT, null);
+        String isXpathEnabled = envConfigs.getProperty(ConfigKeys.IS_XPATH_ENABLED, null);
+        String notifPerTopic = envConfigs.getProperty(ConfigKeys.NOTIFICATIONS_PUBLISHED_PER_TOPIC, null);
+        String subsPerTopic = envConfigs.getProperty(ConfigKeys.NUMBER_OF_SUBS_PERTOPIC, null);
+        String noTopicsPublished = envConfigs.getProperty(ConfigKeys.NUMBER_OF_TOPICS_PUBLISHED, null);
+        String payLoadMultiplier = envConfigs.getProperty(ConfigKeys.PAYLOAD_MULTIPLYER, null);
+        String protocol = envConfigs.getProperty(ConfigKeys.PROTOCOL, null);
+        String topicSimple = envConfigs.getProperty(ConfigKeys.TOPIC_SIMPLE, null);
+        String topicXpath = envConfigs.getProperty(ConfigKeys.XPATH, null);
+
+        if (brokerUrl == null) {
+            System.err.println(ConfigKeys.BROKER_URL + " not given");
+            System.exit(1);
+        }
+        if (consumerUrl == null) {
+            System.err.println(ConfigKeys.CONSUMER_EPR + " not given");
+            System.exit(1);
+        }
+        if (consumerPort == null) {
+            System.err.println(ConfigKeys.CONSUMER_PORT + " not given");
+            System.exit(1);
+        }
+        if (isXpathEnabled == null) {
+            System.err.println(ConfigKeys.IS_XPATH_ENABLED + " not given");
+            System.exit(1);
+        }
+        if (notifPerTopic == null) {
+            System.err.println(ConfigKeys.NOTIFICATIONS_PUBLISHED_PER_TOPIC + " not given");
+            System.exit(1);
+        }
+        if (subsPerTopic == null) {
+            System.err.println(ConfigKeys.NUMBER_OF_SUBS_PERTOPIC + " not given");
+            System.exit(1);
+        }
+        if (noTopicsPublished == null) {
+            System.err.println(ConfigKeys.NUMBER_OF_TOPICS_PUBLISHED + " not given");
+            System.exit(1);
+        }
+        if (payLoadMultiplier == null) {
+            System.err.println(ConfigKeys.PAYLOAD_MULTIPLYER + " not given");
+            System.exit(1);
+        }
+        if (protocol == null) {
+            System.err.println(ConfigKeys.PROTOCOL + " not given");
+            System.exit(1);
+        }
+        if (topicSimple == null) {
+            System.err.println(ConfigKeys.TOPIC_SIMPLE + " not given");
+            System.exit(1);
+        }
+        if (topicXpath == null) {
+            System.err.println(ConfigKeys.XPATH + " not given");
+            System.exit(1);
+        }
+
+        configurations.put(ConfigKeys.BROKER_URL, brokerUrl);
+        configurations.put(ConfigKeys.CONSUMER_EPR, consumerUrl);
+        configurations.put(ConfigKeys.CONSUMER_PORT, consumerPort);
+        configurations.put(ConfigKeys.IS_XPATH_ENABLED, isXpathEnabled);
+        configurations.put(ConfigKeys.NOTIFICATIONS_PUBLISHED_PER_TOPIC, notifPerTopic);
+        configurations.put(ConfigKeys.NUMBER_OF_SUBS_PERTOPIC, subsPerTopic);
+        configurations.put(ConfigKeys.NUMBER_OF_TOPICS_PUBLISHED, noTopicsPublished);
+        configurations.put(ConfigKeys.PAYLOAD_MULTIPLYER, payLoadMultiplier);
+        configurations.put(ConfigKeys.PROTOCOL, protocol);
+        configurations.put(ConfigKeys.TOPIC_SIMPLE, topicSimple);
+        configurations.put(ConfigKeys.XPATH, topicXpath);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PublisherThread.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PublisherThread.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PublisherThread.java
new file mode 100644
index 0000000..4f1215b
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PublisherThread.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.airavata.wsmg.client.*;
+
+/**
+ * Publishes {@code PerformanceTest.NOTIFICATIONS_PUBLISHED_PER_TOPIC} time-stamped messages
+ * to one topic and records the average time a publish call took. The thread waits for the
+ * start latch before publishing and always counts the done latch down when it ends, whether
+ * publishing succeeded or not.
+ */
+public class PublisherThread extends Thread {
+    private String brokerURL;
+    private String topic;
+    private final CountDownLatch startSignal;
+    private final CountDownLatch doneSignal;
+    private long totPublishTime = 0l;
+    long avgPublishTime = 0l;
+
+    private String payload = "";
+    String msg = "";
+    private MessageBrokerClient client = null;
+    int trackId = 0;
+    int threadId = 0;
+
+    /**
+     * @param protocolIn
+     *            {@code "wse"} (case-insensitive) selects the WSE client, anything else the WSNT client
+     * @param brokerURLIn
+     *            broker location handed to the client's {@code init}
+     * @param topicIn
+     *            topic the messages are published to
+     * @param payloadIn
+     *            text placed inside the payload element of every message
+     * @param startSignalIn
+     *            latch this thread waits on before it starts publishing
+     * @param doneSignalIn
+     *            latch this thread counts down when it is finished
+     * @param threadIdIn
+     *            id written into every message
+     */
+    public PublisherThread(String protocolIn, String brokerURLIn, String topicIn, String payloadIn,
+            CountDownLatch startSignalIn, CountDownLatch doneSignalIn, int threadIdIn) {
+        this.payload = payloadIn;
+        this.brokerURL = brokerURLIn;
+        this.topic = topicIn;
+        this.startSignal = startSignalIn;
+        this.doneSignal = doneSignalIn;
+        this.threadId = threadIdIn;
+        if ("wse".equalsIgnoreCase(protocolIn)) {
+
+            WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient();
+            wseMsgBrokerClient.setTimeoutInMilliSeconds(0);
+            wseMsgBrokerClient.init(brokerURL);
+            client = wseMsgBrokerClient;
+
+        } else {
+
+            WsntMsgBrokerClient wsntMsgBrokerClient = new WsntMsgBrokerClient();
+            wsntMsgBrokerClient.setTimeoutInMilliSeconds(0);
+            wsntMsgBrokerClient.init(brokerURL);
+            client = wsntMsgBrokerClient;
+        }
+
+    }
+
+    /**
+     * Waits for the start signal, publishes the configured number of messages and computes the
+     * average publish time. Failures are printed; the done latch is counted down in every case
+     * so that the thread waiting on it cannot block forever.
+     */
+    @Override
+    public void run() {
+
+        try {
+            trackId = 1;
+            startSignal.await();
+            System.out.println("Publishing started for topic :" + this.topic);
+            final int toPublish = PerformanceTest.NOTIFICATIONS_PUBLISHED_PER_TOPIC;
+            for (int i = 0; i < toPublish; i++) {
+                // the embedded creation time is what the receiving side can measure against
+                msg = "<perf:performancetest xmlns:perf=\"http://lead.extreme.indiana.edu/namespaces/performance\"><perf:time>"
+                        + System.currentTimeMillis()
+                        + "</perf:time><perf:trackInfo><perf:threadId>"
+                        + threadId
+                        + "</perf:threadId><perf:trackId>"
+                        + trackId
+                        + "</perf:trackId></perf:trackInfo>"
+                        + "<perf:payload>" + payload + "</perf:payload></perf:performancetest>";
+                long publishStartTime = System.currentTimeMillis();
+                client.publish(topic, msg);
+                totPublishTime += System.currentTimeMillis() - publishStartTime;
+                trackId++;
+            }
+
+            // with nothing to publish the average stays 0 instead of dividing by zero
+            if (toPublish > 0) {
+                avgPublishTime = totPublishTime / toPublish;
+            }
+            System.out.println("Publishing ended for topic :" + this.topic);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to whoever inspects this thread afterwards
+            Thread.currentThread().interrupt();
+            e.printStackTrace();
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            doneSignal.countDown();
+        }
+    }
+
+    /** @return average duration in milliseconds of one publish call; 0 until the run has completed */
+    synchronized long getAvgPubTime() {
+        return this.avgPublishTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatCalculatorThread.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatCalculatorThread.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatCalculatorThread.java
new file mode 100644
index 0000000..541d80b
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatCalculatorThread.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import java.util.concurrent.TimeUnit;
+
+public class StatCalculatorThread extends Thread {
+
+    private NotificationManager notifManager = null;
+    private long lastMsgReceivedTime = 0l;
+    private long timeTot = 0;
+    private long avgTime = 0;
+    private int numberOfMssgsReceived = 0; // to avoid concurrency
+    private long timeOutMillis;
+    private int expectedNoMessages = 0;
+
+    public StatCalculatorThread(NotificationManager notificationManager, long timeOutInMillis) throws Exception {
+        this.timeOutMillis = timeOutInMillis;
+        this.notifManager = notificationManager;
+        expectedNoMessages = PerformanceTest.NOTIFICATIONS_PUBLISHED_PER_TOPIC
+                * notificationManager.getNoTopicsSubscribed();
+    }
+
+    @Override
+    public void run() {
+        do {
+
+            StatContainer container = null;
+            try {
+                container = notifManager.getQueue().poll(timeOutMillis, TimeUnit.MILLISECONDS);
+
+                if (container != null) {
+                    timeTot += container.getRondTripTime();
+                    lastMsgReceivedTime = container.getMessageReceivedTime();
+                    numberOfMssgsReceived++;
+                    // ******un-comment in order to log trakId and message
+                    // related other information*****
+                    // if (logger.isInfoEnabled()) {
+                    // trackInfo = env
+                    // .getBody()
+                    // .getFirstElement()
+                    // .getFirstChildWithName(
+                    // new QName(
+                    // "http://lead.extreme.indiana.edu/namespaces/performance",
+                    // "trackInfo")).toStringWithConsume();
+                    // logger.info(trackInfo + "   Send time :" + time
+                    // + "  Received time :" + System.currentTimeMillis());
+                    // }
+                } else {
+                    System.out.println("stat calculator thread was interrupted");
+                    break;
+                }
+            } catch (InterruptedException e1) {
+                e1.printStackTrace();
+                break;
+            }
+
+        } while (expectedNoMessages > numberOfMssgsReceived);
+
+        if (numberOfMssgsReceived > 0) {
+            avgTime = timeTot / numberOfMssgsReceived;
+        } else {
+            System.out.println("no messages received");
+        }
+
+        System.out.println("end of stat calculator");
+    }
+
+    synchronized long getTotalTime() {
+        return timeTot;
+    }
+
+    synchronized long getAverageTime() {
+        return avgTime;
+    }
+
+    synchronized long getNumberOfMsgReceived() {
+        return numberOfMssgsReceived;
+    }
+
+    synchronized public long getLastMsgReceivedTime() {
+        return lastMsgReceivedTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatContainer.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatContainer.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatContainer.java
new file mode 100644
index 0000000..31b9c85
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatContainer.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+
+public class StatContainer {
+    private SOAPEnvelope msgEnvelope = null;
+    private long rtt = 0l;
+    private long receivedTime = 0l;
+
+    public StatContainer(SOAPEnvelope msgEnvelope) {
+        this.msgEnvelope = msgEnvelope;
+        this.receivedTime = System.currentTimeMillis();
+        this.rtt = this.receivedTime
+                - Long.parseLong(msgEnvelope.getBody().getFirstElement().getFirstElement().getText());
+    }
+
+    public long getRondTripTime() {
+        return this.rtt;
+    }
+
+    public SOAPEnvelope getMsgEnvelope() {
+        return msgEnvelope;
+    }
+
+    public long getMessageReceivedTime() {
+        return this.receivedTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/Subscription.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/Subscription.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/Subscription.java
new file mode 100644
index 0000000..d8df04f
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/Subscription.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.MessageBrokerClient;
+import org.apache.axis2.addressing.EndpointReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Plain holder for the data describing one subscription: its id, topic, optional XPath expression,
+ * notification handler, broker URL, protocol, client and message box endpoint.
+ */
+public class Subscription {
+
+    private final static Logger logger = LoggerFactory.getLogger(Subscription.class);
+
+    private MessageBrokerClient client;
+    private String subscriptionID;
+    private String topic;
+    private String xpath;
+    private ConsumerNotificationHandler handler;
+    private String brokerURL;
+    private String protocol;
+    private EndpointReference messageBoxEPR;
+
+    /**
+     * Creates a subscription without an XPath expression.
+     */
+    public Subscription(MessageBrokerClient clientIn, String subscriptionID, String topic,
+            ConsumerNotificationHandler callback, String brokerURL, String protocolIn) {
+        this(clientIn, subscriptionID, topic, null, callback, brokerURL, protocolIn);
+    }
+
+    /**
+     * Creates a subscription that also carries an XPath expression.
+     */
+    public Subscription(MessageBrokerClient clientIn, String subscriptionID, String topic, String xpath,
+            ConsumerNotificationHandler callback, String brokerURL, String protocolIn) {
+        this.client = clientIn;
+        this.subscriptionID = subscriptionID;
+        this.topic = topic;
+        this.xpath = xpath;
+        this.handler = callback;
+        this.brokerURL = brokerURL;
+        this.protocol = protocolIn;
+    }
+
+    /** @return the notification handler given at construction */
+    public ConsumerNotificationHandler getCallback() {
+        return this.handler;
+    }
+
+    /** @return the topic given at construction */
+    public String getTopic() {
+        return this.topic;
+    }
+
+    // public void destroy() throws RemoteException {
+    // client.shutdownConsumerService();
+    // }
+
+    /** @return the message box endpoint, or {@code null} if it was never set */
+    public EndpointReference getMessageBoxEPR() {
+        return this.messageBoxEPR;
+    }
+
+    public void setMessageBoxEpr(EndpointReference messageBoxEPR) {
+        this.messageBoxEPR = messageBoxEPR;
+    }
+
+    public String getSubscriptionID() {
+        return this.subscriptionID;
+    }
+
+    public void setSubscriptionID(String subscriptionID) {
+        this.subscriptionID = subscriptionID;
+    }
+
+    public String getBrokerURL() {
+        return this.brokerURL;
+    }
+
+    public void setBrokerURL(String brokerURL) {
+        this.brokerURL = brokerURL;
+    }
+
+    // public String getConsumerEPR() throws UnknownHostException {
+    // cli
+    // }
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/ConfigKeys.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/ConfigKeys.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/ConfigKeys.java
new file mode 100644
index 0000000..6463f31
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/ConfigKeys.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt.util;
+
+/**
+ * String constants used by the round-trip-time performance evaluator: the configuration file name and
+ * the property names (presumably looked up in that file -- confirm against the loader).
+ */
+public interface ConfigKeys {
+
+    /** Name of the configuration file. */
+    String CONFIG_FILE_NAME = "configurations.properties";
+
+    // Broker and consumer endpoints.
+    String BROKER_URL = "broker.eventing.service.epr";
+    String CONSUMER_EPR = "consumer.location";
+    String CONSUMER_PORT = "consumer.port";
+    String PROTOCOL = "protocol.used";
+
+    // Topics and XPath filtering.
+    String TOPIC_SIMPLE = "topic.simple";
+    String TOPIC_XPATH = "topic.xpath";
+    /** Same property name as {@link #TOPIC_XPATH}. */
+    String XPATH = TOPIC_XPATH;
+    String IS_XPATH_ENABLED = "is.xpath.enabled";
+
+    // Publishing parameters.
+    String PUBLISH_TIME_INTERVAL = "publish.time.interval";
+    String PAYLOAD_MULTIPLYER = "payload.multiplyer";
+    String NOTIFICATIONS_PUBLISHED_PER_TOPIC = "notifications.per.topic";
+    String NUMBER_OF_TOPICS_PUBLISHED = "number.of.topics";
+
+    // Subscriber parameters.
+    String NUMBER_OF_SUBS_PERTOPIC = "num.subscribers.per.topic";
+    String NUMBER_OF_SUBSCRIBERS = "number.of.subscriber.servers";
+    String MULTI_THREAD_PER_SUB = "num.muti.thread.per.sub";
+
+    // Timeouts and scheduling.
+    String SCHEDULER_REPEAT_PERIOD = "stat.timeout.monitor.scheduler.period";
+    String PERFORMANCE_TEST_TIMEOUT = "performance.test.timeout.period.millis";
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadMsgPayload.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadMsgPayload.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadMsgPayload.java
new file mode 100644
index 0000000..8ed4cb6
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadMsgPayload.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+
+/**
+ * Lazily created singleton that reads a system class-path resource into a string.
+ */
+public class LoadMsgPayload {
+
+    private static LoadMsgPayload msg = null;
+
+    /**
+     * @return the shared instance, creating it on first use
+     */
+    public static LoadMsgPayload getInstance() {
+        if (msg != null) {
+            return msg;
+        }
+        msg = new LoadMsgPayload();
+        return msg;
+    }
+
+    /**
+     * Reads the named system resource as UTF-8 text.
+     *
+     * @param fileName
+     *            resource name passed to {@link ClassLoader#getSystemResource(String)}
+     * @return the resource content with every line terminated by {@code "\n"}, or an empty string when
+     *         the resource cannot be found
+     * @throws IOException
+     *             if opening or reading the resource fails
+     */
+    public String getMessage(String fileName) throws IOException {
+        URL resource = ClassLoader.getSystemResource(fileName);
+        if (resource == null) {
+            return "";
+        }
+        return convertStreamToString(resource.openStream());
+    }
+
+    /**
+     * Reads the stream line by line as UTF-8, appending a newline after each line, and closes the stream.
+     */
+    private String convertStreamToString(InputStream is) throws IOException {
+        if (is == null) {
+            return "";
+        }
+
+        StringBuilder text = new StringBuilder();
+        try {
+            BufferedReader lines = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+            for (String current = lines.readLine(); current != null; current = lines.readLine()) {
+                text.append(current).append("\n");
+            }
+        } finally {
+            is.close();
+        }
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadXpath.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadXpath.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadXpath.java
new file mode 100644
index 0000000..1a1a5fa
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadXpath.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.LinkedList;
+
+/**
+ * Lazily created singleton that loads a list of lines (one XPath expression per line, judging by the
+ * names used) from a system class-path resource and caches the list.
+ *
+ * <p>
+ * The cache is not keyed by file name: once a list has been loaded successfully, every later call to
+ * {@link #getXpathList(String)} returns that same list regardless of the {@code fileName} argument.
+ */
+public class LoadXpath {
+    private static LoadXpath xpath = null;
+    LinkedList<String> xpathList = null;
+
+    /**
+     * @return the shared instance, creating it on first use
+     */
+    public static LoadXpath getInstace() {
+        if (xpath == null)
+            xpath = new LoadXpath();
+        return xpath;
+    }
+
+    /**
+     * Returns the cached list, loading it from the named system resource on first use.
+     *
+     * @param fileName
+     *            resource name passed to {@link ClassLoader#getSystemResource(String)}
+     * @return the list of lines, or {@code null} when nothing has been loaded yet and the resource
+     *         cannot be found
+     * @throws IOException
+     *             if opening or reading the resource fails; in that case nothing is cached
+     */
+    public LinkedList<String> getXpathList(String fileName) throws IOException {
+        URL url = ClassLoader.getSystemResource(fileName);
+        if (url != null && xpathList == null)
+            return convertStreamToString(url.openStream());
+        return xpathList;
+    }
+
+    /**
+     * Reads the stream line by line as UTF-8, closes it, and caches the result only if the whole stream
+     * was read successfully.
+     */
+    private LinkedList<String> convertStreamToString(InputStream is) throws IOException {
+        if (is == null) {
+            return null;
+        }
+
+        // Collect into a local list first: publishing the field before reading would leave a partially
+        // filled list cached (and returned by every later call) if reading fails half-way.
+        LinkedList<String> loaded = new LinkedList<String>();
+        String line;
+
+        try {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+            while ((line = reader.readLine()) != null) {
+                loaded.add(line);
+            }
+        } finally {
+            is.close();
+        }
+
+        xpathList = loaded;
+        return loaded;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/BrokerUtilTest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/BrokerUtilTest.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/BrokerUtilTest.java
new file mode 100644
index 0000000..e0162e7
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/BrokerUtilTest.java
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axis2.AxisFault;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for the static helpers of {@link BrokerUtil}.
+ */
+public class BrokerUtilTest extends TestCase {
+
+    /**
+     * @throws java.lang.Exception
+     */
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    /**
+     * @throws java.lang.Exception
+     */
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.airavata.wsmg.util.BrokerUtil#sameStringValue(java.lang.String, java.lang.String)} .
+     */
+    @Test
+    public void testSameStringValue() {
+
+        assertTrue(BrokerUtil.sameStringValue(null, null));
+        assertTrue(BrokerUtil.sameStringValue("test", "test"));
+
+        assertFalse(BrokerUtil.sameStringValue("one", "two"));
+        assertFalse(BrokerUtil.sameStringValue(null, "test"));
+        assertFalse(BrokerUtil.sameStringValue("test", null));
+
+    }
+
+    /**
+     * Test method for {@link org.apache.airavata.wsmg.util.BrokerUtil#getTopicLocalString(java.lang.String)}.
+     */
+    @Test
+    public void testGetTopicLocalString() {
+
+        assertEquals("localstring", (BrokerUtil.getTopicLocalString("prefix:localstring")));
+
+        assertEquals("localstring", BrokerUtil.getTopicLocalString("localstring"));
+
+    }
+
+    /**
+     * Test method for {@link org.apache.airavata.wsmg.util.BrokerUtil#getXPathString(org.apache.axiom.om.OMElement)}.
+     */
+    @Test
+    public void testGetXPathString() {
+
+        OMFactory factory = OMAbstractFactory.getOMFactory();
+
+        // A null element must be rejected with IllegalArgumentException.
+        try {
+
+            BrokerUtil.getXPathString(null);
+            fail("method should validate invalid arguments");
+        } catch (IllegalArgumentException e) {
+            // expected
+        } catch (AxisFault e) {
+            fail("invalid exception thrown");
+        }
+
+        // An element with an unexpected QName (and no dialect) must be rejected with AxisFault.
+        try {
+
+            QName invalidQName = new QName("invalidURI", "invalidLocalName");
+
+            OMElement xpathEl = factory.createOMElement(invalidQName);
+
+            BrokerUtil.getXPathString(xpathEl);
+
+            fail("method should validate arguments");
+
+        } catch (AxisFault fault) {
+            // expected
+        }
+
+        // An unknown dialect must be rejected with AxisFault.
+        try {
+
+            String xpathExpression = "testXpathExpression";
+            String dialect = "unknownXpathDialect";
+
+            OMNamespace ns = factory.createOMNamespace("unit_test", "jnt");
+
+            OMElement xpathEl = factory.createOMElement("TestXpath", ns);
+            xpathEl.addAttribute("Dialect", dialect, null);
+
+            xpathEl.setText(xpathExpression);
+
+            BrokerUtil.getXPathString(xpathEl);
+
+            fail("method should reject unknown dialect");
+        } catch (AxisFault e) {
+            // expected
+        }
+
+        // With the supported dialect the element text is returned unchanged.
+        try {
+
+            String xpathExpression = "textXpathExpression";
+            String dialect = WsmgCommonConstants.XPATH_DIALECT;
+
+            OMNamespace ns = factory.createOMNamespace("unit_test", "jnt");
+
+            OMElement xpathEl = factory.createOMElement("TestXpath", ns);
+            xpathEl.addAttribute("Dialect", dialect, null);
+
+            xpathEl.setText(xpathExpression);
+            assertEquals(xpathExpression, BrokerUtil.getXPathString(xpathEl));
+
+        } catch (AxisFault e) {
+            fail("unable to extract xpath query: " + e.toString());
+        }
+
+    }
+
+    /**
+     * Test method for {@link org.apache.airavata.wsmg.util.BrokerUtil#getTopicFromRequestPath(java.lang.String)}.
+     */
+    @Test
+    public void testGetTopicFromRequestPath() {
+
+        assertNull(BrokerUtil.getTopicFromRequestPath(null));
+        assertNull(BrokerUtil.getTopicFromRequestPath(""));
+        assertNull(BrokerUtil.getTopicFromRequestPath("/"));
+        assertNull(BrokerUtil.getTopicFromRequestPath("/subscribe/url/"));
+        assertNull(BrokerUtil.getTopicFromRequestPath("/subscribe/url/topic/"));
+
+        // Expected value goes first so a failure message reports expected/actual the right way round.
+        assertEquals("xyz", BrokerUtil.getTopicFromRequestPath("/requestpath/topic/xyz"));
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/ConfigKeys.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/ConfigKeys.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/ConfigKeys.java
new file mode 100644
index 0000000..0624442
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/ConfigKeys.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+/**
+ * String constants used by the unit tests: the configuration file name and property names
+ * (presumably looked up in that file -- confirm against the loader).
+ */
+public interface ConfigKeys {
+
+    /** Name of the unit-test configuration file. */
+    String CONFIG_FILE_NAME = "unit_tests.properties";
+
+    // Axis2 repository property.
+    String AXIS2_REPO = "axis2.repo";
+
+    // Consumer endpoint properties.
+    String CONSUMER_EPR = "consumer.location";
+    String CONSUMER_PORT = "consumer.port";
+
+    // Topic properties.
+    String TOPIC_SIMPLE = "topic.simple";
+    String TOPIC_XPATH = "topic.xpath";
+
+}