You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:31:24 UTC
[82/90] [abbrv] AIRAVATA-1124
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/commons/TestCommonRoutines.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/commons/TestCommonRoutines.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/commons/TestCommonRoutines.java
new file mode 100644
index 0000000..098dd49
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/commons/TestCommonRoutines.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.commons;
+
+import java.util.Date;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCommonRoutines extends TestCase {
+
+    /**
+     * Empty: this test needs no fixture.
+     */
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    /**
+     * Empty: this test has nothing to release.
+     */
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Checks that {@link CommonRoutines#getXsdDateTime(java.util.Date)} yields a non-null value for the current time.
+     */
+    @Test
+    public void testGetXsdDateTime() {
+        final Date now = new Date();
+        assertNotNull(CommonRoutines.getXsdDateTime(now));
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java
new file mode 100644
index 0000000..6f1ecbb
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestAddtionalWseXpathAndTopicScenarios.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath;
+
+import java.net.URL;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.xml.stream.XMLStreamException;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.airavata.wsmg.util.ConfigKeys;
+import org.apache.airavata.wsmg.util.TestUtilServer;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestAddtionalWseXpathAndTopicScenarios extends TestCase {
+
+    static Properties configs = new Properties();
+
+    /** Notification handler that queues every envelope it receives and echoes it to stdout. */
+    class NotificationReciever implements ConsumerNotificationHandler {
+        private final BlockingQueue<SOAPEnvelope> queue = new LinkedBlockingQueue<SOAPEnvelope>();
+        private final String id;
+
+        public NotificationReciever(String id) {
+            this.id = id;
+        }
+
+        public void handleNotification(SOAPEnvelope msgEnvelope) {
+            queue.add(msgEnvelope);
+            System.out.println(String.format("[reciever id: %s] %s", id, msgEnvelope));
+        }
+
+        public BlockingQueue<SOAPEnvelope> getMsgQueue() {
+            return queue;
+        }
+    }
+
+    /** Loads the configuration file from the classpath and calls {@code TestUtilServer.start}. */
+    @Before
+    public void setUp() throws Exception {
+        URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+        java.io.InputStream configStream = configURL.openStream();
+        try {
+            configs.load(configStream);
+        } finally {
+            // Properties.load leaves the stream open, so close it here.
+            configStream.close();
+        }
+        TestUtilServer.start(null, null);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Publishes one matching and one non-matching message on a topic and checks that the topic-only
+     * subscriber queues both while the topic-plus-XPath subscriber queues only one.
+     */
+    @Test
+    public final void testXpathAndTopicOnlyRoundTrip() {
+        try {
+            String topic = "RoundTripTestXpathAndTopicWse";
+            String xpathExpression = "/c/b/a[text()=1]";
+            String msgFormat = "<c><b><a>%d</a></b></c>";
+
+            long value = 1;
+            String matchingMsg = String.format(msgFormat, value);
+            String unmatchingMsg = String.format(msgFormat, value + 1);
+
+            int consumerPort = TestUtilServer.getAvailablePort();
+            String brokerEpr = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/EventingService";
+
+            WseMsgBrokerClient topicOnlyReceiverApi = new WseMsgBrokerClient();
+            topicOnlyReceiverApi.init(brokerEpr);
+            NotificationReciever topicOnlyMsgReceiver = new NotificationReciever("Topic Only");
+            String[] topicConsumerEPRs = topicOnlyReceiverApi.startConsumerService(consumerPort, topicOnlyMsgReceiver);
+            assertTrue("invalid consumer eprs returned", topicConsumerEPRs.length > 0);
+            String topicOnlySubId = topicOnlyReceiverApi.subscribe(topicConsumerEPRs[0], topic, null);
+            System.out.println("Topic only subscription ID: " + topicOnlySubId);
+
+            WseMsgBrokerClient xpathAndTopicReceiverApi = new WseMsgBrokerClient();
+            xpathAndTopicReceiverApi.init(brokerEpr);
+            NotificationReciever topicAndXpathMsgReceiver = new NotificationReciever("Topic And Xpath");
+            String[] topicAndXpathConsumerEPRs = xpathAndTopicReceiverApi.startConsumerService(consumerPort + 1,
+                    topicAndXpathMsgReceiver);
+            assertTrue("invalid consumer eprs returned", topicAndXpathConsumerEPRs.length > 0);
+            String topicAndXpathSubId = xpathAndTopicReceiverApi.subscribe(topicAndXpathConsumerEPRs[0], topic,
+                    xpathExpression);
+            System.out.println("Xpath and Topic subscription ID: " + topicAndXpathSubId);
+
+            WseMsgBrokerClient senderApi = new WseMsgBrokerClient();
+            senderApi.init(brokerEpr);
+
+            try {
+                senderApi.publish(topic, AXIOMUtil.stringToOM(matchingMsg));
+                senderApi.publish(topic, AXIOMUtil.stringToOM(unmatchingMsg));
+
+                // Wait before counting what each receiver has queued.
+                Thread.sleep(5000);
+                assertEquals("topic only reciever should get all messages", 2,
+                        topicOnlyMsgReceiver.getMsgQueue().size());
+                assertEquals("xpath and topic reciever should only get one message", 1,
+                        topicAndXpathMsgReceiver.getMsgQueue().size());
+            } catch (XMLStreamException x) {
+                fail("Error while creating OMElement");
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the flag for whoever runs this test
+                fail("interrupted while waiting for message");
+            } finally {
+                // Runs even when an assertion above fails, so both consumers are always torn down.
+                topicOnlyReceiverApi.unSubscribe(topicOnlySubId);
+                topicOnlyReceiverApi.shutdownConsumerService();
+                xpathAndTopicReceiverApi.unSubscribe(topicAndXpathSubId);
+                xpathAndTopicReceiverApi.shutdownConsumerService();
+            }
+        } catch (AxisFault e) {
+            e.printStackTrace();
+            fail("unexpected exception occured");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java
new file mode 100644
index 0000000..024b5d3
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathAndTopicSubscription.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath;
+
+import java.net.URL;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.xml.stream.XMLStreamException;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.airavata.wsmg.util.ConfigKeys;
+import org.apache.airavata.wsmg.util.TestUtilServer;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestWseXpathAndTopicSubscription extends TestCase implements ConsumerNotificationHandler {
+
+    static Properties configs = new Properties();
+
+    BlockingQueue<SOAPEnvelope> queue = new LinkedBlockingQueue<SOAPEnvelope>();
+
+    /** Queues the delivered envelope for the test method and echoes it to stdout. */
+    public void handleNotification(SOAPEnvelope msgEnvelope) {
+        queue.add(msgEnvelope);
+        System.out.println(msgEnvelope);
+    }
+
+    BlockingQueue<SOAPEnvelope> getMsgQueue() {
+        return queue;
+    }
+
+    /** Loads the configuration file from the classpath and calls {@code TestUtilServer.start}. */
+    @Before
+    public void setUp() throws Exception {
+        URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+        java.io.InputStream configStream = configURL.openStream();
+        try {
+            configs.load(configStream);
+        } finally {
+            // Properties.load leaves the stream open, so close it here.
+            configStream.close();
+        }
+        TestUtilServer.start(null, null);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Subscribes with an XPath filter and no topic, publishes one message that matches the filter and
+     * one that does not, and checks that only the matching message is delivered.
+     */
+    @Test
+    public final void testXpathOnlyRoundTrip() {
+        try {
+            String validMsgFormat = "<c><b><a> %d </a></b></c>";
+            String invalidMsgFormat = "<a><b><c> %d </c></b></a>";
+
+            long value = System.currentTimeMillis();
+            String validMsg = String.format(validMsgFormat, value);
+            String invalidMsg = String.format(invalidMsgFormat, value);
+
+            int consumerPort = TestUtilServer.getAvailablePort();
+            String brokerEPR = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/EventingService";
+            WseMsgBrokerClient msgBrokerClient = new WseMsgBrokerClient();
+            msgBrokerClient.init(brokerEPR);
+
+            String[] consumerEPRs = msgBrokerClient.startConsumerService(consumerPort, this);
+            assertTrue(consumerEPRs.length > 0);
+            String xpathExpression = "/c/b/a";
+            String subscriptionID = msgBrokerClient.subscribe(consumerEPRs[0], null, xpathExpression);
+
+            try {
+                msgBrokerClient.publish(null, AXIOMUtil.stringToOM(validMsg));
+                msgBrokerClient.publish(null, AXIOMUtil.stringToOM(invalidMsg));
+
+                // Bounded wait: a lost notification fails the test instead of hanging it forever.
+                SOAPEnvelope env = getMsgQueue().poll(60, java.util.concurrent.TimeUnit.SECONDS);
+                assertNotNull("no notification received within 60 seconds", env);
+
+                assertNotNull(env.getBody());
+                assertNotNull(env.getBody().getChildrenWithLocalName("c"));
+                OMElement element = (OMElement) env.getBody().getChildrenWithLocalName("c").next();
+                String text = element.toStringWithConsume();
+                assertTrue("round trip of message failed" + " - due to invalid messege content",
+                        text.contains(Long.toString(value)));
+
+                // Give a wrongly delivered second message time to show up before checking for it.
+                Thread.sleep(5000);
+                assertTrue("unexpected msg recieved", getMsgQueue().isEmpty());
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the flag for whoever runs this test
+                fail("interrupted while waiting for message");
+            } catch (XMLStreamException e) {
+                e.printStackTrace();
+                fail("invalid xml recieved: " + e.getMessage());
+            } finally {
+                // Runs even when an assertion above fails, so the consumer is always torn down.
+                msgBrokerClient.unSubscribe(subscriptionID);
+                msgBrokerClient.shutdownConsumerService();
+            }
+        } catch (AxisFault e) {
+            e.printStackTrace();
+            fail("unexpected exception occured");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathSubscription.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathSubscription.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathSubscription.java
new file mode 100644
index 0000000..097e6b7
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/TestWseXpathSubscription.java
@@ -0,0 +1,139 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath;
+
+import java.net.URL;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.xml.stream.XMLStreamException;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.airavata.wsmg.util.ConfigKeys;
+import org.apache.airavata.wsmg.util.TestUtilServer;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestWseXpathSubscription extends TestCase implements ConsumerNotificationHandler {
+
+    static Properties configs = new Properties();
+
+    BlockingQueue<SOAPEnvelope> queue = new LinkedBlockingQueue<SOAPEnvelope>();
+
+    /** Queues the delivered envelope for the test method and echoes it to stdout. */
+    public void handleNotification(SOAPEnvelope msgEnvelope) {
+        queue.add(msgEnvelope);
+        System.out.println(msgEnvelope);
+    }
+
+    BlockingQueue<SOAPEnvelope> getMsgQueue() {
+        return queue;
+    }
+
+    /** Loads the configuration file from the classpath and calls {@code TestUtilServer.start}. */
+    @Before
+    public void setUp() throws Exception {
+        URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+        java.io.InputStream configStream = configURL.openStream();
+        try {
+            configs.load(configStream);
+        } finally {
+            // Properties.load leaves the stream open, so close it here.
+            configStream.close();
+        }
+        TestUtilServer.start(null, null);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Subscribes with both a topic and an XPath filter, publishes one matching and one non-matching
+     * message on that topic, and checks that only the matching message is delivered.
+     */
+    @Test
+    public final void testSimpleXpathTopicRoundTrip() {
+        try {
+            String validMsgFormat = "<c><b><a> %d </a></b></c>";
+            String invalidMsgFormat = "<a><b><c> %d </c></b></a>";
+
+            long value = System.currentTimeMillis();
+            String validMsg = String.format(validMsgFormat, value);
+            String invalidMsg = String.format(invalidMsgFormat, value);
+
+            int consumerPort = TestUtilServer.getAvailablePort();
+            String brokerEPR = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/EventingService";
+            WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient();
+            wseMsgBrokerClient.init(brokerEPR);
+
+            String[] consumerEPRs = wseMsgBrokerClient.startConsumerService(consumerPort, this);
+            assertTrue(consumerEPRs.length > 0);
+
+            String xpathExpression = "/c/b/a";
+            String topicExpression = "XpathAndTopicTestWse";
+            String subscriptionID = wseMsgBrokerClient.subscribe(consumerEPRs[0], topicExpression, xpathExpression);
+
+            try {
+                wseMsgBrokerClient.publish(topicExpression, AXIOMUtil.stringToOM(validMsg));
+                wseMsgBrokerClient.publish(topicExpression, AXIOMUtil.stringToOM(invalidMsg));
+
+                // Bounded wait: a lost notification fails the test instead of hanging it forever.
+                SOAPEnvelope env = getMsgQueue().poll(60, java.util.concurrent.TimeUnit.SECONDS);
+                assertNotNull("no notification received within 60 seconds", env);
+
+                assertNotNull(env.getBody());
+                assertNotNull(env.getBody().getChildrenWithLocalName("c"));
+                OMElement element = (OMElement) env.getBody().getChildrenWithLocalName("c").next();
+                String text = element.toStringWithConsume();
+                assertTrue("round trip of message failed" + " - due to invalid messege content",
+                        text.contains(Long.toString(value)));
+
+                // Give a wrongly delivered second message time to show up before checking for it.
+                Thread.sleep(5000);
+                assertTrue("unexpected msg recieved", getMsgQueue().isEmpty());
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // restore the flag for whoever runs this test
+                fail("interrupted while waiting for message");
+            } catch (XMLStreamException e) {
+                e.printStackTrace();
+                fail("invalid xml recieved: " + e.getMessage());
+            } finally {
+                // Runs even when an assertion above fails, so the consumer is always torn down.
+                wseMsgBrokerClient.unSubscribe(subscriptionID);
+                wseMsgBrokerClient.shutdownConsumerService();
+            }
+        } catch (AxisFault e) {
+            e.printStackTrace();
+            fail("unexpected exception occured");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/performance/XppXPath.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/performance/XppXPath.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/performance/XppXPath.java
new file mode 100644
index 0000000..da95a00
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/matching/XPath/performance/XppXPath.java
@@ -0,0 +1,269 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.matching.XPath.performance;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.Vector;
+
+import javax.xml.namespace.QName;
+import javax.xml.stream.FactoryConfigurationError;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+
+import edu.berkeley.cs.db.yfilterplus.queryparser.QueryParser;
+import edu.berkeley.cs.db.yfilterplus.queryparser.XPQueryParser;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMXMLParserWrapper;
+import org.apache.axiom.om.impl.llom.factory.OMXMLBuilderFactory;
+import org.apache.axiom.soap.SOAPEnvelope;
+
+public class XppXPath {
+
+ private Vector xPathExpressions = new Vector();
+ protected BufferedReader m_in = null;
+ private static final boolean DEBUG = false;
+ long total = 0;
+
+ public int[] getMatchedLinks(String message) {
+ // Stub: the message is ignored and null is always returned.
+ return null;
+ }
+
+ /**
+  * Finds every stored expression equal to {@code query} with a linear scan of the vector.
+  *
+  * <p>An expression that was added several times contributes one position per occurrence.
+  *
+  * @param query expression to look for
+  * @return positions (boxed as Integer, ascending) of all stored expressions equal to {@code query};
+  *         an empty vector when nothing matches
+  * @throws NullPointerException if {@code query} is null and at least one expression is stored
+  */
+ public Vector checkQueries(String query) {
+     Vector result = new Vector();
+     int size = xPathExpressions.size();
+
+     for (int i = 0; i < size; i++) {
+         if (query.equals(xPathExpressions.get(i))) {
+             // valueOf may reuse a cached Integer instead of allocating a new one per hit.
+             result.add(Integer.valueOf(i));
+         }
+     }
+
+     return result;
+ }
+
+ /**
+  * Tells whether at least one element of {@code queries} equals a stored expression.
+  *
+  * <p>This is the nested-loop variant: each candidate is compared with the vector element by element
+  * and no hash set is built. {@link #checkQueriesBySet(Set)} is the hash-based variant.
+  *
+  * @param queries candidates to test
+  * @return true as soon as one candidate equals a stored expression; false when none does, which
+  *         includes the case where either collection is empty
+  * @throws NullPointerException if {@code queries} contains null and at least one expression is stored
+  */
+ public boolean checkQueriesVectorToSet(Set queries) {
+     int size = xPathExpressions.size();
+     Iterator iter = queries.iterator();
+
+     while (iter.hasNext()) {
+         Object query = iter.next();
+
+         for (int i = 0; i < size; i++) {
+             if (query.equals(xPathExpressions.get(i))) {
+                 // First hit decides; the remaining candidates are not examined.
+                 return true;
+             }
+         }
+     }
+
+     return false;
+ }
+
+ /**
+  * Tells whether any element of {@code queries} is a stored expression; copies the vector into a HashSet on each call.
+  */
+ public boolean checkQueriesBySet(Set queries) {
+     Set xPathExpressionsSet = new HashSet(xPathExpressions);
+     Iterator iter = queries.iterator();
+     while (iter.hasNext()) {
+         if (xPathExpressionsSet.contains(iter.next())) {
+             return true;
+         }
+     }
+     return false;
+ }
+
+ public void addXPathExpressions(String xPathExpression) {
+ xPathExpressions.add(xPathExpression); // appended at the end of the vector; duplicates are kept
+ }
+
+ /** Reads query strings from {@code queryFile} through an XPQueryParser and stores each one, in order. */
+ public void readQueriesFromFile(String queryFile) {
+     QueryParser parser = new XPQueryParser(queryFile);
+     for (int loaded = 0; loaded < Integer.MAX_VALUE; loaded++) {
+         String expression = parser.readNextQueryString();
+         if (expression == null) {
+             break;
+         }
+         if (DEBUG) {
+             System.out.println(expression);
+         }
+         addXPathExpressions(expression);
+     }
+ }
+
+ /** Returns one stored expression picked at random; throws if none has been added. */
+ public String getARandomQuery() {
+     return (String) xPathExpressions.get((int) (xPathExpressions.size() * Math.random()));
+ }
+
+ // From http://www.rgagnon.com/javadetails/java-0052.html
+ /** Returns the file's text; readLine() strips each EOL, so line.separator is appended to every line. */
+ public static String readFile(String filename) throws IOException {
+     String lineSep = System.getProperty("line.separator");
+     StringBuilder sb = new StringBuilder();
+     BufferedReader br = new BufferedReader(new FileReader(filename));
+     try {
+         String nextLine;
+         while ((nextLine = br.readLine()) != null) {
+             sb.append(nextLine).append(lineSep);
+         }
+     } finally {
+         br.close(); // release the file handle even when readLine throws
+     }
+     return sb.toString();
+ }
+
+ /** Ad-hoc timing harness: times set-based query matching, then an Axiom SOAP parse; all paths are hard-coded.
+ * @param args not used
+ * @throws IOException if a hard-coded input file cannot be read
+ * @throws FactoryConfigurationError from XMLInputFactory.newInstance()
+ * @throws XMLStreamException from the StAX/Axiom calls below
+ */
+ public static void main(String[] args) throws IOException, XMLStreamException, FactoryConfigurationError {
+ String queryFile = "C:\\YiFile\\yfilter-1.0\\yfilter-1.0\\queries2.txt";
+ XppXPath xppXPath = new XppXPath();
+ xppXPath.readQueriesFromFile(queryFile);
+ long total = 0;
+ final int round = 100;
+ String message = readFile("c:\\YiFile\\testdata\\soap2.txt");
+ // Everything before the first '<' is taken as a ';'-separated list of xpath tokens.
+ int messageStartPoint = message.indexOf('<');
+ String xpathList = message.substring(0, messageStartPoint);
+ System.out.println("XpathList=" + xpathList);
+ System.out.println("*****************************************");
+ long start0 = System.nanoTime();
+ StringTokenizer parser0 = new StringTokenizer(xpathList, ";");
+ Set xpathTokens = new HashSet();
+ while (parser0.hasMoreTokens()) {
+ xpathTokens.add(parser0.nextToken());
+ }
+ long end0 = System.nanoTime();
+ long total0 = (end0 - start0);
+ // for(int i=0;i<xpathTokens.size();i++){
+ // System.out.println((String)xpathTokens.get(i));
+ // }
+ System.out.println("Avg Time to token=" + (total0));
+ System.out.println("Total token=" + xpathTokens.size());
+ boolean result = false;
+ for (int i = 0; i < round; i++) {
+ String randomQuery = xppXPath.getARandomQuery();
+ // xpathTokens.add(randomQuery);
+ // Vector result=null;
+
+ long start = System.nanoTime();
+ // result=xppXPath.checkQueries(randomQuery);
+ result = xppXPath.checkQueriesBySet(xpathTokens);
+ // result=xppXPath.checkQueriesVectorToSet(xpathTokens);
+ long end = System.nanoTime();
+ total += (end - start);
+ }
+ System.out.println("Match result=" + result);
+ System.out.println("Avg Time for Checking=" + (total / round));
+
+ // XSUL
+ // long start=System.nanoTime();
+ // XmlElement messageEl = builder.parseFragmentFromReader(new
+ // StringReader(
+ // message));
+ // XmlElement messageIdEl= messageEl.element(null,
+ // "Header").element(null,"MessageID");
+ // String messageId=messageIdEl.requiredTextContent();
+ // System.out.println("MessageId="+messageId);
+ // long end=System.nanoTime();
+ // total=(end-start);
+ // System.out.println("Avg Time="+(total));
+
+ // AXIOM
+ // long start=System.nanoTime();
+ // create the parser
+ // XMLStreamReader parser =
+ // XMLInputFactory.newInstance().createXMLStreamReader(new
+ // FileReader("c:\\YiFile\\testdata\\soap_only.txt"));
+ // create the builder
+ String message1 = readFile("c:\\YiFile\\testdata\\soap_only.txt");
+ long start = System.nanoTime();
+ XMLStreamReader parser = XMLInputFactory.newInstance().createXMLStreamReader(
+ new ByteArrayInputStream(message1.getBytes()));
+
+ OMXMLParserWrapper builder = OMXMLBuilderFactory.createStAXSOAPModelBuilder(
+ OMAbstractFactory.getSOAP11Factory(), parser);
+ // get the root element (in this case the envelope)
+
+ SOAPEnvelope envelope = (SOAPEnvelope) builder.getDocumentElement();
+
+ // // create the parser
+ // XMLStreamReader parser =
+ // XMLInputFactory.newInstance().createXMLStreamReader(new
+ // FileReader("c:\\YiFile\\testdata\\soap.txt"));
+ // // create the builder
+ // OMXMLParserWrapper builder =
+ // OMXMLBuilderFactory.createStAXSOAPModelBuilder(OMAbstractFactory.getOMFactory(),parser);
+ // // get the root element (in this case the envelope)
+ // SOAPEnvelope envelope = (SOAPEnvelope)builder.getDocumentElement();
+ //
+ //
+ String messageIDString = envelope.getHeader().getFirstChildWithName(new QName(null, "MessageID")).getText();
+ long end = System.nanoTime();
+ total = (end - start);
+ System.out.println("Avg Time for Axiom=" + (total));
+ // OMElement headerEl=envelope.getHeader().getFirstChildWithName(new
+ // QName("http://schemas.xmlsoap.org/soap/envelope/", "Header"));
+ envelope.getHeader().getFirstChildWithName(new QName(null, "MessageID")).serialize(System.out);
+ System.out.println();
+ //
+ // headerEl.getFirstChildWithName(new QName(null, "MessageID"));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/NotificationManager.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/NotificationManager.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/NotificationManager.java
new file mode 100644
index 0000000..c168a5f
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/NotificationManager.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import java.util.LinkedList;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.wsmg.client.*;
+import org.apache.axiom.soap.SOAPEnvelope;
+
+public class NotificationManager implements ConsumerNotificationHandler {
+
+    private MessageBrokerClient client = null;
+    private String[] eprs = null;
+    private String brokerLocation = null;
+    private String protocol = null;
+    private int consumerServerPort = 0;
+    private LinkedList<String> subscriptionIds;
+    private int numberOfTopicSubscribed = 0;
+    private int numMultiThreadSupportPerSub = 0;
+    private int multipleThreadSupportIndex = 1;
+    private BlockingQueue<StatContainer> queue = new LinkedBlockingQueue<StatContainer>();
+    private int numMsgsReceived = 0;
+
+    /**
+     * Creates the broker client matching {@code protocolIn} and starts a consumer service that hands
+     * notifications to this object.
+     *
+     * @param brokerLocationIn location passed to the client's {@code init}
+     * @param consumerServerPortIn port passed to {@code startConsumerService}
+     * @param protocolIn {@code "wse"} (any case) selects {@code WseMsgBrokerClient}; every other value,
+     *        including null, selects {@code WsntMsgBrokerClient}
+     * @param numMultiThreadSupportPerSub highest "user" suffix appended to the consumer EPR before wrapping to 1
+     * @throws MsgBrokerClientException propagated from the client calls made here
+     */
+    public NotificationManager(String brokerLocationIn, int consumerServerPortIn, String protocolIn,
+            int numMultiThreadSupportPerSub) throws MsgBrokerClientException {
+        this.brokerLocation = brokerLocationIn;
+        this.consumerServerPort = consumerServerPortIn;
+        this.protocol = protocolIn;
+        this.numMultiThreadSupportPerSub = numMultiThreadSupportPerSub;
+        this.subscriptionIds = new LinkedList<String>();
+        if ("wse".equalsIgnoreCase(protocol)) {
+            // Must be the WSE client: cleanup() casts the client to WseMsgBrokerClient for this protocol.
+            WseMsgBrokerClient wseClient = new WseMsgBrokerClient();
+            wseClient.init(this.brokerLocation);
+            wseClient.setTimeoutInMilliSeconds(200000000);
+            eprs = wseClient.startConsumerService(consumerServerPort, this);
+            client = wseClient;
+        } else {
+            WsntMsgBrokerClient wsntClient = new WsntMsgBrokerClient();
+            wsntClient.init(this.brokerLocation);
+            wsntClient.setTimeoutInMilliSeconds(200000000);
+            eprs = wsntClient.startConsumerService(consumerServerPort, this);
+            client = wsntClient;
+        }
+    }
+
+    /**
+     * Subscribes to {@code topic} with no XPath filter and remembers the subscription id for {@link #cleanup()}.
+     */
+    public Subscription createTopicSubscription(String topic) throws Exception {
+        // Same (consumer EPR, topic, xpath) argument order that createXpathSubscription uses.
+        String subscriptionId = client.subscribe(nextConsumerEpr(), topic, null);
+        subscriptionIds.add(subscriptionId);
+        return new Subscription(client, subscriptionId, topic, this, brokerLocation, protocol);
+    }
+
+    /**
+     * Subscribes to {@code topicExpression} filtered by {@code xpathExpression} and remembers the
+     * subscription id for {@link #cleanup()}.
+     */
+    public Subscription createXpathSubscription(String topicExpression, String xpathExpression) throws Exception {
+        String subscriptionId = client.subscribe(nextConsumerEpr(), topicExpression, xpathExpression);
+        subscriptionIds.add(subscriptionId);
+        return new Subscription(client, subscriptionId, topicExpression, xpathExpression, this, brokerLocation,
+                protocol);
+    }
+
+    /** Returns the first consumer EPR plus the next "user" suffix; the suffix wraps back to 1 past the limit. */
+    private String nextConsumerEpr() {
+        if (multipleThreadSupportIndex > numMultiThreadSupportPerSub) {
+            multipleThreadSupportIndex = 1;
+        }
+        return eprs[0] + "user" + multipleThreadSupportIndex++;
+    }
+
+    /**
+     * Cancels every remembered subscription and then shuts the consumer service down.
+     *
+     * @throws MsgBrokerClientException propagated from the client calls made here
+     */
+    public void cleanup() throws MsgBrokerClientException {
+        boolean isWse = "wse".equalsIgnoreCase(this.protocol);
+        if (subscriptionIds != null) {
+            while (!subscriptionIds.isEmpty()) {
+                String subId = subscriptionIds.remove();
+                if (isWse) {
+                    ((WseMsgBrokerClient) client).unSubscribe(subId);
+                } else {
+                    ((WsntMsgBrokerClient) client).unSubscribe(subId);
+                }
+            }
+        }
+        if (client != null) {
+            client.shutdownConsumerService();
+        }
+    }
+
+    /** Wraps the envelope in a StatContainer, queues it and increments the received-message count. */
+    public void handleNotification(SOAPEnvelope msgEnvelope) {
+        queue.add(new StatContainer(msgEnvelope));
+        numMsgsReceived += 1;
+    }
+
+    public BlockingQueue<StatContainer> getQueue() {
+        return queue;
+    }
+
+    public int getNumberOfMsgsReceived() {
+        return numMsgsReceived;
+    }
+
+    public synchronized void incNoTopicsSubscribed() {
+        numberOfTopicSubscribed++;
+    }
+
+    public synchronized int getNoTopicsSubscribed() {
+        return numberOfTopicSubscribed;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PerformanceTest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PerformanceTest.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PerformanceTest.java
new file mode 100644
index 0000000..4e90ec0
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PerformanceTest.java
@@ -0,0 +1,399 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.airavata.wsmg.performance_evaluator.rtt.util.ConfigKeys;
+import org.apache.airavata.wsmg.performance_evaluator.rtt.util.LoadMsgPayload;
+import org.apache.airavata.wsmg.performance_evaluator.rtt.util.LoadXpath;
+
+/**
+ * A single labelled figure of the performance report: a display name paired
+ * with the value printed next to it.
+ */
+class Stat {
+    /** Label shown in the report. */
+    String name;
+
+    /** Reported value; rendered through {@code toString()} when printed. */
+    Object value;
+
+    /**
+     * @param k
+     *            the label
+     * @param v
+     *            the value reported under that label
+     */
+    public Stat(String k, Object v) {
+        this.name = k;
+        this.value = v;
+    }
+
+}
+
+public class PerformanceTest {
+
+ public static int NOTIFICATIONS_PUBLISHED_PER_TOPIC = 0; // copy of notifPerTopic, assigned in setConfigurationValues(); read by PublisherThread and StatCalculatorThread
+ static String payload = null; // message payload built by setPayload(); null until then
+ static LinkedList<String> xpathList = null; // loaded in createSubscriberArray() when xpath subscriptions are enabled
+ public static long totalRoundTripTime = 0; // sum of StatCalculatorThread.getTotalTime() over all subscribers
+ private static long avgRountTripTime = 0; // totalRoundTripTime divided by the number of messages received
+ public static BufferedWriter out = null; // not referenced anywhere in this class
+ public static Properties configurations = null; // effective settings; created by one of the loadConfigurationsFrom...() methods
+ public static long avgPublishRTTime = 0l; // totalPublishRTT divided by noTopicsPublished
+ private static long totalPublishRTT = 0l; // sum of every publisher's average publish time
+ private static int notifPerTopic = 0; // notifications each publisher sends to its topic
+ private static int noTopicsPublished = 0; // number of topics, and therefore of publisher threads
+ private static String protocol = ""; // "wse" selects the WSE client; any other value selects WSNT
+ private static int payLoadMultiplier = 1; // how many times the payload file content is repeated
+ private static int consumerPort = 3345; // base port; subscriber j is created with consumerPort + j
+ private static long testExpirationTime = 0l; // poll timeout in millis handed to each StatCalculatorThread (at least 20000)
+ private static int numberOfSubscriber = 0; // number of NotificationManager instances created
+ private static int numMultiThreadsSupportPerSub = 0; // passed through to the NotificationManager constructor
+ private static String topicPrefix = ""; // "topic" + configured simple topic name; the topic index is appended to it
+
+ public static void main(String[] args) throws Exception { // command-line entry point; args is not read
+ loadConfigurationsFromFile(); // defaults, overlaid with the classpath properties file when it can be loaded
+ testPerformance(); // runs the benchmark; that method ends with System.exit(0)
+ }
+
+    /**
+     * Runs one complete round-trip benchmark: creates the subscribers, subscribes
+     * them, starts one publisher thread per topic, waits for publishers and stat
+     * calculators to finish, then prints the statistics (also appended to
+     * {@code performance.log}) and cleans up the subscribers.
+     *
+     * All averages and throughputs are reported as 0 when their divisor would be
+     * zero (no topics, no messages received, or a zero-length interval), so the
+     * report is still produced when the broker delivers nothing.
+     *
+     * This method terminates the JVM with {@code System.exit(0)} when it is done.
+     *
+     * @throws Exception
+     *             if subscribing, waiting on the threads or writing the report fails
+     */
+    public static void testPerformance() throws Exception {
+
+        setConfigurationValues();
+        File outfile = new File("performance.log");
+        CountDownLatch publisherStartSignal = new CountDownLatch(1);
+        CountDownLatch publisherDoneSignal = new CountDownLatch(noTopicsPublished);
+        NotificationManager[] notifManagerArray = new NotificationManager[numberOfSubscriber];
+        StatCalculatorThread[] statCalcThread = new StatCalculatorThread[numberOfSubscriber];
+        setPayload(payLoadMultiplier);
+
+        for (int j = 0; j < numberOfSubscriber; j++) {
+            notifManagerArray[j] = new NotificationManager(configurations.getProperty(ConfigKeys.BROKER_URL),
+                    consumerPort + j, protocol, numMultiThreadsSupportPerSub);
+        }
+
+        // set the subscriptions depending on the topic or xpath based
+        createSubscriberArray(noTopicsPublished, numberOfSubscriber, notifManagerArray, 0);
+        System.out.println("subscribing to topics completed, creating publisher threads");
+
+        // start publishers; they block on the start signal until it is released below
+        PublisherThread[] publisher = new PublisherThread[noTopicsPublished];
+        createPublishers(noTopicsPublished, protocol, publisherStartSignal, publisherDoneSignal, publisher);
+        System.out.println("sending signal to start publishing...");
+        long publisherStartTime = System.currentTimeMillis();
+        long startTime = System.currentTimeMillis();
+        publisherStartSignal.countDown(); // let all threads proceed
+
+        // one thread per subscriber drains that subscriber's queue and accumulates timings
+        for (int j = 0; j < numberOfSubscriber; j++) {
+            statCalcThread[j] = new StatCalculatorThread(notifManagerArray[j], testExpirationTime);
+            statCalcThread[j].start();
+        }
+
+        publisherDoneSignal.await(); // wait for all to finish
+
+        for (int j = 0; j < noTopicsPublished; j++) {
+            totalPublishRTT += publisher[j].getAvgPubTime();
+        }
+
+        avgPublishRTTime = noTopicsPublished > 0 ? totalPublishRTT / noTopicsPublished : 0L;
+        long publishersRunningTime = System.currentTimeMillis() - publisherStartTime;
+        System.out.println("finished publishing messgaes.");
+
+        for (StatCalculatorThread stats : statCalcThread) {
+            stats.join();
+        }
+
+        long stopTime = 0L;
+        long totNumberOfMessagesReceived = 0;
+
+        for (StatCalculatorThread stats : statCalcThread) {
+            stopTime = Math.max(stopTime, stats.getLastMsgReceivedTime());
+            totalRoundTripTime += stats.getTotalTime();
+            totNumberOfMessagesReceived += stats.getNumberOfMsgReceived();
+        }
+
+        int totalReceivers = 0;
+        for (NotificationManager notifMngr : notifManagerArray) {
+            totalReceivers += notifMngr.getNoTopicsSubscribed();
+        }
+
+        boolean receivedAny = totNumberOfMessagesReceived > 0;
+        avgRountTripTime = receivedAny ? totalRoundTripTime / totNumberOfMessagesReceived : 0L;
+        // Without any message stopTime is still 0, which would give a negative duration.
+        long executionTime = receivedAny ? stopTime - startTime : 0L;
+        // Floating-point division keeps the fractional part of the rate.
+        double throughtput = executionTime > 0 ? (totNumberOfMessagesReceived * 1000.0) / executionTime : 0.0;
+
+        List<Stat> statistics = new ArrayList<Stat>();
+
+        int payloadBytes = payload == null ? 0 : payload.getBytes("US-ASCII").length;
+        statistics.add(new Stat("Payload size (bytes)", payloadBytes));
+        statistics.add(new Stat("Protocol", protocol));
+        statistics.add(new Stat("# total expected Msgs", totalReceivers * notifPerTopic));
+        statistics.add(new Stat("# total msgs received", totNumberOfMessagesReceived));
+        setStatList(notifPerTopic, noTopicsPublished, publishersRunningTime, executionTime, throughtput, statistics);
+        printStatistics(statistics, outfile);
+
+        for (NotificationManager notifMngr : notifManagerArray) {
+            notifMngr.cleanup();
+        }
+
+        System.out.println("end of test");
+        System.exit(0);
+    }
+
+    /**
+     * Copies the settings held in {@code configurations} into the static fields
+     * used by the benchmark. The timeout is never allowed below 20000 millis.
+     */
+    private static void setConfigurationValues() {
+        notifPerTopic = readInt(ConfigKeys.NOTIFICATIONS_PUBLISHED_PER_TOPIC);
+        noTopicsPublished = readInt(ConfigKeys.NUMBER_OF_TOPICS_PUBLISHED);
+        protocol = configurations.getProperty(ConfigKeys.PROTOCOL);
+        payLoadMultiplier = readInt(ConfigKeys.PAYLOAD_MULTIPLYER);
+        consumerPort = readInt(ConfigKeys.CONSUMER_PORT);
+
+        String timeoutText = configurations.getProperty(ConfigKeys.PERFORMANCE_TEST_TIMEOUT, "20000");
+        testExpirationTime = Math.max(20000, Long.parseLong(timeoutText));
+
+        numberOfSubscriber = readInt(ConfigKeys.NUMBER_OF_SUBSCRIBERS);
+        numMultiThreadsSupportPerSub = readInt(ConfigKeys.MULTI_THREAD_PER_SUB);
+        topicPrefix = "topic" + configurations.getProperty(ConfigKeys.TOPIC_SIMPLE);
+        NOTIFICATIONS_PUBLISHED_PER_TOPIC = notifPerTopic;
+    }
+
+    /** Reads the setting stored under {@code key} and parses it as an int. */
+    private static int readInt(String key) {
+        return Integer.parseInt(configurations.getProperty(key));
+    }
+
+    /**
+     * Appends the timing and throughput figures of the run to {@code statistics}.
+     *
+     * @param notifPerTopic
+     *            notifications published on each topic
+     * @param noTopicsPublished
+     *            number of topics published to
+     * @param publishersRunningTime
+     *            wall-clock duration of the publishing phase in millis
+     * @param executionTime
+     *            time from the start signal to the last received message in millis
+     * @param throughtput
+     *            received messages per second
+     * @param statistics
+     *            the list the figures are appended to
+     */
+    private static void setStatList(int notifPerTopic, int noTopicsPublished, long publishersRunningTime,
+            long executionTime, double throughtput, List<Stat> statistics) {
+
+        // Computed in double: int multiplication could overflow, integer division
+        // dropped the fraction, and a 0 ms publishing phase divided by zero.
+        double publisherThroughput = publishersRunningTime > 0
+                ? ((double) noTopicsPublished * notifPerTopic * 1000.0) / publishersRunningTime
+                : 0.0;
+
+        statistics.add(new Stat("# topics published", noTopicsPublished));
+        statistics.add(new Stat("Total RTT (millis)", totalRoundTripTime));
+        statistics.add(new Stat("Average RTT (millis)", avgRountTripTime));
+        statistics.add(new Stat("Total published to receive time (millis)", executionTime));
+        statistics.add(new Stat("Throughput (messages per second)", throughtput));
+        statistics.add(new Stat("Total publish RTT (millis)", totalPublishRTT));
+        statistics.add(new Stat("Average publish RTT (millis)", avgPublishRTTime));
+        statistics.add(new Stat("publisher duration (millis)", publishersRunningTime));
+        statistics.add(new Stat("Publisher throughput (messages per second)", publisherThroughput));
+    }
+
+    /**
+     * Builds the message payload by repeating the content of {@code payload.txt}
+     * {@code payLoadMultiplier} times and stores it in the static {@code payload}
+     * field. If the file cannot be read the stack trace is printed and the
+     * payload becomes the empty string.
+     *
+     * @param payLoadMultiplier
+     *            number of repetitions; values below 1 give an empty payload
+     */
+    private static void setPayload(int payLoadMultiplier) {
+        String tempPayload = "";
+        try {
+            tempPayload = LoadMsgPayload.getInstance().getMessage("payload.txt");
+        } catch (IOException e) {
+            // FileNotFoundException is an IOException, so one handler covers both.
+            e.printStackTrace();
+        }
+
+        // The field starts out as null; appending to it with "+=" would produce a
+        // payload beginning with the literal text "null". Build it separately.
+        StringBuilder repeated = new StringBuilder();
+        for (int i = 1; i <= payLoadMultiplier; i++) {
+            repeated.append(tempPayload);
+        }
+        payload = repeated.toString();
+    }
+
+    /**
+     * Creates and starts one publisher thread per topic. The thread for topic
+     * index {@code n} publishes to {@code topicPrefix + n} and carries thread id
+     * {@code n + 1}.
+     *
+     * @param noTopicsPublished
+     *            number of publisher threads to create
+     * @param protocol
+     *            protocol name handed to each publisher
+     * @param publiserhStartSignal
+     *            latch the publishers wait on before sending
+     * @param publisherDoneSignal
+     *            latch the publishers count down when finished
+     * @param publisher
+     *            array receiving the created threads, indexed by topic
+     */
+    private static void createPublishers(int noTopicsPublished, String protocol, CountDownLatch publiserhStartSignal,
+            CountDownLatch publisherDoneSignal, PublisherThread[] publisher) {
+        for (int topicNo = 0; topicNo < noTopicsPublished; topicNo++) {
+            String brokerUrl = configurations.getProperty(ConfigKeys.BROKER_URL);
+            String topicName = topicPrefix + topicNo;
+            publisher[topicNo] = new PublisherThread(protocol, brokerUrl, topicName, payload, publiserhStartSignal,
+                    publisherDoneSignal, topicNo + 1);
+            publisher[topicNo].start();
+        }
+    }
+
+    /**
+     * Distributes subscriptions over the notification managers.
+     *
+     * With at most as many subscribers as topics, every topic gets exactly one
+     * subscription and the managers are used round-robin, starting at
+     * {@code arrayIndex}. With more subscribers than topics, every manager gets
+     * exactly one subscription and the topics are reused round-robin.
+     *
+     * Plain topic subscriptions are created when the {@code IS_XPATH_ENABLED}
+     * setting is "false" (case-insensitive). Otherwise xpath subscriptions are
+     * created, cycling through the expressions loaded from {@code xpath.list}.
+     *
+     * @param noTopicsPublished
+     *            number of topics being published
+     * @param numberOfSubscriber
+     *            number of managers in {@code notifManagerArray}
+     * @param notifManagerArray
+     *            the managers to subscribe
+     * @param arrayIndex
+     *            index of the first manager to use in the round-robin case
+     * @throws IllegalStateException
+     *             if xpath subscriptions are enabled but no expression was loaded
+     * @throws Exception
+     *             if a subscription or loading the xpath list fails
+     */
+    private static void createSubscriberArray(int noTopicsPublished, int numberOfSubscriber,
+            NotificationManager[] notifManagerArray, int arrayIndex) throws Exception, IOException {
+        boolean topicBased = "false".equalsIgnoreCase(configurations.getProperty(ConfigKeys.IS_XPATH_ENABLED));
+
+        Iterator<String> ite = null;
+        if (!topicBased) {
+            xpathList = LoadXpath.getInstace().getXpathList("xpath.list");
+            if (xpathList == null || xpathList.isEmpty()) {
+                // Cycling over an empty list would fail later with NoSuchElementException.
+                throw new IllegalStateException("xpath subscriptions are enabled but xpath.list has no entries");
+            }
+            ite = xpathList.iterator();
+        }
+
+        if (numberOfSubscriber <= noTopicsPublished) {
+            // one subscription per topic, managers reused round-robin
+            for (int i = 0; i < noTopicsPublished; ++i) {
+                NotificationManager manager = notifManagerArray[arrayIndex++];
+                if (topicBased) {
+                    manager.createTopicSubscription(topicPrefix + i);
+                } else {
+                    if (!ite.hasNext()) {
+                        ite = xpathList.iterator();
+                    }
+                    manager.createXpathSubscription(topicPrefix + i, ite.next());
+                }
+                manager.incNoTopicsSubscribed();
+                if (arrayIndex >= numberOfSubscriber) {
+                    arrayIndex = 0;
+                }
+            }
+        } else {
+            // one subscription per manager, topics reused round-robin
+            int topicIndex = 0;
+            for (int i = 0; i < numberOfSubscriber; ++i) {
+                NotificationManager manager = notifManagerArray[i];
+                String topic = topicPrefix + topicIndex++;
+                if (topicBased) {
+                    manager.createTopicSubscription(topic);
+                } else {
+                    // This call was missing: managers were counted as subscribed
+                    // without any xpath subscription being created for them.
+                    if (!ite.hasNext()) {
+                        ite = xpathList.iterator();
+                    }
+                    manager.createXpathSubscription(topic, ite.next());
+                }
+                manager.incNoTopicsSubscribed();
+                if (topicIndex >= noTopicsPublished) {
+                    topicIndex = 0;
+                }
+            }
+        }
+    }
+
+    /**
+     * Builds the built-in settings that apply when a key is not configured.
+     *
+     * @return a new {@code Properties} object holding one default per listed key
+     */
+    private static Properties getDefaults() {
+        // key / default value pairs
+        String[][] presets = {
+                { ConfigKeys.BROKER_URL, "http://localhost:8080/axis2/services/EventingService" },
+                { ConfigKeys.TOPIC_SIMPLE, "simpleSampleTopic" },
+                { ConfigKeys.CONSUMER_PORT, "6666" },
+                { ConfigKeys.NOTIFICATIONS_PUBLISHED_PER_TOPIC, "5" },
+                { ConfigKeys.NUMBER_OF_TOPICS_PUBLISHED, "5" },
+                { ConfigKeys.IS_XPATH_ENABLED, "false" },
+                { ConfigKeys.XPATH, "/c/b/a" },
+                { ConfigKeys.PAYLOAD_MULTIPLYER, "1" },
+                { ConfigKeys.PROTOCOL, "wse" },
+                { ConfigKeys.PUBLISH_TIME_INTERVAL, "10000" },
+                { ConfigKeys.PERFORMANCE_TEST_TIMEOUT, "5000000" },
+                { ConfigKeys.NUMBER_OF_SUBSCRIBERS, "1" },
+                { ConfigKeys.MULTI_THREAD_PER_SUB, "50" } };
+
+        Properties defaults = new Properties();
+        for (String[] preset : presets) {
+            defaults.setProperty(preset[0], preset[1]);
+        }
+        return defaults;
+    }
+
+    /**
+     * Prints the statistics as an aligned "name : value" table to standard out
+     * and appends the same lines to {@code aFile}, followed by a line of dashes.
+     *
+     * @param stats
+     *            the figures to print, in display order
+     * @param aFile
+     *            the log file; it is opened in append mode
+     * @throws IOException
+     *             if the file cannot be opened or written
+     */
+    private static void printStatistics(List<Stat> stats, File aFile) throws IOException {
+        int maxLen = 0;
+        for (Stat stat : stats) {
+            maxLen = Math.max(maxLen, stat.name.length());
+        }
+
+        Writer output = new BufferedWriter(new FileWriter(aFile, true));
+        try {
+            for (Stat stat : stats) {
+                // pad every name to the longest one, plus one separating blank
+                char[] padding = new char[maxLen - stat.name.length() + 1];
+                Arrays.fill(padding, ' ');
+                String formattedStr = String.format("%s%s : %s", stat.name, new String(padding),
+                        stat.value.toString());
+                output.write(formattedStr + "\n");
+                System.out.println(formattedStr);
+            }
+
+            char[] rule = new char[maxLen];
+            Arrays.fill(rule, '-');
+            String fillingString = new String(rule);
+            output.write(fillingString + "\n");
+            System.out.println(fillingString);
+        } finally {
+            // close even when a write fails so the file handle is not leaked
+            output.close();
+        }
+    }
+
+    /**
+     * Initialises {@code configurations} with the built-in defaults and overlays
+     * them with the properties file named by {@code ConfigKeys.CONFIG_FILE_NAME},
+     * looked up as a system class-loader resource. If the file is missing or
+     * unreadable a message is printed and the defaults stay in effect.
+     */
+    public static void loadConfigurationsFromFile() {
+        configurations = new Properties(getDefaults());
+
+        java.io.InputStream in = null;
+        try {
+            URL url = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
+            if (url == null) {
+                throw new IOException("configuration file not found");
+            }
+            in = url.openStream();
+            configurations.load(in);
+        } catch (IOException ioe) {
+            System.out.println("unable to load configuration file, default settings will be used");
+        } finally {
+            // Properties.load does not close the stream it reads from
+            if (in != null) {
+                try {
+                    in.close();
+                } catch (IOException ignored) {
+                    // the settings are already loaded (or defaulted); nothing left to recover
+                }
+            }
+        }
+    }
+
+ // Not used, If required to run as a test case call it from main
+ public static void loadConfigurationsFromSystemEnv() {
+
+ configurations = new Properties(getDefaults());
+
+ Properties envConfigs = System.getProperties();
+ String brokerUrl = envConfigs.getProperty(ConfigKeys.BROKER_URL, null);
+ String consumerUrl = envConfigs.getProperty(ConfigKeys.CONSUMER_EPR, null);
+ String consumerPort = envConfigs.getProperty(ConfigKeys.CONSUMER_PORT, null);
+ String isXpathEnabled = envConfigs.getProperty(ConfigKeys.IS_XPATH_ENABLED, null);
+ String notifPerTopic = envConfigs.getProperty(ConfigKeys.NOTIFICATIONS_PUBLISHED_PER_TOPIC, null);
+ String subsPerTopic = envConfigs.getProperty(ConfigKeys.NUMBER_OF_SUBS_PERTOPIC, null);
+ String noTopicsPublished = envConfigs.getProperty(ConfigKeys.NUMBER_OF_TOPICS_PUBLISHED, null);
+ String payLoadMultiplier = envConfigs.getProperty(ConfigKeys.PAYLOAD_MULTIPLYER, null);
+ String protocol = envConfigs.getProperty(ConfigKeys.PROTOCOL, null);
+ String topicSimple = envConfigs.getProperty(ConfigKeys.TOPIC_SIMPLE, null);
+ String topicXpath = envConfigs.getProperty(ConfigKeys.XPATH, null);
+
+ if (brokerUrl == null) {
+ System.err.println(ConfigKeys.BROKER_URL + " not given");
+ System.exit(1);
+ }
+ if (consumerUrl == null) {
+ System.err.println(ConfigKeys.CONSUMER_EPR + " not given");
+ System.exit(1);
+ }
+ if (consumerPort == null) {
+ System.err.println(ConfigKeys.CONSUMER_PORT + " not given");
+ System.exit(1);
+ }
+ if (isXpathEnabled == null) {
+ System.err.println(ConfigKeys.IS_XPATH_ENABLED + " not given");
+ System.exit(1);
+ }
+ if (notifPerTopic == null) {
+ System.err.println(ConfigKeys.NOTIFICATIONS_PUBLISHED_PER_TOPIC + " not given");
+ System.exit(1);
+ }
+ if (subsPerTopic == null) {
+ System.err.println(ConfigKeys.NUMBER_OF_SUBS_PERTOPIC + " not given");
+ System.exit(1);
+ }
+ if (noTopicsPublished == null) {
+ System.err.println(ConfigKeys.NUMBER_OF_TOPICS_PUBLISHED + " not given");
+ System.exit(1);
+ }
+ if (payLoadMultiplier == null) {
+ System.err.println(ConfigKeys.PAYLOAD_MULTIPLYER + " not given");
+ System.exit(1);
+ }
+ if (protocol == null) {
+ System.err.println(ConfigKeys.PROTOCOL + " not given");
+ System.exit(1);
+ }
+ if (topicSimple == null) {
+ System.err.println(ConfigKeys.TOPIC_SIMPLE + " not given");
+ System.exit(1);
+ }
+ if (topicXpath == null) {
+ System.err.println(ConfigKeys.XPATH + " not given");
+ System.exit(1);
+ }
+
+ configurations.put(ConfigKeys.BROKER_URL, brokerUrl);
+ configurations.put(ConfigKeys.CONSUMER_EPR, consumerUrl);
+ configurations.put(ConfigKeys.CONSUMER_PORT, consumerPort);
+ configurations.put(ConfigKeys.IS_XPATH_ENABLED, isXpathEnabled);
+ configurations.put(ConfigKeys.NOTIFICATIONS_PUBLISHED_PER_TOPIC, notifPerTopic);
+ configurations.put(ConfigKeys.NUMBER_OF_SUBS_PERTOPIC, subsPerTopic);
+ configurations.put(ConfigKeys.NUMBER_OF_TOPICS_PUBLISHED, noTopicsPublished);
+ configurations.put(ConfigKeys.PAYLOAD_MULTIPLYER, payLoadMultiplier);
+ configurations.put(ConfigKeys.PROTOCOL, protocol);
+ configurations.put(ConfigKeys.TOPIC_SIMPLE, topicSimple);
+ configurations.put(ConfigKeys.XPATH, topicXpath);
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PublisherThread.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PublisherThread.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PublisherThread.java
new file mode 100644
index 0000000..4f1215b
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/PublisherThread.java
@@ -0,0 +1,99 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.airavata.wsmg.client.*;
+
+/**
+ * Publishes {@code PerformanceTest.NOTIFICATIONS_PUBLISHED_PER_TOPIC} timestamped
+ * messages to one topic and records how long the publish calls took.
+ *
+ * The thread waits on the start latch before sending and always counts the done
+ * latch down when it ends, whether it finished normally or failed, so a thread
+ * waiting on that latch is never blocked forever by a failing publisher.
+ */
+public class PublisherThread extends Thread {
+    private String brokerURL;
+    private String topic;
+    private final CountDownLatch startSignal;
+    private final CountDownLatch doneSignal;
+    private long totPublishTime = 0l;
+    long avgPublishTime = 0l;
+
+    private String payload = "";
+    String msg = "";
+    private MessageBrokerClient client = null;
+    int trackId = 0;
+    int threadId = 0;
+
+    /**
+     * @param protocolIn
+     *            "wse" (case-insensitive) selects the WSE client, anything else the WSNT client
+     * @param brokerURLIn
+     *            broker location the client is initialised with
+     * @param topicIn
+     *            topic to publish to
+     * @param payloadIn
+     *            text placed in the payload element of every message
+     * @param startSignalIn
+     *            latch awaited before the first message is sent
+     * @param doneSignalIn
+     *            latch counted down once when this thread ends
+     * @param threadIdIn
+     *            id written into every message's track info
+     */
+    public PublisherThread(String protocolIn, String brokerURLIn, String topicIn, String payloadIn,
+            CountDownLatch startSignalIn, CountDownLatch doneSignalIn, int threadIdIn) {
+        this.payload = payloadIn;
+        this.brokerURL = brokerURLIn;
+        this.topic = topicIn;
+        this.startSignal = startSignalIn;
+        this.doneSignal = doneSignalIn;
+        this.threadId = threadIdIn;
+        if ("wse".equalsIgnoreCase(protocolIn)) {
+
+            WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient();
+            wseMsgBrokerClient.setTimeoutInMilliSeconds(0);
+            wseMsgBrokerClient.init(brokerURL);
+            client = wseMsgBrokerClient;
+
+        } else {
+
+            WsntMsgBrokerClient wsntMsgBrokerClient = new WsntMsgBrokerClient();
+            wsntMsgBrokerClient.setTimeoutInMilliSeconds(0);
+            wsntMsgBrokerClient.init(brokerURL);
+            client = wsntMsgBrokerClient;
+        }
+
+    }
+
+    /**
+     * Waits for the start signal, publishes the configured number of messages
+     * and computes the average publish time. Each message carries the send time
+     * in millis, the thread id and a track id that starts at 1.
+     */
+    @Override
+    public void run() {
+
+        try {
+            trackId = 1;
+            startSignal.await();
+            System.out.println("Publishing started for topic :" + this.topic);
+            int notifications = PerformanceTest.NOTIFICATIONS_PUBLISHED_PER_TOPIC;
+            for (int i = 0; i < notifications; i++) {
+                msg = "<perf:performancetest xmlns:perf=\"http://lead.extreme.indiana.edu/namespaces/performance\"><perf:time>"
+                        + System.currentTimeMillis()
+                        + "</perf:time><perf:trackInfo><perf:threadId>"
+                        + threadId
+                        + "</perf:threadId><perf:trackId>"
+                        + trackId
+                        + "</perf:trackId></perf:trackInfo>"
+                        + "<perf:payload>" + payload + "</perf:payload></perf:performancetest>";
+                long publishStartTime = System.currentTimeMillis();
+                client.publish(topic, msg);
+                totPublishTime += System.currentTimeMillis() - publishStartTime;
+                trackId++;
+            }
+
+            // With zero notifications configured there is nothing to average.
+            if (notifications > 0) {
+                avgPublishTime = totPublishTime / notifications;
+            }
+            System.out.println("Publishing ended for topic :" + this.topic);
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to whoever owns this thread
+            Thread.currentThread().interrupt();
+            e.printStackTrace();
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            // Always release the waiter, including after a failure.
+            doneSignal.countDown();
+        }
+    }
+
+    /**
+     * @return the average duration of one publish call in millis; 0 until the
+     *         thread has finished publishing
+     */
+    synchronized long getAvgPubTime() {
+        return this.avgPublishTime;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatCalculatorThread.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatCalculatorThread.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatCalculatorThread.java
new file mode 100644
index 0000000..541d80b
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatCalculatorThread.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Drains the notification queue of one {@link NotificationManager} and
+ * accumulates round-trip statistics for the messages it finds there.
+ *
+ * The thread stops when the expected number of messages has been seen, when no
+ * message arrives within the timeout, or when it is interrupted.
+ */
+public class StatCalculatorThread extends Thread {
+
+    private NotificationManager notifManager = null;
+    private long lastMsgReceivedTime = 0l;
+    private long timeTot = 0;
+    private long avgTime = 0;
+    private int numberOfMssgsReceived = 0; // to avoid concurrency
+    private long timeOutMillis;
+    private int expectedNoMessages = 0;
+
+    /**
+     * @param notificationManager
+     *            the manager whose queue is drained
+     * @param timeOutInMillis
+     *            longest time to wait for the next message before giving up
+     * @throws Exception
+     *             declared for callers; this constructor only reads the topic count
+     */
+    public StatCalculatorThread(NotificationManager notificationManager, long timeOutInMillis) throws Exception {
+        this.timeOutMillis = timeOutInMillis;
+        this.notifManager = notificationManager;
+        // every subscribed topic is expected to deliver the full number of notifications
+        expectedNoMessages = PerformanceTest.NOTIFICATIONS_PUBLISHED_PER_TOPIC
+                * notificationManager.getNoTopicsSubscribed();
+    }
+
+    @Override
+    public void run() {
+        do {
+
+            StatContainer container = null;
+            try {
+                container = notifManager.getQueue().poll(timeOutMillis, TimeUnit.MILLISECONDS);
+
+                if (container != null) {
+                    timeTot += container.getRondTripTime();
+                    lastMsgReceivedTime = container.getMessageReceivedTime();
+                    numberOfMssgsReceived++;
+                    // Per-message logging of the trackInfo element could be added here,
+                    // using container.getMsgEnvelope().
+                } else {
+                    // poll() returns null only when the timeout elapsed; it is not an interrupt
+                    System.out.println("stat calculator thread timed out waiting for messages");
+                    break;
+                }
+            } catch (InterruptedException e1) {
+                // restore the flag so the interrupt is not lost
+                Thread.currentThread().interrupt();
+                e1.printStackTrace();
+                break;
+            }
+
+        } while (expectedNoMessages > numberOfMssgsReceived);
+
+        if (numberOfMssgsReceived > 0) {
+            avgTime = timeTot / numberOfMssgsReceived;
+        } else {
+            System.out.println("no messages received");
+        }
+
+        System.out.println("end of stat calculator");
+    }
+
+    /** @return the sum of the round-trip times of all messages seen, in millis */
+    synchronized long getTotalTime() {
+        return timeTot;
+    }
+
+    /** @return the mean round-trip time in millis; 0 if no message was seen */
+    synchronized long getAverageTime() {
+        return avgTime;
+    }
+
+    /** @return the number of messages taken from the queue */
+    synchronized long getNumberOfMsgReceived() {
+        return numberOfMssgsReceived;
+    }
+
+    /** @return the receive time of the most recent message; 0 if none was seen */
+    synchronized public long getLastMsgReceivedTime() {
+        return lastMsgReceivedTime;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatContainer.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatContainer.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatContainer.java
new file mode 100644
index 0000000..31b9c85
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/StatContainer.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import org.apache.axiom.soap.SOAPEnvelope;
+
+public class StatContainer {
+ private SOAPEnvelope msgEnvelope = null;
+ private long rtt = 0l;
+ private long receivedTime = 0l;
+
+ public StatContainer(SOAPEnvelope msgEnvelope) {
+ this.msgEnvelope = msgEnvelope;
+ this.receivedTime = System.currentTimeMillis();
+ this.rtt = this.receivedTime
+ - Long.parseLong(msgEnvelope.getBody().getFirstElement().getFirstElement().getText());
+ }
+
+ public long getRondTripTime() {
+ return this.rtt;
+ }
+
+ public SOAPEnvelope getMsgEnvelope() {
+ return msgEnvelope;
+ }
+
+ public long getMessageReceivedTime() {
+ return this.receivedTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/Subscription.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/Subscription.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/Subscription.java
new file mode 100644
index 0000000..d8df04f
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/Subscription.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.MessageBrokerClient;
+import org.apache.axis2.addressing.EndpointReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Value holder describing one subscription: its id, topic, optional xpath
+ * expression, the callback handler, the client and the broker it was made with.
+ */
+public class Subscription {
+
+    private final static Logger logger = LoggerFactory.getLogger(Subscription.class);
+
+    private String subscriptionID;
+    private String topic;
+    private String xpath;
+    private String brokerURL;
+    private String protocol;
+    private ConsumerNotificationHandler handler;
+    private MessageBrokerClient client;
+    private EndpointReference messageBoxEPR;
+
+    /**
+     * Creates a topic subscription record; the xpath expression is left unset.
+     */
+    public Subscription(MessageBrokerClient clientIn, String subscriptionID, String topic,
+            ConsumerNotificationHandler callback, String brokerURL, String protocolIn) {
+        this(clientIn, subscriptionID, topic, null, callback, brokerURL, protocolIn);
+    }
+
+    /**
+     * Creates a subscription record that also carries an xpath expression.
+     */
+    public Subscription(MessageBrokerClient clientIn, String subscriptionID, String topic, String xpath,
+            ConsumerNotificationHandler callback, String brokerURL, String protocolIn) {
+        super();
+        this.client = clientIn;
+        this.subscriptionID = subscriptionID;
+        this.topic = topic;
+        this.xpath = xpath;
+        this.handler = callback;
+        this.brokerURL = brokerURL;
+        this.protocol = protocolIn;
+    }
+
+    /** @return the handler given at construction */
+    public ConsumerNotificationHandler getCallback() {
+        return this.handler;
+    }
+
+    /** @return the topic given at construction */
+    public String getTopic() {
+        return this.topic;
+    }
+
+    /** @return the message box endpoint, or null if none has been set */
+    public EndpointReference getMessageBoxEPR() {
+        return this.messageBoxEPR;
+    }
+
+    public void setMessageBoxEpr(EndpointReference messageBoxEPR) {
+        this.messageBoxEPR = messageBoxEPR;
+    }
+
+    public String getSubscriptionID() {
+        return this.subscriptionID;
+    }
+
+    public void setSubscriptionID(String subscriptionID) {
+        this.subscriptionID = subscriptionID;
+    }
+
+    public String getBrokerURL() {
+        return this.brokerURL;
+    }
+
+    public void setBrokerURL(String brokerURL) {
+        this.brokerURL = brokerURL;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/ConfigKeys.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/ConfigKeys.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/ConfigKeys.java
new file mode 100644
index 0000000..6463f31
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/ConfigKeys.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt.util;
+
+public interface ConfigKeys { // names of the settings read by the round-trip performance test
+ String CONFIG_FILE_NAME = "configurations.properties"; // classpath resource, not a property key
+ String BROKER_URL = "broker.eventing.service.epr"; // broker endpoint used by publishers and subscribers
+ String CONSUMER_EPR = "consumer.location";
+ String CONSUMER_PORT = "consumer.port"; // base port; PerformanceTest adds the subscriber index to it
+ String TOPIC_SIMPLE = "topic.simple"; // PerformanceTest builds topic names as "topic" + this value + index
+ String TOPIC_XPATH = "topic.xpath"; // same property name as XPATH below
+ String PUBLISH_TIME_INTERVAL = "publish.time.interval";
+ String IS_XPATH_ENABLED = "is.xpath.enabled"; // "false" selects plain topic subscriptions
+ String XPATH = "topic.xpath"; // same property name as TOPIC_XPATH above
+ String PAYLOAD_MULTIPLYER = "payload.multiplyer"; // spelling is part of the property name; do not correct it
+ String PROTOCOL = "protocol.used"; // "wse" selects the WSE client, anything else WSNT
+ String NUMBER_OF_SUBS_PERTOPIC = "num.subscribers.per.topic";
+ String NOTIFICATIONS_PUBLISHED_PER_TOPIC = "notifications.per.topic";
+ String NUMBER_OF_TOPICS_PUBLISHED = "number.of.topics";
+ String SCHEDULER_REPEAT_PERIOD = "stat.timeout.monitor.scheduler.period";
+ String PERFORMANCE_TEST_TIMEOUT = "performance.test.timeout.period.millis"; // PerformanceTest enforces a minimum of 20000
+ String NUMBER_OF_SUBSCRIBERS = "number.of.subscriber.servers";
+ String MULTI_THREAD_PER_SUB = "num.muti.thread.per.sub"; // spelling is part of the property name; do not correct it
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadMsgPayload.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadMsgPayload.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadMsgPayload.java
new file mode 100644
index 0000000..8ed4cb6
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadMsgPayload.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+
+/**
+ * Lazily created singleton that reads a classpath resource into a single
+ * string.
+ */
+public class LoadMsgPayload {
+
+    private static LoadMsgPayload msg = null;
+
+    /**
+     * Returns the shared loader, creating it on first use.
+     *
+     * @return the single {@code LoadMsgPayload} instance
+     */
+    public static LoadMsgPayload getInstance() {
+        if (msg != null) {
+            return msg;
+        }
+        msg = new LoadMsgPayload();
+        return msg;
+    }
+
+    /**
+     * Reads the named system-classpath resource as UTF-8 text.
+     *
+     * @param fileName resource name passed to {@link ClassLoader#getSystemResource(String)}
+     * @return the resource content with every line terminated by {@code "\n"},
+     *         or the empty string when the resource cannot be found
+     * @throws IOException if the resource cannot be opened or read
+     */
+    public String getMessage(String fileName) throws IOException {
+        URL resource = ClassLoader.getSystemResource(fileName);
+        return resource == null ? "" : convertStreamToString(resource.openStream());
+    }
+
+    /**
+     * Drains the stream line by line into a string and always closes it.
+     *
+     * @param is stream to read; {@code null} yields the empty string
+     * @return the decoded text, one {@code "\n"} appended after each line
+     * @throws IOException if reading or closing the stream fails
+     */
+    private String convertStreamToString(InputStream is) throws IOException {
+        if (is == null) {
+            return "";
+        }
+
+        StringBuilder text = new StringBuilder();
+        try {
+            BufferedReader in = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+            for (String current = in.readLine(); current != null; current = in.readLine()) {
+                text.append(current).append("\n");
+            }
+        } finally {
+            // The stream is closed even when decoding or reading fails.
+            is.close();
+        }
+        return text.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadXpath.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadXpath.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadXpath.java
new file mode 100644
index 0000000..1a1a5fa
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/performance_evaluator/rtt/util/LoadXpath.java
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.performance_evaluator.rtt.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.LinkedList;
+
+/**
+ * Lazily created singleton that reads a classpath resource into a list of
+ * lines (one XPath expression per line, judging by the class name) and caches
+ * the most recently loaded list.
+ */
+public class LoadXpath {
+    private static LoadXpath xpath = null;
+
+    /** Most recently loaded list, or {@code null} if nothing has been loaded yet. */
+    LinkedList<String> xpathList = null;
+
+    /** Resource name that {@link #xpathList} was read from. */
+    private String loadedFileName = null;
+
+    /**
+     * Returns the shared loader, creating it on first use.
+     *
+     * @return the single {@code LoadXpath} instance
+     */
+    public static LoadXpath getInstance() {
+        if (xpath == null) {
+            xpath = new LoadXpath();
+        }
+        return xpath;
+    }
+
+    /**
+     * Misspelled historical name of {@link #getInstance()}, kept so existing
+     * callers keep compiling.
+     *
+     * @return the single {@code LoadXpath} instance
+     * @deprecated use {@link #getInstance()}
+     */
+    @Deprecated
+    public static LoadXpath getInstace() {
+        return getInstance();
+    }
+
+    /**
+     * Returns the lines of the named system-classpath resource, decoded as
+     * UTF-8.
+     *
+     * <p>The result is cached per resource name: asking again for the same
+     * name returns the cached list, while asking for a different name reads
+     * that resource instead of handing back the previously cached one.
+     *
+     * @param fileName resource name passed to {@link ClassLoader#getSystemResource(String)}
+     * @return the lines of the resource, or {@code null} when the resource
+     *         cannot be found
+     * @throws IOException if the resource cannot be opened or read
+     */
+    public LinkedList<String> getXpathList(String fileName) throws IOException {
+        if (xpathList != null && fileName != null && fileName.equals(loadedFileName)) {
+            return xpathList;
+        }
+
+        URL url = ClassLoader.getSystemResource(fileName);
+        if (url == null) {
+            return null;
+        }
+
+        // Publish the list only after a complete read so a failure half-way
+        // through never leaves a truncated list in the cache.
+        LinkedList<String> loaded = readLines(url.openStream());
+        xpathList = loaded;
+        loadedFileName = fileName;
+        return loaded;
+    }
+
+    /**
+     * Reads every line from the stream and always closes it.
+     *
+     * @param is stream to drain
+     * @return the lines in file order, without line terminators
+     * @throws IOException if reading or closing the stream fails
+     */
+    private LinkedList<String> readLines(InputStream is) throws IOException {
+        LinkedList<String> lines = new LinkedList<String>();
+        try {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+            String line;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line);
+            }
+        } finally {
+            is.close();
+        }
+        return lines;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/BrokerUtilTest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/BrokerUtilTest.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/BrokerUtilTest.java
new file mode 100644
index 0000000..e0162e7
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/BrokerUtilTest.java
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axiom.om.OMNamespace;
+import org.apache.axis2.AxisFault;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for the static helpers of {@link BrokerUtil}.
+ */
+public class BrokerUtilTest extends TestCase {
+
+    /**
+     * No fixture is needed; present only as the standard set-up hook.
+     *
+     * @throws java.lang.Exception never thrown by this implementation
+     */
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    /**
+     * No fixture to release; present only as the standard tear-down hook.
+     *
+     * @throws java.lang.Exception never thrown by this implementation
+     */
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Test method for
+     * {@link org.apache.airavata.wsmg.util.BrokerUtil#sameStringValue(java.lang.String, java.lang.String)} .
+     */
+    @Test
+    public void testSameStringValue() {
+
+        assertTrue(BrokerUtil.sameStringValue(null, null));
+        assertTrue(BrokerUtil.sameStringValue("test", "test"));
+
+        assertFalse(BrokerUtil.sameStringValue("one", "two"));
+        assertFalse(BrokerUtil.sameStringValue(null, "test"));
+        assertFalse(BrokerUtil.sameStringValue("test", null));
+
+    }
+
+    /**
+     * Test method for {@link org.apache.airavata.wsmg.util.BrokerUtil#getTopicLocalString(java.lang.String)}.
+     */
+    @Test
+    public void testGetTopicLocalString() {
+
+        assertEquals("localstring", (BrokerUtil.getTopicLocalString("prefix:localstring")));
+
+        assertEquals("localstring", BrokerUtil.getTopicLocalString("localstring"));
+
+    }
+
+    /**
+     * Test method for {@link org.apache.airavata.wsmg.util.BrokerUtil#getXPathString(org.apache.axiom.om.OMElement)}.
+     *
+     * Covers four cases in order: a null element, an element without a
+     * dialect attribute, an element with an unknown dialect, and an element
+     * with the supported dialect.
+     */
+    @Test
+    public void testGetXPathString() {
+
+        OMFactory factory = OMAbstractFactory.getOMFactory();
+
+        // Case 1: a null element must be rejected with IllegalArgumentException.
+        try {
+
+            BrokerUtil.getXPathString(null);
+            fail("method should validate invalid arguments");
+        } catch (IllegalArgumentException expected) {
+            // expected: this is the outcome the test is looking for
+        } catch (AxisFault e) {
+            fail("invalid exception thrown");
+        }
+
+        // Case 2: an element that carries no dialect attribute must raise AxisFault.
+        try {
+
+            QName invalidQName = new QName("invalidURI", "invalidLocalName");
+
+            OMElement xpathEl = factory.createOMElement(invalidQName);
+
+            BrokerUtil.getXPathString(xpathEl);
+
+            fail("method should validate arguments");
+
+        } catch (AxisFault expected) {
+            // expected: this is the outcome the test is looking for
+        }
+
+        // Case 3: an unknown dialect must raise AxisFault.
+        try {
+
+            String xpathExpression = "testXpathExpression";
+            String dialect = "unknownXpathDialect";
+
+            OMNamespace ns = factory.createOMNamespace("unit_test", "jnt");
+
+            OMElement xpathEl = factory.createOMElement("TestXpath", ns);
+            xpathEl.addAttribute("Dialect", dialect, null);
+
+            xpathEl.setText(xpathExpression);
+
+            BrokerUtil.getXPathString(xpathEl);
+
+            fail("method should reject unknown dialect");
+        } catch (AxisFault expected) {
+            // expected: this is the outcome the test is looking for
+        }
+
+        // Case 4: the supported dialect returns the element text unchanged.
+        try {
+
+            String xpathExpression = "textXpathExpression";
+            String dialect = WsmgCommonConstants.XPATH_DIALECT;
+
+            OMNamespace ns = factory.createOMNamespace("unit_test", "jnt");
+
+            OMElement xpathEl = factory.createOMElement("TestXpath", ns);
+            xpathEl.addAttribute("Dialect", dialect, null);
+
+            xpathEl.setText(xpathExpression);
+            assertEquals(xpathExpression, BrokerUtil.getXPathString(xpathEl));
+
+        } catch (AxisFault e) {
+            fail("unable to extract xpath query: " + e.toString());
+        }
+
+    }
+
+    /**
+     * Test method for {@link org.apache.airavata.wsmg.util.BrokerUtil#getTopicFromRequestPath(java.lang.String)}.
+     */
+    @Test
+    public void testGetTopicFromRequestPath() {
+
+        assertNull(BrokerUtil.getTopicFromRequestPath(null));
+        assertNull(BrokerUtil.getTopicFromRequestPath(""));
+        assertNull(BrokerUtil.getTopicFromRequestPath("/"));
+        assertNull(BrokerUtil.getTopicFromRequestPath("/subscribe/url/"));
+        assertNull(BrokerUtil.getTopicFromRequestPath("/subscribe/url/topic/"));
+
+        // Expected value goes first so a failure message reports the right sides.
+        assertEquals("xyz", BrokerUtil.getTopicFromRequestPath("/requestpath/topic/xyz"));
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/ConfigKeys.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/ConfigKeys.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/ConfigKeys.java
new file mode 100644
index 0000000..0624442
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/util/ConfigKeys.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+/**
+ * String constants used by the unit tests in this package. The values look
+ * like property keys, presumably resolved against the file named by
+ * {@link #CONFIG_FILE_NAME} (the lookup code is not part of this file).
+ */
+public interface ConfigKeys {
+
+    // Configuration source.
+    String CONFIG_FILE_NAME = "unit_tests.properties";
+
+    // Environment.
+    String AXIS2_REPO = "axis2.repo";
+
+    // Consumer endpoint.
+    String CONSUMER_EPR = "consumer.location";
+    String CONSUMER_PORT = "consumer.port";
+
+    // Topics.
+    String TOPIC_SIMPLE = "topic.simple";
+    String TOPIC_XPATH = "topic.xpath";
+
+}