You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/14 20:31:25 UTC
[83/90] [abbrv] AIRAVATA-1124
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
new file mode 100644
index 0000000..072cbb3
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/SenderUtils.java
@@ -0,0 +1,161 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger;
+
+import java.io.StringReader;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.config.WSMGParameter;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.airavata.wsmg.messenger.protocol.SendingException;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMFactory;
+import org.apache.axis2.addressing.EndpointReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+ * this class is not thread safe
+ * */
+public class SenderUtils implements Deliverable {
+
+ private static final Logger logger = LoggerFactory.getLogger(SenderUtils.class);
+
+ private static OMFactory factory = OMAbstractFactory.getOMFactory();
+
+ private ConsumerUrlManager urlManager;
+
+ private DeliveryProtocol protocol;
+
+ public SenderUtils(ConsumerUrlManager urlMan) {
+ urlManager = urlMan;
+ }
+
+ public void setProtocol(DeliveryProtocol protocol) {
+ this.protocol = protocol;
+ }
+
+ public void send(ConsumerInfo consumerInfo, OMElement notificationMessageBodyEl,
+ AdditionalMessageContent additionalMessageContent) {
+
+ if (consumerInfo.isPaused()) {
+ return;
+ }
+
+ if (notificationMessageBodyEl == null) {
+ logger.info("notification message is null, IGNORED");
+ return;
+ }
+
+ if (urlManager.isUnavailable(consumerInfo.getConsumerEprStr())) {
+ logger.info("consumer url is unavailable: " + consumerInfo.getConsumerEprStr());
+ return;
+ }
+
+ EndpointReference consumerReference = new EndpointReference(consumerInfo.getConsumerEprStr());
+
+ /*
+ * Extract message
+ */
+ OMElement message = null;
+ if (consumerInfo.getType().compareTo("wsnt") == 0) {
+ if (consumerInfo.isUseNotify()) {
+ message = wrapRawMessageToWsntWrappedFormat(notificationMessageBodyEl, additionalMessageContent);
+ } else {
+ message = notificationMessageBodyEl;
+ }
+ } else { // wse
+ message = notificationMessageBodyEl;
+ }
+
+ long timeElapsed = -1;
+ long startTime = -1;
+
+ startTime = System.currentTimeMillis();
+
+ try {
+
+ /*
+ * sending message out
+ */
+ protocol.deliver(consumerInfo, message, additionalMessageContent);
+
+ long finishTime = System.currentTimeMillis();
+ timeElapsed = finishTime - startTime;
+ if (WSMGParameter.showTrackId)
+ logger.info(String.format("track id = %s : delivered to: %s in %d ms",
+ additionalMessageContent.getTrackId(), consumerReference.getAddress(), timeElapsed));
+
+ urlManager.onSucessfullDelivery(consumerReference, timeElapsed);
+ } catch (SendingException ex) {
+
+ long finishTime = System.currentTimeMillis();
+ timeElapsed = finishTime - startTime;
+
+ urlManager.onFailedDelivery(consumerReference, finishTime, timeElapsed, ex, additionalMessageContent);
+
+ }
+ }
+
+ public OMElement wrapRawMessageToWsntWrappedFormat(OMElement rawNotif,
+ AdditionalMessageContent additionalMessageContent) {
+
+ OMElement fullNotif = factory.createOMElement("Notify", NameSpaceConstants.WSNT_NS);
+
+ OMElement notificationMessageEl = factory.createOMElement("NotificationMessage", NameSpaceConstants.WSNT_NS,
+ fullNotif);
+
+ String topicElString = additionalMessageContent.getTopicElement();
+ if (topicElString != null) {
+ OMElement topicEl = null;
+ try {
+ topicEl = CommonRoutines.reader2OMElement(new StringReader(topicElString));
+ } catch (XMLStreamException e) {
+ logger.error("XMLStreamreader exception when setting topicEl", e);
+ }
+ notificationMessageEl.addChild(topicEl);
+ }
+ String producerReferenceElString = additionalMessageContent.getProducerReference();
+ if (producerReferenceElString != null) {
+ OMElement producerReferenceEl = null;
+ try {
+ producerReferenceEl = CommonRoutines.reader2OMElement(new StringReader(producerReferenceElString));
+ } catch (XMLStreamException e) {
+ logger.error("XMLStreamException at creating producerReferenceEl", e);
+ }
+ notificationMessageEl.addChild(producerReferenceEl);
+ }
+
+ OMElement messageEl = factory.createOMElement("Message", NameSpaceConstants.WSNT_NS, notificationMessageEl);
+ messageEl.addChild(rawNotif);
+
+ return fullNotif;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java
new file mode 100644
index 0000000..417cef7
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/DeliveryProtocol.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.axiom.om.OMElement;
+
+/**
+ * Transport abstraction used to push one notification message to one consumer.
+ */
+public interface DeliveryProtocol {
+
+    /**
+     * Delivers {@code message} to the consumer described by {@code consumerInfo}.
+     *
+     * @param consumerInfo
+     *            the consumer to deliver to
+     * @param message
+     *            the message to send
+     * @param additionalMessageContent
+     *            extra data belonging to the message
+     * @throws SendingException
+     *             if the message could not be sent
+     */
+    void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
+            throws SendingException;
+
+    /**
+     * Sets the timeout applied to subsequent deliveries. This interface does not fix the unit; the
+     * Axis2Protocol implementation passes the value to {@code Options.setTimeOutInMilliSeconds}.
+     *
+     * @param timeout
+     *            the timeout value
+     */
+    void setTimeout(long timeout);
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java
new file mode 100644
index 0000000..c4dd24a
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/SendingException.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol;
+
+import org.apache.axis2.AxisFault;
+
+/**
+ * Thrown by a delivery protocol when a message could not be sent to a consumer.
+ * <p>
+ * It is an {@link AxisFault} whose only constructor wraps an underlying cause.
+ */
+public class SendingException extends AxisFault {
+
+    private static final long serialVersionUID = 6250791562500752579L;
+
+    /**
+     * @param cause
+     *            the underlying failure, forwarded unchanged to {@code AxisFault(Throwable)}
+     */
+    public SendingException(Throwable cause) {
+        super(cause);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java
new file mode 100644
index 0000000..7e2568a
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/protocol/impl/Axis2Protocol.java
@@ -0,0 +1,156 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.protocol.impl;
+
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
+import org.apache.airavata.wsmg.messenger.protocol.SendingException;
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.util.ElementHelper;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axiom.soap.SOAPHeaderBlock;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.client.Options;
+import org.apache.axis2.client.ServiceClient;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Axis2Protocol implements DeliveryProtocol {
+
+ private static final Logger logger = LoggerFactory.getLogger(Axis2Protocol.class);
+
+ private static SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
+
+ private ServiceClient nonThreadLocalServiceClient;
+
+ long tcpConnectionTimeout;
+
+ public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
+ throws SendingException {
+ EndpointReference consumerReference = new EndpointReference(consumerInfo.getConsumerEprStr());
+
+ /*
+ * Extract information
+ */
+ String actionString = null;
+ List<OMElement> soapHeaders = new LinkedList<OMElement>();
+ if (consumerInfo.getType().compareTo("wsnt") == 0) {
+ actionString = NameSpaceConstants.WSNT_NS.getNamespaceURI() + "/Notify";
+ } else { // wse
+ actionString = additionalMessageContent.getAction();
+ String topicElString = additionalMessageContent.getTopicElement();
+ if (topicElString != null) {
+ OMElement topicEl = null;
+ try {
+ topicEl = CommonRoutines.reader2OMElement(new StringReader(topicElString));
+ soapHeaders.add(topicEl);
+ } catch (XMLStreamException e) {
+ logger.error("exception at topicEl xmlStreamException", e);
+ }
+ }
+ }
+
+ ServiceClient client = null;
+ try {
+
+ client = configureServiceClient(actionString, consumerReference, additionalMessageContent.getMessageID(),
+ soapHeaders);
+
+ client.sendRobust(message);
+
+ } catch (AxisFault ex) {
+ throw new SendingException(ex.getCause());
+ } finally {
+ if (client != null) {
+ try {
+ client.cleanup();
+ client.cleanupTransport();
+ } catch (AxisFault ex) {
+ logger.error(ex.getMessage(), ex);
+ }
+ }
+ }
+ }
+
+ public void setTimeout(long timeout) {
+ this.tcpConnectionTimeout = timeout;
+ }
+
+ private ServiceClient configureServiceClient(String action, EndpointReference consumerLocation, String msgId,
+ List<OMElement> soapHeaders) throws AxisFault {
+
+ // not engaging addressing modules
+ ServiceClient client = getServiceClient();
+
+ SOAPHeaderBlock msgIdEl = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
+ msgIdEl.setText(msgId);
+ SOAPHeaderBlock actionEl = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
+ actionEl.setText(action);
+
+ SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
+ to.setText(consumerLocation.getAddress());
+
+ client.addHeader(actionEl);
+ client.addHeader(msgIdEl);
+ client.addHeader(to);
+
+ for (OMElement omHeader : soapHeaders) {
+ try {
+ SOAPHeaderBlock headerBlock = ElementHelper.toSOAPHeaderBlock(omHeader, soapfactory);
+ client.addHeader(headerBlock);
+ } catch (Exception e) {
+ throw AxisFault.makeFault(e);
+ }
+ }
+
+ Options opts = new Options();
+ opts.setTimeOutInMilliSeconds(tcpConnectionTimeout);
+ opts.setMessageId(msgId);
+ opts.setTo(consumerLocation);
+ opts.setAction(action);
+ opts.setProperty(HTTPConstants.CHUNKED, Boolean.FALSE);
+ opts.setProperty(HTTPConstants.HTTP_PROTOCOL_VERSION, HTTPConstants.HEADER_PROTOCOL_10);
+ client.setOptions(opts);
+
+ return client;
+ }
+
+ private ServiceClient getServiceClient() throws AxisFault {
+ if (nonThreadLocalServiceClient == null) {
+ nonThreadLocalServiceClient = new ServiceClient();
+ }
+ nonThreadLocalServiceClient.removeHeaders();
+ return nonThreadLocalServiceClient;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
new file mode 100644
index 0000000..9eb50cc
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/SendingStrategy.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy;
+
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+
+/**
+ * Policy that decides how and when outgoing messages are handed to a {@link Deliverable}.
+ */
+public interface SendingStrategy {
+
+    /**
+     * Prepares the strategy for use. The implementations in this module create their worker thread or
+     * thread pool here.
+     */
+    void init();
+
+    /**
+     * Accepts a message for delivery.
+     *
+     * @param outMessage
+     *            the outgoing message
+     * @param deliverable
+     *            the object that performs the actual send
+     */
+    void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable);
+
+    /**
+     * Stops the strategy.
+     */
+    void shutdown();
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
new file mode 100644
index 0000000..5236f47
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ConsumerHandler.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class ConsumerHandler implements Runnable {
+
+ private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);
+
+ protected LinkedBlockingQueue<LightweightMsg> queue = new LinkedBlockingQueue<LightweightMsg>();
+
+ private String consumerUrl;
+
+ private Deliverable deliverable;
+
+ public ConsumerHandler(String url, Deliverable deliverable) {
+ this.consumerUrl = url;
+ this.deliverable = deliverable;
+ }
+
+ public String getConsumerUrl() {
+ return consumerUrl;
+ }
+
+ public void submitMessage(LightweightMsg msg) {
+ try {
+ queue.put(msg);
+ } catch (InterruptedException e) {
+ log.error("Interrupted when trying to add message");
+ }
+ }
+
+ protected void send(List<LightweightMsg> list) {
+ for (LightweightMsg m : list) {
+ try {
+ OMElement messgae2Send = CommonRoutines.reader2OMElement(new StringReader(m.getPayLoad()));
+ deliverable.send(m.getConsumerInfo(), messgae2Send, m.getHeader());
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
new file mode 100644
index 0000000..7d21fdb
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/FixedParallelSender.java
@@ -0,0 +1,185 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link SendingStrategy} that serves all consumers from a fixed-size thread pool.
+ * <p>
+ * Each consumer URL gets a {@link ConsumerHandler} with its own queue. A dispatcher thread wakes up once a
+ * second and submits every handler that is not currently scheduled to the pool. A handler run sends at
+ * most {@code batchSize} messages and is removed once its queue is empty.
+ */
+public class FixedParallelSender implements SendingStrategy {
+
+    private static final Logger log = LoggerFactory.getLogger(FixedParallelSender.class);
+
+    private static final long TIME_TO_WAIT_FOR_SHUTDOWN_SECOND = 30;
+
+    /*
+     * Both maps are guarded by the monitor of activeConsumerHandlers and always hold the same key set.
+     */
+    private final HashMap<String, ConsumerHandler> activeConsumerHandlers = new HashMap<String, ConsumerHandler>();
+    private final HashMap<String, Boolean> submittedConsumerHandlers = new HashMap<String, Boolean>();
+
+    private int batchSize;
+
+    private ExecutorService threadPool;
+
+    // Written by the thread calling shutdown() and read by the dispatcher thread, hence volatile.
+    private volatile boolean stop;
+
+    private Thread t;
+
+    /**
+     * @param poolsize
+     *            number of threads in the fixed thread pool
+     * @param batchsize
+     *            maximum number of messages a handler sends per run
+     */
+    public FixedParallelSender(int poolsize, int batchsize) {
+        this.threadPool = Executors.newFixedThreadPool(poolsize);
+        this.batchSize = batchsize;
+    }
+
+    /**
+     * Starts the dispatcher thread.
+     */
+    public void init() {
+        this.t = new Thread(new ChooseHandlerToSubmit());
+        this.t.start();
+    }
+
+    /**
+     * Queues the message once for every consumer listed in {@code outMessage}.
+     */
+    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+        List<ConsumerInfo> consumerInfoList = outMessage.getConsumerInfoList();
+        for (ConsumerInfo consumer : consumerInfoList) {
+            sendToConsumerHandler(consumer, outMessage, deliverable);
+        }
+    }
+
+    /**
+     * Signals the dispatcher to stop, waits for it to finish, then shuts the pool down and waits up to
+     * {@value #TIME_TO_WAIT_FOR_SHUTDOWN_SECOND} seconds for running handlers.
+     * <p>
+     * If the calling thread is interrupted while waiting, the shutdown sequence still continues and the
+     * interrupt status is restored before returning. Safe to call even if {@link #init()} was never
+     * invoked.
+     */
+    public void shutdown() {
+        log.debug("Shutting down");
+        this.stop = true;
+
+        boolean interrupted = false;
+
+        if (this.t != null) {
+            try {
+                this.t.join();
+            } catch (InterruptedException ie) {
+                interrupted = true;
+                log.error("Wait for ChooseHandlerToSubmit thread to finish (join) is interrupted");
+            }
+        }
+
+        threadPool.shutdown();
+        try {
+            threadPool.awaitTermination(TIME_TO_WAIT_FOR_SHUTDOWN_SECOND, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            interrupted = true;
+            log.error("Interrupted while waiting thread pool to shutdown");
+        }
+
+        if (interrupted) {
+            // Re-assert only at the end so the waits above are not cut short by our own flag.
+            Thread.currentThread().interrupt();
+        }
+
+        log.debug("Shut down");
+    }
+
+    /**
+     * Puts the message on the queue of the handler for the consumer's URL, creating and registering the
+     * handler (as "not submitted") when none exists yet.
+     */
+    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
+
+        String consumerUrl = consumer.getConsumerEprStr();
+
+        LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
+                message.getAdditionalMessageContent());
+
+        synchronized (activeConsumerHandlers) {
+            ConsumerHandler handler = activeConsumerHandlers.get(consumerUrl);
+            if (handler == null) {
+                handler = new FixedParallelConsumerHandler(consumerUrl, deliverable);
+                activeConsumerHandlers.put(consumerUrl, handler);
+                submittedConsumerHandlers.put(consumerUrl, Boolean.FALSE);
+            }
+            handler.submitMessage(lwm);
+        }
+    }
+
+    /**
+     * Dispatcher loop: periodically submits every registered handler that is not already scheduled.
+     */
+    class ChooseHandlerToSubmit implements Runnable {
+        private static final int SLEEP_TIME_SECONDS = 1;
+
+        public void run() {
+            /*
+             * Keep running until shutdown was requested AND no handler is left, so that messages still
+             * queued when shutdown() is invoked are sent out.
+             */
+            while (true) {
+
+                synchronized (activeConsumerHandlers) {
+                    // The map is only touched under its monitor, including this emptiness check.
+                    if (stop && activeConsumerHandlers.isEmpty()) {
+                        return;
+                    }
+
+                    for (String key : activeConsumerHandlers.keySet()) {
+                        /*
+                         * If the consumer handler is not scheduled to run, submit it to the thread pool.
+                         */
+                        if (!Boolean.TRUE.equals(submittedConsumerHandlers.get(key))) {
+                            threadPool.submit(activeConsumerHandlers.get(key));
+                            submittedConsumerHandlers.put(key, Boolean.TRUE);
+                        }
+                    }
+                }
+
+                try {
+                    TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
+                } catch (InterruptedException e) {
+                    /*
+                     * Logged and ignored on purpose: the loop must keep draining pending handlers, and
+                     * re-asserting the flag here would make every following sleep fail immediately.
+                     */
+                    log.error("interrupted while waiting", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Handler run by the thread pool: sends one batch, then either unregisters itself (queue empty) or
+     * marks itself as not submitted so the dispatcher schedules it again.
+     */
+    class FixedParallelConsumerHandler extends ConsumerHandler {
+
+        public FixedParallelConsumerHandler(String url, Deliverable deliverable) {
+            super(url, deliverable);
+        }
+
+        public void run() {
+
+            log.debug(String.format("FixedParallelConsumerHandler starting: %s", getConsumerUrl()));
+
+            List<LightweightMsg> localList = new ArrayList<LightweightMsg>();
+
+            queue.drainTo(localList, batchSize);
+
+            send(localList);
+
+            synchronized (activeConsumerHandlers) {
+
+                /*
+                 * Whether or not all messages were sent, mark the handler as not submitted so it can be
+                 * put back into the thread pool.
+                 */
+                submittedConsumerHandlers.put(getConsumerUrl(), Boolean.FALSE);
+
+                /*
+                 * Remove the handler if and only if there is no message left.
+                 */
+                if (queue.size() == 0) {
+                    submittedConsumerHandlers.remove(getConsumerUrl());
+                    activeConsumerHandlers.remove(getConsumerUrl());
+
+                    log.debug(String.format("Consumer handler is already removed: %s", getConsumerUrl()));
+                }
+            }
+
+            log.debug(String.format("FixedParallelConsumerHandler done: %s,", getConsumerUrl()));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java
new file mode 100644
index 0000000..ca56c58
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/LightweightMsg.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+
+class LightweightMsg {
+ private ConsumerInfo consumerInfo;
+ private String payload;
+ private AdditionalMessageContent header;
+
+ public LightweightMsg(ConsumerInfo c, String pld, AdditionalMessageContent h) {
+ consumerInfo = c;
+ payload = pld;
+ header = h;
+ }
+
+ public String getPayLoad() {
+ return payload;
+ }
+
+ public ConsumerInfo getConsumerInfo() {
+ return consumerInfo;
+ }
+
+ public AdditionalMessageContent getHeader() {
+ return header;
+ }
+
+ public String toString() {
+ return String.format("header: %s, consumer: %s, pld: %s", header, consumerInfo.getConsumerEprStr(), payload);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
new file mode 100644
index 0000000..cede65d
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/ParallelSender.java
@@ -0,0 +1,155 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sending strategy in which every subscriber (consumer endpoint URL) gets its
+ * own handler task. Messages for a consumer are queued on that consumer's
+ * handler; a handler that stays idle for a while removes itself and a new one
+ * is started when the next message for that consumer arrives.
+ */
+public class ParallelSender implements SendingStrategy {
+
+    private static final Logger log = LoggerFactory.getLogger(ParallelSender.class);
+
+    private static final long TIME_TO_WAIT_FOR_SHUTDOWN_SECOND = 30;
+
+    /**
+     * Running handlers keyed by consumer URL. Every access is made while
+     * holding this map's monitor.
+     */
+    private final HashMap<String, ConsumerHandler> activeConsumerHandlers = new HashMap<String, ConsumerHandler>();
+
+    private ExecutorService threadPool;
+
+    /**
+     * Creates the thread pool that runs the consumer handlers. Must be called
+     * before any message is added.
+     */
+    public void init() {
+        this.threadPool = Executors.newCachedThreadPool();
+    }
+
+    /**
+     * Queues the message once for each consumer listed in it.
+     *
+     * @param outMessage
+     *            message together with the consumers that should receive it
+     * @param deliverable
+     *            used by newly created handlers to deliver messages
+     */
+    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+        List<ConsumerInfo> consumerInfoList = outMessage.getConsumerInfoList();
+        for (ConsumerInfo consumer : consumerInfoList) {
+            sendToConsumerHandler(consumer, outMessage, deliverable);
+        }
+    }
+
+    /**
+     * Stops accepting new handlers and waits up to
+     * {@link #TIME_TO_WAIT_FOR_SHUTDOWN_SECOND} seconds for running ones to
+     * finish. Safe to call even if {@link #init()} was never invoked.
+     */
+    public void shutdown() {
+        log.debug("Shutting down");
+
+        if (threadPool != null) {
+            threadPool.shutdown();
+            try {
+                if (!threadPool.awaitTermination(TIME_TO_WAIT_FOR_SHUTDOWN_SECOND, TimeUnit.SECONDS)) {
+                    log.warn("Thread pool did not terminate within {} seconds", TIME_TO_WAIT_FOR_SHUTDOWN_SECOND);
+                }
+            } catch (InterruptedException ie) {
+                log.error("Interrupted while waiting thread pool to shutdown");
+                // Keep the interrupt visible to whoever called shutdown().
+                Thread.currentThread().interrupt();
+            }
+        }
+        log.debug("Shut down");
+    }
+
+    /**
+     * Hands the message to the handler of the given consumer, starting a new
+     * handler when none is active for that consumer URL.
+     */
+    private void sendToConsumerHandler(ConsumerInfo consumer, OutGoingMessage message, Deliverable deliverable) {
+        String consumerUrl = consumer.getConsumerEprStr();
+
+        LightweightMsg lwm = new LightweightMsg(consumer, message.getTextMessage(),
+                message.getAdditionalMessageContent());
+
+        // The lookup, the enqueue and the handler's own exit check all run
+        // under the same lock, so a message is never queued on a handler that
+        // has already decided to stop.
+        synchronized (activeConsumerHandlers) {
+            ConsumerHandler handler = activeConsumerHandlers.get(consumerUrl);
+            if (handler == null) {
+                handler = new ParallelConsumerHandler(consumerUrl, deliverable);
+                activeConsumerHandlers.put(consumerUrl, handler);
+                handler.submitMessage(lwm);
+                threadPool.submit(handler);
+            } else {
+                handler.submitMessage(lwm);
+            }
+        }
+    }
+
+    /**
+     * Handler that repeatedly drains its queue and sends what it found. After
+     * {@link #MAX_UNSUCCESSFUL_DRAINS} consecutive empty drains it unregisters
+     * itself and ends.
+     */
+    class ParallelConsumerHandler extends ConsumerHandler {
+
+        private static final int MAX_UNSUCCESSFUL_DRAINS = 3;
+        private static final int SLEEP_TIME_SECONDS = 1;
+        private int numberOfUnsuccessfulDrain = 0;
+
+        public ParallelConsumerHandler(String url, Deliverable deliverable) {
+            super(url, deliverable);
+        }
+
+        public void run() {
+            log.debug("ParallelConsumerHandler starting: {}", getConsumerUrl());
+
+            ArrayList<LightweightMsg> localList = new ArrayList<LightweightMsg>();
+            while (true) {
+
+                /*
+                 * Try to find more message to send out
+                 */
+                if (queue.drainTo(localList) <= 0) {
+                    numberOfUnsuccessfulDrain++;
+                } else {
+                    numberOfUnsuccessfulDrain = 0;
+                }
+
+                /*
+                 * No new message for some time
+                 */
+                if (numberOfUnsuccessfulDrain >= MAX_UNSUCCESSFUL_DRAINS) {
+                    /*
+                     * Stop this thread if and only if there is no message
+                     */
+                    synchronized (activeConsumerHandlers) {
+                        if (queue.size() == 0) {
+                            // remove() returns null only when no entry was
+                            // registered for this URL any more.
+                            if (activeConsumerHandlers.remove(getConsumerUrl()) == null) {
+                                log.debug("Consumer handler is already removed: {}", getConsumerUrl());
+                            }
+                            log.debug("ParallelConsumerHandler done: {},", getConsumerUrl());
+                            break;
+                        }
+                    }
+                }
+
+                send(localList);
+                localList.clear();
+
+                // Back off only when the last drain found nothing.
+                if (numberOfUnsuccessfulDrain > 0) {
+                    waitForMessages();
+                }
+            }
+        }
+
+        /**
+         * Sleeps for {@link #SLEEP_TIME_SECONDS} seconds. If interrupted, the
+         * interrupt flag is restored so the pool thread still sees it.
+         */
+        private void waitForMessages() {
+            try {
+                TimeUnit.SECONDS.sleep(SLEEP_TIME_SECONDS);
+                log.debug("finished - waiting for messages");
+            } catch (InterruptedException e) {
+                log.error("interrupted while waiting for messages", e);
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
new file mode 100644
index 0000000..380e559
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/messenger/strategy/impl/SerialSender.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.messenger.strategy.impl;
+
+import java.io.StringReader;
+import java.util.List;
+
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
+import org.apache.airavata.wsmg.broker.ConsumerInfo;
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.OutGoingMessage;
+import org.apache.airavata.wsmg.messenger.Deliverable;
+import org.apache.airavata.wsmg.messenger.strategy.SendingStrategy;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Sending strategy that delivers a message to its consumers one after another
+ * from within the calling method; nothing is queued.
+ */
+public class SerialSender implements SendingStrategy {
+
+    private static final Logger log = LoggerFactory.getLogger(SerialSender.class);
+
+    /** Nothing to prepare for this strategy. */
+    public void init() {
+    }
+
+    /** Nothing to release for this strategy. */
+    public void shutdown() {
+    }
+
+    /**
+     * Delivers the message immediately by delegating to
+     * {@link #sendNotification(OutGoingMessage, Deliverable)}.
+     */
+    public void addMessageToSend(OutGoingMessage outMessage, Deliverable deliverable) {
+        sendNotification(outMessage, deliverable);
+    }
+
+    /**
+     * Parses the message text once and passes the resulting element to every
+     * consumer listed in the message. A {@code null} message is logged and
+     * ignored; an XML parsing failure is logged and ends the delivery.
+     *
+     * @param outGoingMessage
+     *            message to deliver, may be {@code null}
+     * @param deliverable
+     *            performs the actual send for each consumer
+     */
+    public void sendNotification(OutGoingMessage outGoingMessage, Deliverable deliverable) {
+        if (outGoingMessage == null) {
+            log.error("Got a null outgoing message");
+            return;
+        }
+
+        final String text = outGoingMessage.getTextMessage();
+        final List<ConsumerInfo> recipients = outGoingMessage.getConsumerInfoList();
+        final AdditionalMessageContent extraContent = outGoingMessage.getAdditionalMessageContent();
+
+        try {
+            // One parsed element is shared by all recipients.
+            final OMElement parsed = CommonRoutines.reader2OMElement(new StringReader(text));
+            for (ConsumerInfo recipient : recipients) {
+                deliverable.send(recipient, parsed, extraContent);
+            }
+        } catch (XMLStreamException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java
new file mode 100644
index 0000000..6400f63
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/BrokerUtil.java
@@ -0,0 +1,121 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import javax.xml.namespace.QName;
+
+import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.AxisFault;
+
+/**
+ * Static helpers for string comparison and for extracting topic and XPath
+ * values from subscription requests.
+ */
+public class BrokerUtil {
+
+    /**
+     * Compares String {@code x} with String {@code y}. The result is {@code true} if and only if both arguments are
+     * {@code null} or String {@code x} has the same sequence of characters as String {@code y}.
+     *
+     * @param x
+     *            first string, may be {@code null}
+     * @param y
+     *            second string, may be {@code null}
+     * @return {@code true} if the String {@code x} and String {@code y} are equivalent, {@code false} otherwise
+     */
+    public static boolean sameStringValue(String x, String y) {
+        return (x == null) ? (y == null) : x.equals(y);
+    }
+
+    /**
+     * Returns the part of the filter text after the first {@code ':'}, or the
+     * whole text when it contains no colon.
+     *
+     * @param filterText
+     *            filter text, must not be {@code null}
+     * @return the text following the first colon, or {@code filterText} itself
+     * @throws IllegalArgumentException
+     *             if {@code filterText} is {@code null}
+     */
+    public static String getTopicLocalString(String filterText) {
+        if (filterText == null) {
+            throw new IllegalArgumentException("filter text can't be null");
+        }
+
+        int pos = filterText.indexOf(':');
+        return (pos != -1) ? filterText.substring(pos + 1) : filterText;
+    }
+
+    /**
+     * Returns the text of the given element after checking that its dialect
+     * attribute names the supported XPath dialect. The attribute is looked up
+     * as {@code Dialect} first and as {@code DIALECT} second.
+     *
+     * @param xpathEl
+     *            element carrying the expression, must not be {@code null}
+     * @return the text content of {@code xpathEl}
+     * @throws IllegalArgumentException
+     *             if {@code xpathEl} is {@code null}
+     * @throws AxisFault
+     *             if the dialect attribute is missing or its value differs
+     *             from {@code WsmgCommonConstants.XPATH_DIALECT}
+     */
+    public static String getXPathString(OMElement xpathEl) throws AxisFault {
+        if (xpathEl == null) {
+            throw new IllegalArgumentException("xpath element can't be null");
+        }
+
+        OMAttribute dialectAttribute = xpathEl.getAttribute(new QName("Dialect"));
+        if (dialectAttribute == null) {
+            dialectAttribute = xpathEl.getAttribute(new QName("DIALECT"));
+        }
+        if (dialectAttribute == null) {
+            throw new AxisFault("dialect is required for subscribe");
+        }
+
+        String dialectString = dialectAttribute.getAttributeValue();
+        // Constant on the left: an attribute without a value is reported as
+        // an unknown dialect instead of causing a NullPointerException.
+        if (!WsmgCommonConstants.XPATH_DIALECT.equals(dialectString)) {
+            throw new AxisFault("Unkown dialect: " + dialectString);
+        }
+        return xpathEl.getText();
+    }
+
+    /**
+     * Extracts the topic that follows {@code WsmgCommonConstants.TOPIC_PREFIX}
+     * in a request path. One leading {@code '/'} is dropped first; the prefix
+     * may occur anywhere in the remaining path.
+     *
+     * @param topicPath
+     *            request path, may be {@code null}
+     * @return the text after the first occurrence of the prefix, or
+     *         {@code null} if the path is {@code null} or empty, does not
+     *         contain the prefix, or has nothing after it
+     */
+    public static String getTopicFromRequestPath(String topicPath) {
+        if (topicPath == null || topicPath.length() == 0) {
+            return null;
+        }
+        if (topicPath.startsWith("/")) {
+            topicPath = topicPath.substring(1);
+            if (topicPath.length() == 0) {
+                return null;
+            }
+        }
+
+        int index = topicPath.indexOf(WsmgCommonConstants.TOPIC_PREFIX);
+        if (index < 0) {
+            return null;
+        }
+
+        String topic = topicPath.substring(index + WsmgCommonConstants.TOPIC_PREFIX.length());
+        return (topic.length() == 0) ? null : topic;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java
new file mode 100644
index 0000000..bcd00a4
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/Counter.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Counter for the stress tests, meant to be read periodically by
+ * {@code TimerThread}. It keeps a running count plus one free-form string,
+ * each stored in an atomic holder.
+ */
+public class Counter {
+
+    private final AtomicLong counter = new AtomicLong(0);
+
+    private final AtomicReference<String> otherStringValue = new AtomicReference<String>();
+
+    /**
+     * Increases the count by one.
+     */
+    public void addCounter() {
+        counter.incrementAndGet();
+    }
+
+    /**
+     * Increases the count by one and replaces the stored string.
+     *
+     * @param otherValue
+     *            new value of the stored string
+     */
+    public synchronized void addCounter(String otherValue) {
+        counter.incrementAndGet();
+        otherStringValue.set(otherValue);
+    }
+
+    /**
+     * @return the current count
+     */
+    public long getCounterValue() {
+        return counter.get();
+    }
+
+    /**
+     * @param counterValue
+     *            value the count is reset to
+     */
+    public void setCounterValue(long counterValue) {
+        counter.set(counterValue);
+    }
+
+    /**
+     * @return the stored string, {@code null} until one has been set
+     */
+    public String getOtherValueString() {
+        return otherStringValue.get();
+    }
+
+    /**
+     * @param otherValueString
+     *            new value of the stored string
+     */
+    public void setOtherValueString(String otherValueString) {
+        otherStringValue.set(otherValueString);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
new file mode 100644
index 0000000..a2c0bfc
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/RunTimeStatistics.java
@@ -0,0 +1,157 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeSet;
+
+import org.apache.airavata.wsmg.commons.CommonRoutines;
+import org.apache.airavata.wsmg.commons.WsmgVersion;
+
+/**
+ * Process-wide counters describing the broker's activity, plus an HTML
+ * rendering of them. The {@code add...} methods, {@link #setStartUpTime()} and
+ * {@link #getHtmlString()} synchronize on this class; the public fields
+ * themselves can still be written directly by other code.
+ */
+public class RunTimeStatistics {
+    public static long totalMessageSize = 0;
+    public static long totalReceivedNotification = 0;
+    public static long totalSentOutNotification = 0;
+    public static long totalFailedNotification = 0;
+    public static long totalSubscriptions = 0;
+    public static long totalSubscriptionsAtStartUp = 0;
+    public static long totalUnSubscriptions = 0;
+    public static long minMessageSize = Long.MAX_VALUE;
+    public static long maxMessageSize = 0;
+    public static String startUpTime = "";
+    public static long totalSuccessfulDeliveryTime = 0;
+    public static long totalFailedDeliveryTime = 0;
+    public static long minSuccessfulDeliveryTime = Long.MAX_VALUE;
+    public static long maxSuccessfulDeliveryTime = 0;
+    public static long minFailedDeliveryTime = Long.MAX_VALUE;
+    public static long maxFailedDeliveryTime = 0;
+    /** Number of recorded failures per consumer URL. */
+    public static final HashMap<String, Integer> failConsumerList = new HashMap<String, Integer>();
+
+    private static long startUpTimeInMillis;
+
+    /**
+     * Records one received notification of the given size and updates the
+     * minimum, maximum and total message size.
+     *
+     * @param size
+     *            size of the received message
+     */
+    public static synchronized void addNewNotificationMessageSize(int size) {
+        if (size < minMessageSize) {
+            minMessageSize = size;
+        }
+        if (size > maxMessageSize) {
+            maxMessageSize = size;
+        }
+        totalMessageSize += size;
+        totalReceivedNotification++;
+    }
+
+    /**
+     * Records one successful delivery and updates the minimum, maximum and
+     * total successful delivery time.
+     *
+     * @param deliveryTime
+     *            time the delivery took
+     */
+    public static synchronized void addNewSuccessfulDeliverTime(long deliveryTime) {
+        if (deliveryTime < minSuccessfulDeliveryTime) {
+            minSuccessfulDeliveryTime = deliveryTime;
+        }
+        if (deliveryTime > maxSuccessfulDeliveryTime) {
+            maxSuccessfulDeliveryTime = deliveryTime;
+        }
+        totalSuccessfulDeliveryTime += deliveryTime;
+        totalSentOutNotification++;
+    }
+
+    /**
+     * Records one failed delivery and updates the minimum, maximum and total
+     * failed delivery time.
+     *
+     * @param deliveryTime
+     *            time spent on the failed delivery
+     */
+    public static synchronized void addNewFailedDeliverTime(long deliveryTime) {
+        if (deliveryTime < minFailedDeliveryTime) {
+            minFailedDeliveryTime = deliveryTime;
+        }
+        if (deliveryTime > maxFailedDeliveryTime) {
+            maxFailedDeliveryTime = deliveryTime;
+        }
+        totalFailedDeliveryTime += deliveryTime;
+        totalFailedNotification++;
+    }
+
+    /**
+     * Increments the failure count kept for the given consumer URL, starting
+     * at one for a URL seen for the first time.
+     *
+     * @param url
+     *            consumer URL that could not be reached
+     */
+    public static synchronized void addFailedConsumerURL(String url) {
+        Integer previousCount = failConsumerList.get(url);
+        failConsumerList.put(url, previousCount == null ? 1 : previousCount + 1);
+    }
+
+    /**
+     * Stores the current time as start-up time, both as formatted text and as
+     * milliseconds since the epoch.
+     */
+    public static synchronized void setStartUpTime() {
+        Date currentDate = new Date(); // Current date
+        startUpTime = CommonRoutines.getXsdDateTime(currentDate);
+        startUpTimeInMillis = currentDate.getTime();
+    }
+
+    /**
+     * Renders all statistics as an HTML fragment. The method holds the class
+     * lock so the failed-consumer map cannot be modified by the {@code add...}
+     * methods while it is being read.
+     *
+     * @return HTML paragraphs describing the current statistics
+     */
+    public static synchronized String getHtmlString() {
+        StringBuilder html = new StringBuilder();
+
+        html.append("<p>Total incoming message number: <span class=\"xml-requests-count\">")
+                .append(totalReceivedNotification).append("</span><br />\n");
+        html.append("Total successful outgoing message number: ").append(totalSentOutNotification).append("<br>\n");
+        html.append("Total unreachable outgoing message number: ").append(totalFailedNotification).append("<br>\n");
+        html.append("Total subscriptions requested: ").append(totalSubscriptions).append("(+")
+                .append(totalSubscriptionsAtStartUp).append(" startUp)<br>\n");
+        html.append("Total Unsubscriptions requested: ").append(totalUnSubscriptions).append("<br>\n");
+        html.append("</p>\n");
+
+        int averageMessageSize = 0;
+        if (totalReceivedNotification != 0) {
+            averageMessageSize = (int) (totalMessageSize / totalReceivedNotification);
+        }
+        html.append("<p>Average message size: ").append(averageMessageSize).append(" bytes<br>\n");
+        html.append("Max message size: ").append(maxMessageSize).append(" bytes<br>\n");
+        html.append("Min message size: ").append(minMessageSize).append(" bytes<br>\n");
+        html.append("</p>\n");
+
+        // Guard on the divisor: the counters are public and may have been
+        // reset independently of the accumulated times.
+        long averageSuccessfulDeliveryTime = 0;
+        if (totalSentOutNotification != 0) {
+            averageSuccessfulDeliveryTime = totalSuccessfulDeliveryTime / totalSentOutNotification;
+        }
+        html.append("<p>Average Successful Delivery Time: ").append(averageSuccessfulDeliveryTime)
+                .append(" ms<br>\n");
+        html.append("Max Successful Delivery Time: ").append(maxSuccessfulDeliveryTime).append(" ms<br>\n");
+        html.append("Min Successful Delivery Time: ").append(minSuccessfulDeliveryTime).append(" ms<br>\n");
+        html.append("</p>\n");
+
+        long averageFailedDeliveryTime = 0;
+        if (totalFailedNotification != 0) {
+            averageFailedDeliveryTime = totalFailedDeliveryTime / totalFailedNotification;
+        }
+        html.append("<p>Average Unreachable Delivery Time: ").append(averageFailedDeliveryTime).append(" ms<br>\n");
+        html.append("Max Unreachable Delivery Time: ").append(maxFailedDeliveryTime).append(" ms<br>\n");
+        html.append("Min Unreachable Delivery Time: ").append(minFailedDeliveryTime).append(" ms<br>\n");
+        html.append("</p>\n");
+
+        // NOTE(review): the value printed here is in milliseconds although the
+        // markup labels it as seconds; left unchanged in case a client parses
+        // it - confirm before changing.
+        html.append("<p>Service started at: ").append(startUpTime).append(" <span class=\"starttime-seconds\">")
+                .append(startUpTimeInMillis).append("</span> [seconds] since UNIX epoch)").append("<br />\n");
+
+        html.append("Version: <span class=\"service-name\">").append(WsmgVersion.getImplementationVersion())
+                .append("</span></p>\n");
+
+        // NOTE(review): consumer URLs are written into the HTML without
+        // escaping - confirm whether they can contain markup.
+        html.append("<p>Total unreachable consumerUrl: ").append(failConsumerList.size()).append(" <br>\n");
+        TreeSet<String> consumerUrlList = new TreeSet<String>(failConsumerList.keySet());
+        for (String url : consumerUrlList) {
+            int failedCount = failConsumerList.get(url);
+            html.append(" ").append(url).append(" -->").append(failedCount).append(" <br>\n");
+        }
+        html.append("</p>\n");
+        return html.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java
new file mode 100644
index 0000000..77f7c57
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/TimerThread.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+/**
+ * Stress-test helper that samples a {@code Counter} once per interval and
+ * prints how much it changed since the previous sample. The task runs until
+ * its thread is interrupted.
+ */
+public class TimerThread implements Runnable {
+    Counter counter;
+
+    long counterValue = 0;
+
+    long seqNum = 0;
+
+    String comment = "";
+
+    /**
+     * @param counter
+     *            counter to sample
+     * @param comment
+     *            text appended to every printed line
+     */
+    public TimerThread(Counter counter, String comment) {
+        this.counter = counter;
+        this.comment = comment;
+    }
+
+    /**
+     * Waits for the start time, then prints one line per interval while the
+     * counter is changing and for the first two idle intervals after that.
+     * Returns when the thread is interrupted, leaving the interrupt flag set.
+     */
+    public void run() {
+        long currentTime = 0;
+        long interval = 1000;
+        long lastCounter = 0;
+        long idleCount = 0;
+        // Start on a whole second between one and two seconds from now so
+        // that other threads started at about the same time begin together.
+        currentTime = System.currentTimeMillis();
+        long launchTime = ((currentTime + 2000) / 1000) * 1000;
+        long sleepTime = launchTime - currentTime;
+        System.out.println("launchTime=" + launchTime + " SleepTime=" + sleepTime);
+        try {
+            Thread.sleep(sleepTime);
+        } catch (InterruptedException e) {
+            // Interruption is the only way to stop this task: honour it.
+            Thread.currentThread().interrupt();
+            return;
+        }
+        while (true) {
+            currentTime = System.currentTimeMillis();
+            counterValue = counter.getCounterValue();
+            long receivedCount = counterValue - lastCounter;
+            lastCounter = counterValue;
+            if (receivedCount == 0) {
+                idleCount++;
+            } else {
+                idleCount = 0;
+            }
+            // Stay quiet once the counter has been idle for three samples.
+            if (receivedCount > 0 || (receivedCount == 0 && idleCount < 3)) {
+                System.out.println(seqNum + " " + counter.getCounterValue() + " " + receivedCount + comment
+                        + counter.getOtherValueString());
+            }
+            seqNum++;
+            // Schedule against the planned launch time so that the sampling
+            // does not drift with the time spent in the loop body.
+            launchTime = launchTime + interval;
+            sleepTime = launchTime - currentTime;
+            if (sleepTime < 0) {
+                sleepTime = 0;
+            }
+            try {
+                Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java
new file mode 100644
index 0000000..931a5bf
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsEventingOperations.java
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+/**
+ * Operation names of the WS-Eventing interface, each paired with the name used
+ * on the wire.
+ */
+public enum WsEventingOperations {
+
+    RENEW("renew"),
+    PUBLISH("publish"),
+    GET_STATUS("getStatus"),
+    SUBSCRIPTION_END("subscriptionEnd"),
+    SUBSCRIBE("subscribe"),
+    UNSUBSCRIBE("unsubscribe");
+
+    private final String name;
+
+    private WsEventingOperations(String n) {
+        name = n;
+    }
+
+    /**
+     * @return the operation name, e.g. {@code "getStatus"}
+     */
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    /**
+     * Case-sensitive comparison with an operation name. This is an overload,
+     * not an override of {@link Object#equals(Object)}.
+     *
+     * @param s
+     *            operation name to compare with, may be {@code null}
+     * @return {@code true} if {@code s} equals this operation's name
+     */
+    public boolean equals(String s) {
+        return name.equals(s);
+    }
+
+    /**
+     * Finds the operation whose name matches the given string, ignoring case.
+     *
+     * @param s
+     *            operation name
+     * @return the matching operation
+     * @throws IllegalArgumentException
+     *             if no operation has that name
+     */
+    public static WsEventingOperations valueFrom(String s) {
+        for (WsEventingOperations status : WsEventingOperations.values()) {
+            if (status.toString().equalsIgnoreCase(s)) {
+                return status;
+            }
+        }
+
+        throw new IllegalArgumentException("invalid WsEventingOperation:- " + s);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java
new file mode 100644
index 0000000..c771134
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/util/WsNotificationOperations.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.util;
+
+/**
+ * Operation names of the WS-Notification interface, each paired with the name
+ * used on the wire.
+ */
+public enum WsNotificationOperations {
+
+    NOTIFY("notify"),
+    SUBSCRIBE("subscribe"),
+    GET_CURRENT_MSG("getCurrentMessage"),
+    // Was spelled "gause", which made valueFrom("pause") fail.
+    PAUSE_REQUEST("pause"),
+    RESUME_REQUEST("resume"),
+    PAUSE_SUBSCRIPTION("pauseSubscription"),
+    RESUME_SUBSCRIPTION("resumeSubscription"),
+    REGISTER_PUBLISHER("registerPublisher"),
+    UNSUBSCRIBE("unsubscribe");
+
+    private final String name;
+
+    private WsNotificationOperations(String n) {
+        name = n;
+    }
+
+    /**
+     * @return the operation name, e.g. {@code "getCurrentMessage"}
+     */
+    @Override
+    public String toString() {
+        return name;
+    }
+
+    /**
+     * Case-sensitive comparison with an operation name. This is an overload,
+     * not an override of {@link Object#equals(Object)}.
+     *
+     * @param s
+     *            operation name to compare with, may be {@code null}
+     * @return {@code true} if {@code s} equals this operation's name
+     */
+    public boolean equals(String s) {
+        return name.equals(s);
+    }
+
+    /**
+     * Finds the operation whose name matches the given string, ignoring case.
+     *
+     * @param s
+     *            operation name
+     * @return the matching operation
+     * @throws IllegalArgumentException
+     *             if no operation has that name
+     */
+    public static WsNotificationOperations valueFrom(String s) {
+        for (WsNotificationOperations status : WsNotificationOperations.values()) {
+            if (status.toString().equalsIgnoreCase(s)) {
+                return status;
+            }
+        }
+
+        throw new IllegalArgumentException("invalid Ws notification Operation:- " + s);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql
new file mode 100755
index 0000000..5663ebf
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/cleanDBScript.sql
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/* Remove every row from the tables below; the tables themselves are kept. */
+DELETE FROM disQ;
+DELETE FROM MaxIDTable;
+DELETE FROM MinIDTable;
+DELETE FROM specialSubscription;
+DELETE FROM subscription;
+DELETE FROM msgbox;
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-derby.sql
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-derby.sql b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-derby.sql
new file mode 100644
index 0000000..80b51a4
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-derby.sql
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+CREATE TABLE SUBSCRIPTION (
+ SUBSCRIPTIONID VARCHAR(200) NOT NULL DEFAULT '',
+ TOPICS VARCHAR(255) DEFAULT '',
+ XPATH VARCHAR(200) DEFAULT '',
+ CONSUMERADDRESS VARCHAR(255) DEFAULT '',
+ REFERENCEPROPERTIES BLOB,
+ CONTENT BLOB,
+ WSRM INTEGER NOT NULL DEFAULT 0 ,
+ CREATIONTIME TIMESTAMP NOT NULL
+ );
+CREATE TABLE SPECIALSUBSCRIPTION (
+ SUBSCRIPTIONID VARCHAR(200) NOT NULL DEFAULT '',
+ TOPICS VARCHAR(255) DEFAULT '',
+ XPATH VARCHAR(200) DEFAULT '',
+ CONSUMERADDRESS VARCHAR(255) DEFAULT '',
+ REFERENCEPROPERTIES BLOB,
+ CONTENT BLOB,
+ WSRM INTEGER NOT NULL DEFAULT 0,
+ CREATIONTIME TIMESTAMP NOT NULL
+ );
+-- NOTE(review): SUBSCRIPTION and SPECIALSUBSCRIPTION share one column layout and declare no primary key.
+-- DISQ.ID below is an identity column and the table's primary key.
+CREATE TABLE DISQ (
+ ID BIGINT GENERATED BY DEFAULT AS IDENTITY,
+ TRACKID VARCHAR(100) DEFAULT NULL,
+ MESSAGE BLOB,
+ STATUS INTEGER DEFAULT NULL,
+ TOPIC VARCHAR(255) DEFAULT '',
+ PRIMARY KEY (ID)
+ );
+-- NOTE(review): MAXID/MINID are INTEGER while DISQ.ID is BIGINT; presumably they track DISQ id bounds - confirm.
+CREATE TABLE MAXIDTABLE(
+ MAXID INTEGER
+ );
+
+CREATE TABLE MINIDTABLE(
+ MINID INTEGER
+ );
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-mysql.sql
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-mysql.sql b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-mysql.sql
new file mode 100755
index 0000000..cb506ef
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/resources/database_scripts/msgBroker-mysql.sql
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+CREATE DATABASE IF NOT EXISTS wsmg;
+CREATE TABLE `subscription` (
+ `SubscriptionId` varchar(200) NOT NULL default '',
+ `Topics` varchar(255) default '',
+ `XPath` varchar(200) default '',
+ `ConsumerAddress` varchar(255) default '',
+ `ReferenceProperties` blob,
+ `content` blob,
+ `wsrm` tinyint(1) NOT NULL default '0',
+ `CreationTime` datetime NOT NULL default '0000-00-00 00:00:00'
+ );
+CREATE TABLE `specialSubscription` (
+ `SubscriptionId` varchar(200) NOT NULL default '',
+ `Topics` varchar(255) default '',
+ `XPath` varchar(200) default '',
+ `ConsumerAddress` varchar(255) default '',
+ `ReferenceProperties` blob,
+ `content` blob,
+ `wsrm` tinyint(1) NOT NULL default '0',
+ `CreationTime` datetime NOT NULL default '0000-00-00 00:00:00'
+ );
+-- NOTE(review): no USE wsmg follows CREATE DATABASE above, so every table in this script is created in the connection's current database - confirm intended.
+-- NOTE(review): the '0000-00-00 00:00:00' defaults above are rejected when sql_mode combines strict mode with NO_ZERO_DATE - confirm target servers.
+CREATE TABLE `disQ` (
+ `id` bigint(11) NOT NULL auto_increment,
+ `trackId` varchar(100) default NULL,
+ `message` longblob,
+ `status` int(11) default NULL,
+ `topic` varchar(255) default '',
+ PRIMARY KEY (`id`)
+ );
+-- NOTE(review): maxID/minID are integer while disQ.id is bigint; presumably they track disQ id bounds - confirm.
+CREATE TABLE MaxIDTable(
+ maxID integer
+ );
+
+CREATE TABLE MinIDTable(
+ minID integer
+ );
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/main/resources/services.xml
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/main/resources/services.xml b/modules/ws-messenger/messagebroker/src/main/resources/services.xml
new file mode 100644
index 0000000..229262c
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/main/resources/services.xml
@@ -0,0 +1,125 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<serviceGroup>
+ <service name="EventingService" class="org.apache.airavata.wsmg.broker.BrokerServiceLifeCycle">
+ <!-- WS-Eventing operations: all are handled by WSEventingMsgReceiver except publish, which uses WSEventingPublishMsgReceiver. -->
+ <operation name="renew">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+ <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Renew
+ </actionMapping>
+ <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/RenewResponse
+ </outputActionMapping>
+ </operation>
+
+ <operation name="getStatus">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+ <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatus
+ </actionMapping>
+ <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/GetStatusResponse
+ </outputActionMapping>
+ </operation>
+
+ <operation name="subscriptionEnd">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+ <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscriptionEnd
+ </actionMapping>
+ </operation>
+
+ <operation name="subscribe">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+ <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Subscribe
+ </actionMapping>
+ <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/SubscribeResponse
+ </outputActionMapping>
+ </operation>
+
+ <operation name="unsubscribe">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingMsgReceiver" />
+ <actionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/Unsubscribe
+ </actionMapping>
+ <outputActionMapping>http://schemas.xmlsoap.org/ws/2004/08/eventing/UnsubscribeResponse
+ </outputActionMapping>
+ </operation>
+
+ <operation name="publish">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wseventing.WSEventingPublishMsgReceiver" />
+ <actionMapping>http://org.apache.airavata/WseNotification
+ </actionMapping>
+ </operation>
+
+ </service>
+
+ <service name="NotificationService" class="org.apache.airavata.wsmg.broker.BrokerServiceLifeCycle">
+ <!-- WS-BaseNotification operations: every operation in this service is handled by WSNotificationMsgReceiver. -->
+ <operation name="notify">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+ <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/Notify
+ </actionMapping>
+ <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/NotifyResponse
+ </outputActionMapping>
+ </operation>
+
+ <operation name="subscribe">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+ <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/SubscribeRequest
+ </actionMapping>
+ <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/SubscribeRequestResponse
+ </outputActionMapping>
+ </operation>
+
+ <operation name="getCurrentMessage">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+ <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/GetCurrentMessageRequest
+ </actionMapping>
+ <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/GetCurrentMessageResponse
+ </outputActionMapping>
+ </operation>
+ <!-- NOTE(review): the request actions of the next three operations are spelled "Subsription" / "Unsubsribe" while their response actions are spelled "Subscription" / "Unsubscribe"; confirm what clients actually send before changing either. -->
+ <operation name="pauseSubscription">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+ <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/PauseSubsriptionRequest
+ </actionMapping>
+ <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/PauseSubscriptionResponse
+ </outputActionMapping>
+ </operation>
+
+ <operation name="resumeSubscription">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+ <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/ResumeSubsriptionRequest
+ </actionMapping>
+ <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/ResumeSubscriptionResponse
+ </outputActionMapping>
+ </operation>
+
+ <operation name="unsubscribe">
+ <messageReceiver class="org.apache.airavata.wsmg.broker.wsnotification.WSNotificationMsgReceiver" />
+ <actionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/UnsubsribeRequest
+ </actionMapping>
+ <outputActionMapping>http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification/UnsubscribeResponse
+ </outputActionMapping>
+ </operation>
+
+ </service>
+ <!-- Service-group parameter naming airavata-server.properties. NOTE(review): the code that reads this parameter is not visible here. -->
+ <parameter name="configuration.file.name" locked="false">airavata-server.properties</parameter>
+
+</serviceGroup>
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java
new file mode 100644
index 0000000..4867ba7
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSETest.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
+import org.apache.airavata.wsmg.util.TestUtilServer;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.junit.Test;
+
+/**
+ * Round-trip test against the broker's EventingService: starts a consumer,
+ * subscribes it to a topic, publishes two messages and unsubscribes.
+ */
+public class BrokerWSETest extends TestCase implements ConsumerNotificationHandler {
+
+    static Properties configs = new Properties();
+
+    public void handleNotification(SOAPEnvelope msgEnvelope) {
+        System.out.println("Received " + msgEnvelope);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        TestUtilServer.start(null, null);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        TestUtilServer.stop();
+    }
+
+    @Test
+    public void testRoundTrip() throws InterruptedException {
+
+        try {
+
+            String brokerEPR = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/EventingService";
+            long value = System.currentTimeMillis();
+            String msg = String.format("<msg> current time is : %d </msg>", value);
+
+            WseMsgBrokerClient wseMsgBrokerClient = new WseMsgBrokerClient();
+            wseMsgBrokerClient.init(brokerEPR);
+            int consumerPort = TestUtilServer.getAvailablePort();
+
+            String[] consumerEPRs = wseMsgBrokerClient.startConsumerService(consumerPort, this);
+
+            assertTrue(consumerEPRs.length > 0);
+
+            String topic = "WseRoundTripTestTopic";
+
+            String subscriptionID = wseMsgBrokerClient.subscribe(consumerEPRs[0], topic, null);
+            System.out.println("topic sub id = " + subscriptionID);
+
+            try {
+                wseMsgBrokerClient.publish(topic, msg);
+                wseMsgBrokerClient.publish(topic, AXIOMUtil.stringToOM("<foo><bar>Test</bar></foo>"));
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+
+            // Presumably gives the broker time to deliver before the subscription is removed.
+            Thread.sleep(2000);
+
+            try {
+                wseMsgBrokerClient.unSubscribe(subscriptionID);
+            } catch (AxisFault e) {
+                e.printStackTrace();
+                fail(e.getMessage());
+            }
+            wseMsgBrokerClient.shutdownConsumerService();
+
+        } catch (AxisFault e) {
+            // Fail right away; waiting on System.in here would hang an unattended build.
+            e.printStackTrace();
+            fail("unexpected exception occurred: " + e.getMessage());
+        }
+        System.out.println("Broker roundtrip done");
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/9c47eec8/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java
new file mode 100644
index 0000000..baebd31
--- /dev/null
+++ b/modules/ws-messenger/messagebroker/src/test/java/org/apache/airavata/wsmg/broker/BrokerWSNTTest.java
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
+import org.apache.airavata.wsmg.client.WsntMsgBrokerClient;
+import org.apache.airavata.wsmg.util.TestUtilServer;
+import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axis2.AxisFault;
+import org.junit.Test;
+
+/**
+ * Round-trip test against the broker's NotificationService: starts a consumer,
+ * subscribes it to a topic, publishes two messages and unsubscribes.
+ */
+public class BrokerWSNTTest extends TestCase implements ConsumerNotificationHandler {
+
+    static Properties configs = new Properties();
+
+    public void handleNotification(SOAPEnvelope msgEnvelope) {
+        System.out.println("Received " + msgEnvelope);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        TestUtilServer.start(null, null);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        TestUtilServer.stop();
+    }
+
+    @Test
+    public void testRoundTrip() throws InterruptedException {
+
+        try {
+            long value = System.currentTimeMillis();
+            String msg = String.format("<msg> current time is : %d </msg>", value);
+
+            WsntMsgBrokerClient wsntMsgBrokerClient = new WsntMsgBrokerClient();
+
+            // Ask for a port instead of hard-coding one, as BrokerWSETest does; a fixed port fails when it is already taken.
+            int consumerPort = TestUtilServer.getAvailablePort();
+
+            String brokerEPR = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/NotificationService";
+            wsntMsgBrokerClient.init(brokerEPR);
+            String[] consumerEPRs = wsntMsgBrokerClient.startConsumerService(consumerPort, this);
+
+            assertTrue(consumerEPRs.length > 0);
+
+            String topic = "/WsntRoundTripTestTopic";
+
+            String topicSubscriptionID = wsntMsgBrokerClient.subscribe(consumerEPRs[0], topic, null);
+            System.out.println("topic subscription id: " + topicSubscriptionID);
+
+            try {
+                wsntMsgBrokerClient.publish(topic, msg);
+                wsntMsgBrokerClient.publish(topic, AXIOMUtil.stringToOM("<foo><bar>Test</bar></foo>"));
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+
+            Thread.sleep(2000);
+
+            try {
+                wsntMsgBrokerClient.unSubscribe(topicSubscriptionID);
+            } catch (AxisFault e) {
+                e.printStackTrace();
+                fail(e.getMessage());
+            }
+
+            wsntMsgBrokerClient.shutdownConsumerService();
+
+        } catch (AxisFault e) {
+            // Fail right away; waiting on System.in here would hang an unattended build.
+            e.printStackTrace();
+            fail("unexpected exception occurred: " + e.getMessage());
+        }
+        System.out.println("Broker roundtrip done");
+
+    }
+}