You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/01/27 16:27:05 UTC
[11/13] airavata git commit: retiring ws-messenger and remove
dependency of workflow tracking - AIRAVATA-1556, AIRAVATA-1557
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/WsrfResourceStub.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/WsrfResourceStub.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/WsrfResourceStub.java
deleted file mode 100644
index 704aa25..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/WsrfResourceStub.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client;
-
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Iterator;
-import java.util.List;
-
-import javax.xml.namespace.QName;
-
-import org.apache.airavata.wsmg.client.util.DcDate;
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.OMNamespace;
-import org.apache.axiom.om.OMNode;
-import org.apache.axiom.om.util.UUIDGenerator;
-import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axiom.soap.SOAPHeaderBlock;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
-import org.apache.axis2.client.ServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- *
- * TODO To change the template for this generated type comment go to Window - Preferences - Java - Code Style - Code
- * Templates
- */
-public class WsrfResourceStub {
- private final static Logger logger = LoggerFactory.getLogger(WsrfResourceStub.class);
- private final static OMFactory factory = OMAbstractFactory.getOMFactory();
- private final static SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
- protected Options opts;
-
- private EndpointReference resourceEndpointReference;
-
- private long timeoutInMilliSeconds;
-
- protected WsrfResourceStub(EndpointReference resourceEpr, long timeout) {
- this.resourceEndpointReference = resourceEpr;
- logger.info("resourceEprInWsrfResourceStub Constructor" + resourceEpr.toString());
-
- timeoutInMilliSeconds = timeout;
-
- opts = new Options();
- opts.setProperty(org.apache.axis2.transport.http.HTTPConstants.CHUNKED, Boolean.FALSE);
- opts.setTo(resourceEpr);
- opts.setTimeOutInMilliSeconds(timeout);
-
- }
-
- public EndpointReference getResourceEpr() {
- return resourceEndpointReference;
- }
-
- public long getTimeoutInMilliSeconds() {
- return timeoutInMilliSeconds;
- }
-
- public void setTimeoutInMilliSeconds(long timeout) {
- timeoutInMilliSeconds = timeout;
- }
-
- public void destroy() throws AxisFault {
- String uuid = UUIDGenerator.getUUID();
- opts.setMessageId(uuid);
- OMElement message = factory.createOMElement("Destroy", NameSpaceConstants.WSRL_NS);
- opts.setAction(message.getNamespace().getNamespaceURI() + "/" + message.getLocalName());
- opts.setTimeOutInMilliSeconds(getTimeoutInMilliSeconds());
-
- ServiceClient client = new ServiceClient();
-
- if (client.getAxisConfiguration().getModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING) != null) {
-
- client.engageModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING);
- } else {
- SOAPHeaderBlock msgId = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
- msgId.setText(uuid);
-
- SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
- to.setText(this.resourceEndpointReference.getAddress());
-
- SOAPHeaderBlock action = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
- action.setText(message.getNamespace().getNamespaceURI() + "/" + message.getLocalName());
-
- client.addHeader(action);
- client.addHeader(msgId);
- client.addHeader(to);
- }
- client.setOptions(opts);
-
- try {
- client.sendRobust(message);
- } catch (AxisFault axisFault) {
- axisFault.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }finally {
- client.cleanup();
- client.cleanupTransport();
- }
- }
-
- public void setTerminationTime(Calendar cal) throws AxisFault {
- String uuid = UUIDGenerator.getUUID();
- opts.setMessageId(uuid);
- OMElement message = factory.createOMElement("SetTerminationTime", NameSpaceConstants.WSRL_NS);
- opts.setAction(message.getNamespace().getNamespaceURI() + "/" + message.getLocalName());
-
- opts.setTimeOutInMilliSeconds(getTimeoutInMilliSeconds());
-
- OMElement child = factory.createOMElement("RequestedTerminationTime", message.getNamespace(), message);
-
- if (cal == null) {
- OMNamespace XSI_NS = factory.createOMNamespace("http://www.w3.org/2001/XMLSchema", "xsd");
- child.addAttribute("nill", "true", XSI_NS);
- // (XmlConstants.XSI_NS, "nil", "true");
- } else {
-
- DcDate dcDate = new DcDate(cal);
- child.setText(dcDate.toString());
- }
-
- ServiceClient client = new ServiceClient();
-
- if (client.getAxisConfiguration().getModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING) != null) {
- client.engageModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING);
- } else {
- SOAPHeaderBlock msgId = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
- msgId.setText(uuid);
-
- SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
- to.setText(this.resourceEndpointReference.getAddress());
-
- SOAPHeaderBlock action = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
- action.setText(message.getNamespace().getNamespaceURI() + "/" + message.getLocalName());
-
- client.addHeader(action);
- client.addHeader(msgId);
- client.addHeader(to);
- }
- client.setOptions(opts);
- try {
- client.sendRobust(message);
- } catch (AxisFault axisFault) {
- axisFault.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }finally {
- client.cleanupTransport();
- }
-
- }
-
- public List<OMElement> getResourceProperty(QName qn) throws AxisFault { // List<XmlElement>
-
- OMElement messageEl = factory.createOMElement("GetResourceProperty", NameSpaceConstants.WSRP_NS);
- String uuid = UUIDGenerator.getUUID();
-
- opts.setTimeOutInMilliSeconds(getTimeoutInMilliSeconds());
- opts.setMessageId(uuid);
- opts.setAction(messageEl.getNamespace().getNamespaceURI() + "/" + messageEl.getLocalName());
-
- QName textQName = new QName(qn.getNamespaceURI(), qn.getLocalPart(), qn.getPrefix());
-
- factory.createOMText(messageEl, textQName);
-
- if (qn.getPrefix() != null) {
- OMNamespace ns = factory.createOMNamespace(qn.getNamespaceURI(), qn.getPrefix());
- messageEl.declareNamespace(ns);
- }
-
- ServiceClient client = new ServiceClient();
-
- if (client.getAxisConfiguration().getModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING) != null) {
-
- client.engageModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING);
- } else {
-
- SOAPHeaderBlock msgId = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
- msgId.setText(uuid);
-
- SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
- to.setText(this.resourceEndpointReference.getAddress());
-
- SOAPHeaderBlock action = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
- action.setText(messageEl.getNamespace().getNamespaceURI() + "/" + messageEl.getLocalName());
-
- client.addHeader(action);
- client.addHeader(msgId);
- client.addHeader(to);
- }
- client.setOptions(opts);
- OMElement responseMessage = client.sendReceive(messageEl);
- client.cleanupTransport();
-
- List<OMElement> list = elementsAsList(responseMessage);
- return list;
- }
-
- public List<OMElement> getMultipleResourceProperties(QName[] qnamez) throws AxisFault { // TODO
-
- OMElement messageEl = factory.createOMElement("GetMultipleResourceProperties", NameSpaceConstants.WSRP_NS);
- String uuid = UUIDGenerator.getUUID();
- opts.setMessageId(uuid);
- opts.setAction(messageEl.getNamespace().getNamespaceURI() + "/" + messageEl.getLocalName());
- opts.setTimeOutInMilliSeconds(getTimeoutInMilliSeconds());
-
- // message.addChild(new QNameElText(message, WidgetService.WIDGET_NS,
- // "TerminationTime"));
- // message.declareNamespace(WidgetService.WIDGET_NS);
- // wsrp:ResourceProperty
-
- for (QName qn : qnamez) {
-
- OMNamespace ns = factory.createOMNamespace(qn.getNamespaceURI(), qn.getPrefix());
-
- OMElement child = factory.createOMElement("ResourceProperty", NameSpaceConstants.WSRP_NS);
-
- QName textQName = new QName(qn.getNamespaceURI(), qn.getLocalPart(), qn.getPrefix());
-
- factory.createOMText(child, textQName);
-
- if (qn.getPrefix() != null) {
- messageEl.declareNamespace(ns);
- }
-
- }
-
- ServiceClient client = new ServiceClient();
-
- if (client.getAxisConfiguration().getModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING) != null) {
-
- client.engageModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING);
- } else {
- SOAPHeaderBlock msgId = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
- msgId.setText(uuid);
-
- SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
- to.setText(this.resourceEndpointReference.getAddress());
-
- SOAPHeaderBlock action = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
- action.setText(messageEl.getNamespace().getNamespaceURI() + "/" + messageEl.getLocalName());
-
- client.addHeader(action);
- client.addHeader(msgId);
- client.addHeader(to);
- }
- client.setOptions(opts);
- OMElement responseMessage = client.sendReceive(messageEl);
- client.cleanupTransport();
-
- List<OMElement> list = elementsAsList(responseMessage);
- return list;
- }
-
- public List<OMNode> queryResourcePropertiesByXpath(String query) throws AxisFault {
- if (query == null) {
- throw new IllegalArgumentException();
- }
- String uuid = UUIDGenerator.getUUID();
- opts.setMessageId(uuid);
- opts.setTimeOutInMilliSeconds(getTimeoutInMilliSeconds());
- OMElement messageEl = factory.createOMElement("QueryResourceProperties", NameSpaceConstants.WSRP_NS);
-
- opts.setAction(messageEl.getNamespace().getNamespaceURI() + "/" + messageEl.getLocalName());
-
- OMElement queryExpressionEl = factory.createOMElement("QueryExpression", NameSpaceConstants.WSRP_NS);
-
- queryExpressionEl.addAttribute("dialect", WsmgCommonConstants.XPATH_DIALECT, null);
-
- queryExpressionEl.setText(query);
-
- ServiceClient client = new ServiceClient();
-
- if (client.getAxisConfiguration().getModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING) != null) {
- client.engageModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING);
- } else {
-
- SOAPHeaderBlock msgId = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
- msgId.setText(uuid);
-
- SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
- to.setText(this.resourceEndpointReference.getAddress());
-
- SOAPHeaderBlock action = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
- action.setText(messageEl.getNamespace().getNamespaceURI() + "/" + messageEl.getLocalName());
-
- client.addHeader(action);
- client.addHeader(msgId);
- client.addHeader(to);
- }
- client.setOptions(opts);
- OMElement responseMessage = client.sendReceive(messageEl);
- client.cleanupTransport();
- List<OMNode> list = childrenAsList(responseMessage);
- return list;
- }
-
- public void setResourceProperties(OMElement[] requests) throws AxisFault {
- if (requests.length == 0) {
- throw new IllegalArgumentException("at least one request is required");
- }
-
- OMElement messageEl = factory.createOMElement("SetResourceProperties", NameSpaceConstants.WSRP_NS);
-
- String uuid = UUIDGenerator.getUUID();
- opts.setMessageId(uuid);
- opts.setTimeOutInMilliSeconds(getTimeoutInMilliSeconds());
- opts.setAction(messageEl.getNamespace().getNamespaceURI() + "/" + messageEl.getLocalName());
-
- for (int i = 0; i < requests.length; i++) {
- messageEl.addChild(requests[i]);
- }
- // message.addChild(new QNameElText(message, WidgetService.WIDGET_NS,
- // "TerminationTime"));
- // message.declareNamespace(WidgetService.WIDGET_NS);
- // wsrp:ResourceProperty
- // message.declareNamespace(WidgetService.WSRL_NS);
-
- ServiceClient client = new ServiceClient();
-
- if (client.getAxisConfiguration().getModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING) != null) {
-
- client.engageModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING);
- } else {
- SOAPHeaderBlock msgId = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
- msgId.setText(uuid);
-
- SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
- to.setText(this.resourceEndpointReference.getAddress());
-
- SOAPHeaderBlock action = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
- action.setText(messageEl.getNamespace().getNamespaceURI() + "/" + messageEl.getLocalName());
-
- client.addHeader(action);
- client.addHeader(msgId);
- client.addHeader(to);
- }
- client.setOptions(opts);
- client.sendRobust(messageEl);
-
- }
-
- private List<OMElement> elementsAsList(OMElement responseMessage) {
- List<OMElement> list = new ArrayList<OMElement>();
-
- for (Iterator it = responseMessage.getChildElements(); it.hasNext();) {
- OMElement current = (OMElement) it.next();
- list.add(current);
- }
-
- return list;
- }
-
- private List<OMNode> childrenAsList(OMElement responseMessage) {
- List<OMNode> list = new ArrayList<OMNode>();
-
- for (Iterator it = responseMessage.getChildren(); it.hasNext();) {
- OMNode child = (OMNode) it.next();
- list.add(child);
- }
- return list;
- }
-
- public static void verbose(String msg) {
- System.err.println(msg);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java
deleted file mode 100644
index 46bbb73..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-/**
- * AMQPBroadcastReceiver defines an interface that should be implemented by a message receiver that
- * receives broadcast messages from the broker.
- */
-public interface AMQPBroadcastReceiver {
-
- /**
- * Subscribe to the broadcast channel.
- *
- * @throws AMQPException
- */
- public void Subscribe() throws AMQPException;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java
deleted file mode 100644
index d1343fd..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-import org.apache.axiom.om.OMElement;
-
-/**
- * AMQPBroadcastSender defines an interface that should be implemented by a message sender that
- * sends messages that can be consumed by any downstream client, irrespective of message routing key.
- */
-public interface AMQPBroadcastSender {
-
- /**
- * Send a message.
- *
- * @param message Message to be delivered.
- * @throws AMQPException
- */
- public void Send(OMElement message) throws AMQPException;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java
deleted file mode 100644
index 5545edc..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-public interface AMQPCallback {
-
- /**
- * Gets called when a message is available on the receiver.
- *
- * @param message Message that is available.
- */
- public void onMessage(String message);
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java
deleted file mode 100644
index bd3db9e..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-import com.rabbitmq.client.ConnectionFactory;
-
-import java.util.Properties;
-
-/**
- * AMQPClient class takes care of establishing/terminating connections with the broker.
- */
-public class AMQPClient {
- protected final ConnectionFactory connectionFactory = new ConnectionFactory();
-
- /**
- * Create an instance of client.
- *
- * @param properties Connection properties.
- */
- public AMQPClient(Properties properties) {
- connectionFactory.setHost(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST));
- String port = properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT);
- connectionFactory.setPort((port == null) ? 5672 : Integer.parseInt(port));
- connectionFactory.setUsername(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME));
- connectionFactory.setPassword(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD));
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java
deleted file mode 100644
index 2f7c3d8..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-/**
- * AMQPException is an extension for AMQP-specific exception handling.
- */
-public class AMQPException extends Exception {
-
- public AMQPException() {
- super();
- }
-
- public AMQPException(String message) {
- super(message);
- }
-
- public AMQPException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public AMQPException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java
deleted file mode 100644
index 82eeb2c..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-/**
- * AMQPReceiver defines an interface that should be implemented by a message receiver that
- * receives messages selectively based on a unique routing key. A message is delivered
- * to a subscriber if and only if the routing key of the message satisfies the subscription key.
- */
-public interface AMQPReceiver {
-
- /**
- * Subscribe to a channel.
- *
- * @param key Key that defines the channel binding.
- * @throws AMQPException
- */
- public void Subscribe(AMQPRoutingKey key) throws AMQPException;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java
deleted file mode 100644
index 5e7c3ff..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-import org.apache.axiom.om.OMElement;
-import org.jaxen.JaxenException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * AMQPRoutingAwareClient class takes care of handling routing keys so that a derived class
- * need only contain the logic for sending/receiving messages based on its intended message flow pattern.
- */
-public class AMQPRoutingAwareClient extends AMQPClient {
- private static final Logger log = LoggerFactory.getLogger(AMQPClient.class);
-
- private static final String ELEMENT_EVENT = "event";
- private static final String ELEMENT_KEY = "key";
- private static final String ELEMENT_SEGMENT = "segment";
- private static final String ATTRIBUTE_NAME = "name";
-
- private HashMap<String, HashMap<String, AMQPRoutingKey>> eventRoutingKeys = new HashMap<String, HashMap<String, AMQPRoutingKey>>();
-
- /**
- * Create an instance of client.
- *
- * @param properties Connection properties.
- */
- public AMQPRoutingAwareClient(Properties properties) {
- super(properties);
- }
-
- /**
- * Initialize the client.
- *
- * @param routingKeys Routing key configuration.
- * @throws AMQPException on error.
- */
- public void init(Element routingKeys) throws AMQPException {
- if (routingKeys != null) {
- NodeList events = routingKeys.getElementsByTagName(ELEMENT_EVENT);
- for (int i = 0; i < events.getLength(); i++) {
- // event
- Element event = (Element)(events.item(i));
- String eventName = event.getAttribute(ATTRIBUTE_NAME).trim();
- if ((eventName == null) || (eventName.isEmpty()) || eventRoutingKeys.containsKey(eventName)) {
- continue;
- }
-
- HashMap<String, AMQPRoutingKey> eventKeys = new HashMap<String, AMQPRoutingKey>();
- eventRoutingKeys.put(eventName, eventKeys);
-
- // keys
- NodeList keys = event.getElementsByTagName(ELEMENT_KEY);
- for (int j = 0; j < keys.getLength(); j++) {
- Element key = (Element)(keys.item(j));
- String keyName = key.getAttribute(ATTRIBUTE_NAME).trim();
- if ((keyName == null) || (keyName.isEmpty()) || eventKeys.containsKey(keyName)) {
- continue;
- }
-
- AMQPRoutingKey routingKey = new AMQPRoutingKey(eventName, keyName);
- eventKeys.put(keyName, routingKey);
-
- // segments
- NodeList segments = key.getElementsByTagName(ELEMENT_SEGMENT);
- for (int k = 0; k < segments.getLength(); k++) {
- Element segment = (Element)(segments.item(k));
- String segmentName = segment.getAttribute(ATTRIBUTE_NAME).trim();
- if ((segmentName == null) || (segmentName.isEmpty()) || routingKey.containsSegment(segmentName)) {
- continue;
- }
-
- String segmentExpression = segment.getTextContent().trim();
- if (-1 != segmentExpression.indexOf('@')) {
- // Attribute
- routingKey.addEvaluatableAttributeSegment(segmentName, segmentExpression);
- } else {
- // Element
- routingKey.addEvaluatableElementSegment(segmentName, segmentExpression);
- }
- }
- }
- }
- }
- }
-
- /**
- * Initialize client. Load routing configuration on its own.
- *
- * @throws AMQPException on error.
- */
- public void init() throws AMQPException {
- init(AMQPUtil.loadRoutingKeys());
- }
-
- /**
- * Check if a given message is routable as per routing configuration.
- *
- * @param message Message to be routed.
- * @return true if routable or false otherwise.
- */
- protected boolean isRoutable(OMElement message) {
- return eventRoutingKeys.containsKey(message.getLocalName());
- }
-
- /**
- * Evaluate the set of native routing keys for a given message.
- *
- * @param message Message for which the routing keys are required.
- * @param routingKeys Possible set of routing keys.
- */
- protected void getRoutingKeys(OMElement message, List<String> routingKeys) {
- HashMap<String, AMQPRoutingKey> eventKeys = eventRoutingKeys.get(message.getLocalName());
- if (eventKeys != null) {
-
- for (AMQPRoutingKey eventKey : eventKeys.values()) {
- try {
- String routingKey = eventKey.evaluate(message);
- if (!routingKey.isEmpty()) {
- routingKeys.add(routingKey);
- }
- } catch (JaxenException e) {
- // Do nothing. The erroneous key will be ignored.
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java
deleted file mode 100644
index 2cb52cb..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-import org.apache.axiom.om.OMAttribute;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.xpath.AXIOMXPath;
-import org.jaxen.JaxenException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * AMQPRoutingKey represents an AMQP routing key. A key would consist of one or more segments where
- * a segment would be an element or an attribute of an event.
- */
-public class AMQPRoutingKey {
- private static final Logger log = LoggerFactory.getLogger(AMQPRoutingKey.class);
-
- private String eventName = "";
- private String keyName = "";
- private List<Segment> segments = new ArrayList<Segment>();
-
- public AMQPRoutingKey(String eventName, String keyName) {
- this.eventName = eventName;
- this.keyName = keyName;
- }
-
- /**
- * Get associated event name.
- *
- * @return Event name.
- */
- public String getEventName() {
- return eventName;
- }
-
- /**
- * Set associated event name.
- *
- * @param eventName Event name.
- */
- public void setEventName(String eventName) {
- this.eventName = eventName;
- }
-
- /**
- * Get name of key.
- *
- * @return Key name.
- */
- public String getKeyName() {
- return keyName;
- }
-
- /**
- * Set name of key.
- *
- * @param keyName Key name.
- */
- public void setKeyName(String keyName) {
- this.keyName = keyName;
- }
-
- /**
- * Check if a segment already exists.
- *
- * @param name Name of the segment.
- * @return true if exists or false otherwise.
- */
- boolean containsSegment(String name) {
- boolean found = false;
-
- for (Segment segment : segments) {
- if (segment.getName().equals(name)) {
- found = true;
- break;
- }
- }
-
- return found;
- }
-
- /**
- * Add a segment.
- *
- * @param name Name of the segment.
- * @param value Value of the segment.
- * @throws AMQPException on duplicate segment.
- */
- public void addSegment(String name, String value) throws AMQPException {
- segments.add(new Segment(name, value));
- }
-
- /**
- * Add an evaluatable element segment.
- *
- * @param name Name of the element.
- * @param expression Expression that needs evaluating to retrieve the value of element.
- * @throws AMQPException on duplicate element.
- */
- public void addEvaluatableElementSegment(String name, String expression) throws AMQPException {
- try {
- segments.add(new EvaluatableElementSegment(name, expression));
- } catch (JaxenException e) {
- throw new AMQPException(e);
- }
- }
-
- /**
- * Add an evaluatable attribute segment.
- *
- * @param name Name of the attribute.
- * @param expression Expression that needs evaluating to retrieve the value of attribute.
- * @throws AMQPException on duplicate attribute.
- */
- public void addEvaluatableAttributeSegment(String name, String expression) throws AMQPException {
- try {
- segments.add(new EvaluatableAttributeSegment(name, expression));
- } catch (JaxenException e) {
- throw new AMQPException(e);
- }
- }
-
- /**
- * Generate native AMQP key using the segments.
- *
- * @return Routing key in native(AMQP) format.
- */
- public String getNativeKey() {
- String routingKey = !eventName.isEmpty() ? eventName : "*";
-
- boolean segmentsGiven = false;
- for (Segment segment : segments) {
-
- String segmentValue = segment.getValue().trim();
- if (!segmentValue.isEmpty()) {
- routingKey += ".";
- routingKey += segment.getValue();
-
- segmentsGiven = true;
- }
- }
-
- if (!segmentsGiven) {
- routingKey += ".";
- routingKey += "#";
- }
-
- return routingKey;
- }
-
- /**
- * Evaluate the routing key for a given message.
- *
- * @param message Message for which the routing key is required.
- * @return Routing key.
- * @throws JaxenException on expression evaluation error.
- */
- public String evaluate(OMElement message) throws JaxenException {
- String routingKey = eventName;
-
- for (Segment segment : segments) {
-
- if (segment instanceof EvaluatableSegment) {
- routingKey += ".";
- routingKey += ((EvaluatableSegment)segment).evaluate(message);
- }
- }
-
- return routingKey;
- }
-
- /**
- * Segment provides a base implementation for segments. This class could be extended
- * by a particular type of segment(element/attribute) based on its specific requirements.
- */
- private class Segment {
-
- private String name = "";
- private String value = "";
-
- /**
- * Create an instance of segment.
- *
- * @param name Name of segment.
- */
- public Segment(String name) {
- this.name = name;
- }
-
- /**
- * Create an instance of segment.
- *
- * @param name Name of segment.
- * @param value Value of segment.
- */
- public Segment(String name, String value) {
- this.name = name;
- this.value = value;
- }
-
- /**
- * Get name of segment.
- *
- * @return Name.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Set name of segment.
- *
- * @param name Name to be set.
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * Get value of segment.
- *
- * @return Value.
- */
- public String getValue() {
- return value;
- }
-
- /**
- * Set value of segment.
- *
- * @param value Value to be set.
- */
- public void setValue(String value) {
- this.value = value;
- }
- }
-
- /**
- * EvaluatableSegment provides a base implementation for segments that are evaluated on the fly
- * based on an incoming event. This class could be extended by a particular type of segment(element/attribute)
- * based on its specific requirements.
- */
- private abstract class EvaluatableSegment extends Segment {
-
- private static final String NAMESPACE_PREFIX = "ns";
- private static final String NAMESPACE_URL = "http://airavata.apache.org/schemas/wft/2011/08";
-
- protected AXIOMXPath xpathProcessor = null;
-
- /**
- * Create an instance of EvaluatableSegment.
- *
- * @param name Name of segment.
- * @param expression Expression that needs evaluating to retrieve the value of segment.
- * @throws JaxenException on expression evalution error.
- */
- protected EvaluatableSegment(String name, String expression) throws JaxenException {
- super(name);
-
- xpathProcessor = new AXIOMXPath(getNormalizedExpression(expression));
- xpathProcessor.addNamespace(NAMESPACE_PREFIX, NAMESPACE_URL);
- }
-
- /**
- * Normalize an expression to include namespace.
- *
- * @param expression Expression to be normalized.
- * @return Normalized expression.
- */
- private String getNormalizedExpression(String expression) {
- try {
- StringBuffer normalizedExpression = new StringBuffer();
- normalizedExpression.append(NAMESPACE_PREFIX);
- normalizedExpression.append(":");
-
- expression = expression.trim();
- for (int i = 0; i < expression.length(); i++) {
- char c = expression.charAt(i);
-
- normalizedExpression.append(c);
- if (((c == '/') && (expression.charAt(i + 1) != '@')) || (c == '@')) {
- normalizedExpression.append(NAMESPACE_PREFIX);
- normalizedExpression.append(":");
- }
- }
-
- return normalizedExpression.toString();
- } catch (ArrayIndexOutOfBoundsException e) {
- return "";
- }
- }
-
- /**
- * Returns value of segment.
- *
- * @param message Message from which the data is extracted.
- * @return Value of segment.
- * @throws JaxenException on error.
- */
- public abstract String evaluate(OMElement message) throws JaxenException;
- }
-
- /**
- * EvaluatableElementSegment is the EvaluatableSegment extension for event elements.
- */
- private class EvaluatableElementSegment extends EvaluatableSegment {
-
- public EvaluatableElementSegment(String name, String expression) throws JaxenException {
- super(name, expression);
- }
-
- @Override
- public String evaluate(OMElement message) throws JaxenException {
- String value = "";
-
- OMElement element = (OMElement)xpathProcessor.selectSingleNode(message);
- if (element != null) {
- value = element.getText();
- }
-
- return value;
- }
- }
-
- /**
- * EvaluatableAttributeSegment is the EvaluatableSegment extension for event attributes.
- */
- private class EvaluatableAttributeSegment extends EvaluatableSegment {
-
- public EvaluatableAttributeSegment(String name, String expression) throws JaxenException {
- super(name, expression);
- }
-
- @Override
- public String evaluate(OMElement message) throws JaxenException {
- String value = "";
-
- OMAttribute attribute = (OMAttribute)xpathProcessor.selectSingleNode(message);
- if (attribute != null) {
- value = attribute.getAttributeValue();
- }
-
- return value;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java
deleted file mode 100644
index 30d25ed..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-import org.apache.axiom.om.OMElement;
-
-/**
- * AMQPSender defines an interface that should be implemented by a message sender that
- * sends messages to one or more clients that receive messages selectively, based on a
- * routing key. The routing key is formed by a set of fields in the message.
- */
-public interface AMQPSender {
-
- /**
- * Send a message.
- *
- * @param message Message to be delivered.
- * @throws AMQPException on error.
- */
- public void Send(OMElement message) throws AMQPException;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java
deleted file mode 100644
index e49607f..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-/**
- * AMQPTopicReceiver defines an interface that should be implemented by a message receiver that
- * receives messages from a topic. A message would only get delivered to a topic subscriber
- * if and only if the routing key of message satisfies the topic.
- */
-public interface AMQPTopicReceiver {
-
- /**
- * Subscribe to a topic.
- *
- * @param topic Topic that needs to be subscribed to.
- * @throws AMQPException on error.
- */
- public void Subscribe(AMQPRoutingKey topic) throws AMQPException;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java
deleted file mode 100644
index 0cd6ca8..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-import org.apache.axiom.om.OMElement;
-
-/**
- * AMQPTopicSender defines an interface that should be implemented by a message sender that
- * sends messages to one or more consumers that have subscribed to topics. A message
- * would only be delivered to a topic subscriber if and only if the routing key of message
- * satisfies the topic.
- */
-public interface AMQPTopicSender {
-
- /**
- * Send a message.
- *
- * @param message Message to be delivered.
- * @throws AMQPException on error.
- */
- public void Send(OMElement message) throws AMQPException;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java
deleted file mode 100644
index 71b0210..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import java.io.File;
-import java.net.URL;
-
-/**
- * AMQPUtil provides common utilities required for the AMQP transport implementation.
- */
-public class AMQPUtil {
-
- public static final String CONFIG_AMQP_ENABLE = "amqp.notification.enable";
-
- public static final String CONFIG_AMQP_PROVIDER_HOST = "amqp.broker.host";
- public static final String CONFIG_AMQP_PROVIDER_PORT = "amqp.broker.port";
- public static final String CONFIG_AMQP_PROVIDER_USERNAME = "amqp.broker.username";
- public static final String CONFIG_AMQP_PROVIDER_PASSWORD = "amqp.broker.password";
-
- public static final String CONFIG_AMQP_SENDER = "amqp.sender";
- public static final String CONFIG_AMQP_TOPIC_SENDER = "amqp.topic.sender";
- public static final String CONFIG_AMQP_BROADCAST_SENDER = "amqp.broadcast.sender";
-
- public static final String EXCHANGE_NAME_DIRECT = "ws-messenger-direct";
- public static final String EXCHANGE_TYPE_DIRECT = "direct";
- public static final String EXCHANGE_NAME_TOPIC = "ws-messenger-topic";
- public static final String EXCHANGE_TYPE_TOPIC = "topic";
- public static final String EXCHANGE_NAME_FANOUT = "ws-messenger-fanout";
- public static final String EXCHANGE_TYPE_FANOUT = "fanout";
-
- private static final String ROUTING_KEY_FILENAME = "amqp-routing-keys.xml";
-
- /**
- * Load routing keys from configuration file.
- *
- * @return Root element of routing key object model.
- * @throws AMQPException on error.
- */
- public static Element loadRoutingKeys() throws AMQPException {
- try {
- URL resource = AMQPUtil.class.getClassLoader().getResource(ROUTING_KEY_FILENAME);
- DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
- DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
- Document document = docBuilder.parse(new File(resource.getPath()));
-
- return document.getDocumentElement();
- } catch (Exception e) {
- throw new AMQPException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java
deleted file mode 100644
index b5d2588..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp.rabbitmq;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.QueueingConsumer;
-import org.apache.airavata.wsmg.client.amqp.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-/**
- * AMQPBroadcastReceiverImpl class provides functionality to consume a broadcast message feed.
- */
-public class AMQPBroadcastReceiverImpl extends AMQPClient implements AMQPBroadcastReceiver {
- private static final Logger log = LoggerFactory.getLogger(AMQPBroadcastReceiverImpl.class);
-
- private AMQPCallback callback = null;
-
- public AMQPBroadcastReceiverImpl(Properties properties, AMQPCallback callback) {
- super(properties);
-
- this.callback = callback;
- }
-
- public void Subscribe() throws AMQPException {
- if (callback != null) {
- try {
- Connection connection = connectionFactory.newConnection();
-
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_FANOUT, AMQPUtil.EXCHANGE_TYPE_FANOUT);
-
- String queueName = channel.queueDeclare().getQueue();
- channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_FANOUT, "");
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
-
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
-
- callback.onMessage(message);
- }
- } catch (Exception e) {
- throw new AMQPException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java
deleted file mode 100644
index 26e0bcd..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp.rabbitmq;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import org.apache.airavata.wsmg.client.amqp.AMQPBroadcastSender;
-import org.apache.airavata.wsmg.client.amqp.AMQPException;
-import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient;
-import org.apache.airavata.wsmg.client.amqp.AMQPUtil;
-import org.apache.axiom.om.OMElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * AMQPBroadcastSenderImpl provides functionality to produce a broadcast message feed.
- */
-public class AMQPBroadcastSenderImpl extends AMQPRoutingAwareClient implements AMQPBroadcastSender {
- private static final Logger log = LoggerFactory.getLogger(AMQPBroadcastSenderImpl.class);
-
- public AMQPBroadcastSenderImpl(Properties properties) {
- super(properties);
- }
-
- public void Send(OMElement message) throws AMQPException {
- try {
- if (isRoutable(message)) {
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_FANOUT, AMQPUtil.EXCHANGE_TYPE_FANOUT);
-
- channel.basicPublish(AMQPUtil.EXCHANGE_NAME_FANOUT, "", null, message.toString().getBytes());
-
- channel.close();
- connection.close();
- }
- } catch (IOException e) {
- throw new AMQPException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java
deleted file mode 100644
index 694c381..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp.rabbitmq;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.QueueingConsumer;
-import org.apache.airavata.wsmg.client.amqp.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-/**
- * AMQPReceiverImpl class provides functionality to receive messages selectively based on a unique routing key.
- */
-public class AMQPReceiverImpl extends AMQPRoutingAwareClient implements AMQPReceiver {
- private static final Logger log = LoggerFactory.getLogger(AMQPReceiverImpl.class);
-
- private AMQPCallback callback = null;
-
- public AMQPReceiverImpl(Properties properties, AMQPCallback callback) {
- super(properties);
-
- this.callback = callback;
- }
-
- public void Subscribe(AMQPRoutingKey key) throws AMQPException {
- if (callback != null) {
- try {
- Connection connection = connectionFactory.newConnection();
-
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_DIRECT, AMQPUtil.EXCHANGE_TYPE_DIRECT);
-
- String queueName = channel.queueDeclare().getQueue();
- channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_DIRECT, key.getNativeKey());
-
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
-
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
-
- callback.onMessage(message);
- }
- } catch (Exception e) {
- throw new AMQPException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java
deleted file mode 100644
index 5cff26a..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp.rabbitmq;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import org.apache.airavata.wsmg.client.amqp.AMQPException;
-import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient;
-import org.apache.airavata.wsmg.client.amqp.AMQPSender;
-import org.apache.airavata.wsmg.client.amqp.AMQPUtil;
-import org.apache.axiom.om.OMElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * AMQPSenderImpl class provides functionality to send messages with a unique routing key
- * so that a receiver can consume them selectively.
- */
-public class AMQPSenderImpl extends AMQPRoutingAwareClient implements AMQPSender {
- private static final Logger log = LoggerFactory.getLogger(AMQPSenderImpl.class);
-
- public AMQPSenderImpl(Properties properties) {
- super(properties);
- }
-
- public void Send(OMElement message) throws AMQPException {
- try {
- if (isRoutable(message)) {
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_DIRECT, AMQPUtil.EXCHANGE_TYPE_DIRECT);
-
- List<String> routingKeys = new ArrayList<String>();
- getRoutingKeys(message, routingKeys);
-
- for (String routingKey : routingKeys) {
- channel.basicPublish(
- AMQPUtil.EXCHANGE_NAME_DIRECT, routingKey, null, message.toString().getBytes());
- }
-
- channel.close();
- connection.close();
- }
- } catch (IOException e) {
- throw new AMQPException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java
deleted file mode 100644
index eb9b929..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp.rabbitmq;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.QueueingConsumer;
-import org.apache.airavata.wsmg.client.amqp.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-/**
- * AMQPTopicReceiverImpl receives messages whose routing key matches a pattern.
- * These patterns are also called Topics.
- */
-public class AMQPTopicReceiverImpl extends AMQPRoutingAwareClient implements AMQPTopicReceiver {
-    private static final Logger log = LoggerFactory.getLogger(AMQPTopicReceiverImpl.class);
-
-    private AMQPCallback callback = null;
-
-    public AMQPTopicReceiverImpl(Properties properties, AMQPCallback callback) {
-        super(properties);
-
-        this.callback = callback;
-    }
-
-    /**
-     * Binds a server-named queue to the topic exchange using {@code topic}'s native key and
-     * hands every delivery to the callback as a String.
-     *
-     * This method blocks the calling thread: the consume loop only ends when an exception is
-     * raised (broker failure, callback failure or thread interruption). Does nothing when no
-     * callback was supplied at construction time.
-     *
-     * @param topic routing key pattern to bind the queue with
-     * @throws AMQPException wrapping whatever exception ended the subscription
-     */
-    public void Subscribe(AMQPRoutingKey topic) throws AMQPException {
-        if (callback == null) {
-            return;
-        }
-
-        Connection connection = null;
-        try {
-            connection = connectionFactory.newConnection();
-
-            Channel channel = connection.createChannel();
-            channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_TOPIC, AMQPUtil.EXCHANGE_TYPE_TOPIC);
-
-            String queueName = channel.queueDeclare().getQueue();
-            channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_TOPIC, topic.getNativeKey());
-
-            QueueingConsumer consumer = new QueueingConsumer(channel);
-            channel.basicConsume(queueName, true, consumer);
-
-            while (true) {
-                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
-                // NOTE(review): decodes with the platform default charset - confirm that it
-                // matches the charset used by the sender.
-                String message = new String(delivery.getBody());
-
-                callback.onMessage(message);
-            }
-        } catch (InterruptedException e) {
-            // Restore the interrupt flag so the caller can see that the thread was interrupted.
-            Thread.currentThread().interrupt();
-            throw new AMQPException(e);
-        } catch (Exception e) {
-            throw new AMQPException(e);
-        } finally {
-            // The loop above only exits through an exception, so this is the single place
-            // where the connection (and its channel) gets released. abort() discards close
-            // errors and therefore never masks the exception being propagated.
-            if (connection != null) {
-                connection.abort();
-            }
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java
deleted file mode 100644
index 8a351a5..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.amqp.rabbitmq;
-
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import org.apache.airavata.wsmg.client.amqp.AMQPException;
-import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient;
-import org.apache.airavata.wsmg.client.amqp.AMQPTopicSender;
-import org.apache.airavata.wsmg.client.amqp.AMQPUtil;
-import org.apache.axiom.om.OMElement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * AMQPTopicSenderImpl class provides functionality to send messages that can be consumed
- * based on a pattern. These patterns are also called Topics.
- */
-public class AMQPTopicSenderImpl extends AMQPRoutingAwareClient implements AMQPTopicSender {
- private static final Logger log = LoggerFactory.getLogger(AMQPTopicSenderImpl.class);
-
- public AMQPTopicSenderImpl(Properties properties) {
- super(properties);
- }
-
- public void Send(OMElement message) throws AMQPException {
- try {
- if (isRoutable(message)) {
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_TOPIC, AMQPUtil.EXCHANGE_TYPE_TOPIC);
-
- List<String> routingKeys = new ArrayList<String>();
- getRoutingKeys(message, routingKeys);
-
- for (String routingKey : routingKeys) {
- channel.basicPublish(
- AMQPUtil.EXCHANGE_NAME_TOPIC, routingKey, null, message.toString().getBytes());
- }
-
- channel.close();
- connection.close();
- }
- } catch (IOException e) {
- throw new AMQPException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/commons/NotificationProducer.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/commons/NotificationProducer.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/commons/NotificationProducer.java
deleted file mode 100644
index b3d5ae8..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/commons/NotificationProducer.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.commons;
-
-import org.apache.airavata.wsmg.commons.WsmgCommonConstants;
-import org.apache.airavata.wsmg.commons.NameSpaceConstants;
-import org.apache.axiom.om.OMAbstractFactory;
-import org.apache.axiom.om.OMElement;
-import org.apache.axiom.om.OMFactory;
-import org.apache.axiom.om.util.UUIDGenerator;
-import org.apache.axiom.soap.SOAPFactory;
-import org.apache.axiom.soap.SOAPHeaderBlock;
-import org.apache.axis2.AxisFault;
-import org.apache.axis2.addressing.EndpointReference;
-import org.apache.axis2.client.Options;
-import org.apache.axis2.client.ServiceClient;
-
-//import org.apache.airavata.wsmg.WsmgConstants;
-
-//import org.apache.airavata.wsmg.WsmgConstants;
-
-/**
- * Sends a notification message to a broker endpoint through a one-shot Axis2
- * {@link ServiceClient} and returns the broker's response.
- */
-public class NotificationProducer {
-
-    private final OMFactory factory = OMAbstractFactory.getOMFactory();
-    private final SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();
-
-    public NotificationProducer() {
-
-    }
-
-    /**
-     * Delivers a notification without a topic expression.
-     *
-     * @param notificationMessage payload to send
-     * @param type                "wsnt" selects the WS-Notification Notify action; any other
-     *                            value selects the WSMG publish action
-     * @param brokerLocationEPR   endpoint of the broker
-     * @param timeout             request timeout in milliseconds
-     * @return the element returned by the broker
-     * @throws AxisFault if building the client, sending, or cleaning up fails
-     */
-    public synchronized OMElement deliverMessage(OMElement notificationMessage, String type,
-            EndpointReference brokerLocationEPR, long timeout) throws AxisFault {
-
-        return deliverMessage(notificationMessage, type, brokerLocationEPR, timeout, (OMElement) null);
-
-    }
-
-    /**
-     * Delivers a notification, optionally attaching a topic expression.
-     *
-     * The client created for the call is cleaned up even when the send fails.
-     *
-     * @param notificationMessage payload to send
-     * @param type                "wsnt" selects the WS-Notification Notify action; any other
-     *                            value selects the WSMG publish action
-     * @param brokerLocationEPR   endpoint of the broker; when the addressing module is
-     *                            available the topic expression is added to it as a
-     *                            reference parameter
-     * @param timeout             request timeout in milliseconds
-     * @param topicExpressionEl   topic expression element, may be null
-     * @return the element returned by the broker
-     * @throws AxisFault if building the client, sending, or cleaning up fails
-     */
-    public synchronized OMElement deliverMessage(OMElement notificationMessage, String type,
-            EndpointReference brokerLocationEPR, long timeout, OMElement topicExpressionEl) throws AxisFault {
-
-        ServiceClient client = createServiceClient(type, notificationMessage, brokerLocationEPR, timeout,
-                topicExpressionEl);
-
-        try {
-            return client.sendReceive(notificationMessage);
-        } finally {
-            releaseClient(client);
-        }
-
-    }
-
-    /** Releases the client, attempting the transport cleanup even if the client cleanup fails. */
-    private void releaseClient(ServiceClient client) throws AxisFault {
-        try {
-            client.cleanup();
-        } finally {
-            client.cleanupTransport();
-        }
-    }
-
-    /**
-     * Builds a ServiceClient targeting the broker. When the addressing module is present in
-     * the Axis configuration it is engaged; otherwise the Action, MessageID and To headers
-     * (and the topic expression, if any) are added by hand.
-     */
-    private ServiceClient createServiceClient(String type, OMElement notificationMessage,
-            EndpointReference brokerLocationEPR, long timeout, OMElement topicExpressionEl) throws AxisFault {
-
-        // The same action value is used for the manual header and for the client options.
-        String soapAction = "wsnt".equals(type) ? NameSpaceConstants.WSNT_NS.getNamespaceURI() + "/Notify"
-                : WsmgCommonConstants.WSMG_PUBLISH_SOAP_ACTION;
-
-        ServiceClient client = new ServiceClient();
-
-        if (client.getAxisConfiguration().getModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING) != null) {
-            brokerLocationEPR.addReferenceParameter(topicExpressionEl);
-            client.engageModule(WsmgCommonConstants.AXIS_MODULE_NAME_ADDRESSING);
-        } else {
-            SOAPHeaderBlock msgId = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
-            msgId.setText(UUIDGenerator.getUUID());
-
-            SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
-            to.setText(brokerLocationEPR.getAddress());
-
-            SOAPHeaderBlock action = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
-            action.setText(soapAction);
-            if (topicExpressionEl != null) {
-                try {
-                    client.addHeader(org.apache.axiom.om.util.ElementHelper.toSOAPHeaderBlock(topicExpressionEl,
-                            soapfactory));
-                } catch (Exception e) {
-                    throw AxisFault.makeFault(e);
-                }
-            }
-            client.addHeader(action);
-            client.addHeader(msgId);
-            client.addHeader(to);
-
-        }
-
-        Options opts = new Options();
-
-        opts.setAction(soapAction);
-
-        opts.setTo(brokerLocationEPR);
-        opts.setTimeOutInMilliSeconds(timeout);
-
-        client.setOptions(opts);
-
-        return client;
-
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/cab15715/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/msgbox/MessagePuller.java
----------------------------------------------------------------------
diff --git a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/msgbox/MessagePuller.java b/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/msgbox/MessagePuller.java
deleted file mode 100644
index 7d5b895..0000000
--- a/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/msgbox/MessagePuller.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.wsmg.client.msgbox;
-
-import java.rmi.RemoteException;
-import java.util.Iterator;
-
-import org.apache.airavata.wsmg.client.NotificationHandler;
-import org.apache.airavata.wsmg.msgbox.client.MsgBoxClient;
-import org.apache.axiom.om.OMElement;
-import org.apache.axis2.addressing.EndpointReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Polls a message box on a background thread and forwards every message found to a
- * {@link NotificationHandler}.
- */
-public class MessagePuller {
-
-    private final static Logger logger = LoggerFactory.getLogger(MessagePuller.class);
-
-    MsgBoxClient msgBoxUser = null;
-
-    EndpointReference msgBoxId = null;
-
-    NotificationHandler handler = null;
-
-    // Sleep between polls when the box was empty; also the starting error back-off (ms).
-    long backoff = 1000;
-
-    // Upper bound for the error back-off (ms).
-    long unavailableInterval = 300000;
-
-    // Timeout passed to the message box client on every poll.
-    long timeout = 30000L;
-
-    // Written by stopPulling() and read by the puller thread, hence volatile.
-    volatile boolean stopPulling = false;
-
-    public MessagePuller() {
-    }
-
-    public MessagePuller(MsgBoxClient msgBoxUser, EndpointReference msgBoxAddr, NotificationHandler handler,
-            long backoff, long timeout) {
-        this.msgBoxUser = msgBoxUser;
-        this.msgBoxId = msgBoxAddr;
-        this.handler = handler;
-        this.backoff = backoff;
-        this.timeout = timeout;
-    }
-
-    public MessagePuller(MsgBoxClient msgBoxUser, EndpointReference msgBoxId, NotificationHandler handler) {
-        this(msgBoxUser, msgBoxId, handler, 1000, 30000);
-    }
-
-    /** Starts a new thread that polls the message box until {@link #stopPulling()} is called. */
-    public void startPulling() {
-        Puller puller = new Puller();
-        new Thread(puller).start();
-    }
-
-    /** Asks the puller thread to stop; it exits before starting its next poll. */
-    public void stopPulling() {
-        stopPulling = true;
-    }
-
-
-    protected class Puller implements Runnable {
-
-        /**
-         * Poll loop. An empty poll sleeps for {@code backoff}; a failed poll sleeps for a
-         * back-off that doubles on each consecutive failure, capped at
-         * {@code unavailableInterval}, and is reset once messages are received again.
-         * Interrupting the thread ends the loop.
-         */
-        public void run() {
-
-            long backofftime = backoff;
-            while (!stopPulling) {
-                try {
-                    Iterator<OMElement> messages = msgBoxUser.takeMessagesFromMsgBox(msgBoxId, timeout);
-
-                    if (messages == null || !messages.hasNext()) {
-                        // sleep only when nothing was found
-                        Thread.sleep(backoff);
-                        continue;
-                    }
-
-                    backofftime = backoff;
-                    while (messages.hasNext()) {
-                        String notification = messages.next().toStringWithConsume();
-                        try {
-                            handler.handleNotification(notification);
-                        } catch (Throwable e) {
-                            // A failing callback must not stop the puller; log it with its
-                            // stack trace and carry on with the next message.
-                            logger.warn("Error occured in the user callback for message " + notification, e);
-                        }
-                    }
-                } catch (InterruptedException ex) {
-                    logger.error("the message puller thread was interruped", ex);
-                    // Treat interruption as a request to stop and keep the flag set for
-                    // whoever owns this thread.
-                    Thread.currentThread().interrupt();
-                    return;
-                } catch (Exception e) {
-                    logger.error("exception on MessagePuller", e);
-                    backofftime = Math.min(backofftime * 2, unavailableInterval);
-                    try {
-                        Thread.sleep(backofftime);
-                    } catch (InterruptedException e1) {
-                        logger.error("message puller was interruped while sleeping", e1);
-                        Thread.currentThread().interrupt();
-                        return;
-                    }
-                }
-            }
-        }
-    }
-
-}