You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2013/07/20 17:39:18 UTC
svn commit: r1505154 - in /airavata/trunk/modules:
distribution/airavata-server/src/main/resources/conf/ ws-messenger/client/
ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/
ws-messenger/client/src/main/java/org/apache/airavata/...
Author: smarru
Date: Sat Jul 20 15:39:17 2013
New Revision: 1505154
URL: http://svn.apache.org/r1505154
Log:
applying AMQP patches, thanks Danushka for the contribution
Added:
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java
airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java
airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml
airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/
airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java
airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java
airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/
airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
Modified:
airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
airavata/trunk/modules/ws-messenger/client/pom.xml
airavata/trunk/modules/ws-messenger/messagebroker/pom.xml
airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
Modified: airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties?rev=1505154&r1=1505153&r2=1505154&view=diff
==============================================================================
--- airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties (original)
+++ airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties Sat Jul 20 15:39:17 2013
@@ -216,6 +216,21 @@ messagePerservationIntervalMinutes=0
class.registry.accessor=org.apache.airavata.persistance.registry.jpa.impl.AiravataJPARegistry
#class.registry.accessor=org.apache.airavata.rest.client.RegistryClient
+###########################################################################
+# AMQP Notification Configuration
+###########################################################################
+amqp.notification.enable=1
+
+amqp.broker.host=localhost
+amqp.broker.port=5672
+amqp.broker.username=guest
+amqp.broker.password=guest
+
+amqp.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPSenderImpl
+amqp.topic.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicSenderImpl
+amqp.broadcast.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastSenderImpl
+
+
###---------------------------Computational Middleware Configurations---------------------------###
#enable.application.job.status.history=true
Modified: airavata/trunk/modules/ws-messenger/client/pom.xml
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/pom.xml?rev=1505154&r1=1505153&r2=1505154&view=diff
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/pom.xml (original)
+++ airavata/trunk/modules/ws-messenger/client/pom.xml Sat Jul 20 15:39:17 2013
@@ -48,6 +48,13 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
+ </dependency>
+
+ <!-- RabbitMQ client -->
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>3.1.2</version>
</dependency>
</dependencies>
</project>
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+/**
+ * AMQPBroadcastReceiver defines an interface that should be implemented by a message receiver that
+ * receives broadcast messages from the broker.
+ */
+public interface AMQPBroadcastReceiver {
+
+ /**
+ * Subscribe to the broadcast channel.
+ *
+ * @throws AMQPException
+ */
+ public void Subscribe() throws AMQPException;
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.axiom.om.OMElement;
+
+/**
+ * AMQPBroadcastSender defines an interface that should be implemented by a message sender that
+ * sends messages that can be consumed by any downstream client, irrespective of message routing key.
+ */
+public interface AMQPBroadcastSender {
+
+ /**
+ * Send a message.
+ *
+ * @param message Message to be delivered.
+ * @throws AMQPException
+ */
+ public void Send(OMElement message) throws AMQPException;
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+public interface AMQPCallback {
+
+ /**
+ * Gets called when a message is available on the receiver.
+ *
+ * @param message Message that is available.
+ */
+ public void onMessage(String message);
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import com.rabbitmq.client.ConnectionFactory;
+
+import java.util.Properties;
+
+/**
+ * AMQPClient class takes care of establishing/terminating connections with the broker.
+ */
+public class AMQPClient {
+ protected final ConnectionFactory connectionFactory = new ConnectionFactory();
+
+ /**
+ * Create an instance of client.
+ *
+ * @param properties Connection properties.
+ */
+ public AMQPClient(Properties properties) {
+ connectionFactory.setHost(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST));
+ String port = properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT);
+ connectionFactory.setPort((port == null) ? 5672 : Integer.parseInt(port));
+ connectionFactory.setUsername(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME));
+ connectionFactory.setPassword(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD));
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+/**
+ * AMQPException is an extension for AMQP-specific exception handling.
+ */
+public class AMQPException extends Exception {
+
+ public AMQPException() {
+ super();
+ }
+
+ public AMQPException(String message) {
+ super(message);
+ }
+
+ public AMQPException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public AMQPException(Throwable cause) {
+ super(cause);
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+/**
+ * AMQPReceiver defines an interface that should be implemented by a message receiver that
+ * receives messages selectively based on a unique routing key. A message would only get delivered
+ * to a subscriber if and only if the routing key of message satisfies the subscription key.
+ */
+public interface AMQPReceiver {
+
+ /**
+ * Subscribe to a channel.
+ *
+ * @param key Key that defines the channel binging.
+ * @throws AMQPException
+ */
+ public void Subscribe(AMQPRoutingKey key) throws AMQPException;
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.axiom.om.OMElement;
+import org.jaxen.JaxenException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * AMQPRoutingAwareClient class takes care of handling routing keys so that a derived class
+ * can only have the logic for sending/receiving messages based on its intended message flow pattern.
+ */
+public class AMQPRoutingAwareClient extends AMQPClient {
+ private static final Logger log = LoggerFactory.getLogger(AMQPClient.class);
+
+ private static final String ELEMENT_EVENT = "event";
+ private static final String ELEMENT_KEY = "key";
+ private static final String ELEMENT_SEGMENT = "segment";
+ private static final String ATTRIBUTE_NAME = "name";
+
+ private HashMap<String, HashMap<String, AMQPRoutingKey>> eventRoutingKeys = new HashMap<String, HashMap<String, AMQPRoutingKey>>();
+
+ /**
+ * Create an instance of client.
+ *
+ * @param properties Connection properties.
+ */
+ public AMQPRoutingAwareClient(Properties properties) {
+ super(properties);
+ }
+
+ /**
+ * Initialize the client.
+ *
+ * @param routingKeys Routing key configuration.
+ * @throws AMQPException on error.
+ */
+ public void init(Element routingKeys) throws AMQPException {
+ if (routingKeys != null) {
+ NodeList events = routingKeys.getElementsByTagName(ELEMENT_EVENT);
+ for (int i = 0; i < events.getLength(); i++) {
+ // event
+ Element event = (Element)(events.item(i));
+ String eventName = event.getAttribute(ATTRIBUTE_NAME).trim();
+ if ((eventName == null) || (eventName.isEmpty()) || eventRoutingKeys.containsKey(eventName)) {
+ continue;
+ }
+
+ HashMap<String, AMQPRoutingKey> eventKeys = new HashMap<String, AMQPRoutingKey>();
+ eventRoutingKeys.put(eventName, eventKeys);
+
+ // keys
+ NodeList keys = event.getElementsByTagName(ELEMENT_KEY);
+ for (int j = 0; j < keys.getLength(); j++) {
+ Element key = (Element)(keys.item(j));
+ String keyName = key.getAttribute(ATTRIBUTE_NAME).trim();
+ if ((keyName == null) || (keyName.isEmpty()) || eventKeys.containsKey(keyName)) {
+ continue;
+ }
+
+ AMQPRoutingKey routingKey = new AMQPRoutingKey(eventName, keyName);
+ eventKeys.put(keyName, routingKey);
+
+ // segments
+ NodeList segments = key.getElementsByTagName(ELEMENT_SEGMENT);
+ for (int k = 0; k < segments.getLength(); k++) {
+ Element segment = (Element)(segments.item(k));
+ String segmentName = segment.getAttribute(ATTRIBUTE_NAME).trim();
+ if ((segmentName == null) || (segmentName.isEmpty()) || routingKey.containsSegment(segmentName)) {
+ continue;
+ }
+
+ String segmentExpression = segment.getTextContent().trim();
+ if (-1 != segmentExpression.indexOf('@')) {
+ // Attribute
+ routingKey.addEvaluatableAttributeSegment(segmentName, segmentExpression);
+ } else {
+ // Element
+ routingKey.addEvaluatableElementSegment(segmentName, segmentExpression);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Initialize client. Load routing configuration on its own.
+ *
+ * @throws AMQPException on error.
+ */
+ public void init() throws AMQPException {
+ init(AMQPUtil.loadRoutingKeys());
+ }
+
+ /**
+ * Check if a given message is routable as per routing configuration.
+ *
+ * @param message Message to be routed.
+ * @return true if routable or false otherwise.
+ */
+ protected boolean isRoutable(OMElement message) {
+ return eventRoutingKeys.containsKey(message.getLocalName());
+ }
+
+ /**
+ * Evaluate the set of native routing keys for a given message.
+ *
+ * @param message Message for which the routing keys are required.
+ * @param routingKeys Possible set of routing keys.
+ */
+ protected void getRoutingKeys(OMElement message, List<String> routingKeys) {
+ HashMap<String, AMQPRoutingKey> eventKeys = eventRoutingKeys.get(message.getLocalName());
+ if (eventKeys != null) {
+
+ for (AMQPRoutingKey eventKey : eventKeys.values()) {
+ try {
+ String routingKey = eventKey.evaluate(message);
+ if (!routingKey.isEmpty()) {
+ routingKeys.add(routingKey);
+ }
+ } catch (JaxenException e) {
+ // Do nothing. The erroneous key will be ignored.
+ }
+ }
+ }
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,369 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.xpath.AXIOMXPath;
+import org.jaxen.JaxenException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * AMQPRoutingKey represents an AMQP routing key. A key would consist of one or more segments where
+ * a segment would be an element or an attribute of an event.
+ */
+public class AMQPRoutingKey {
+ private static final Logger log = LoggerFactory.getLogger(AMQPRoutingKey.class);
+
+ private String eventName = "";
+ private String keyName = "";
+ private List<Segment> segments = new ArrayList<Segment>();
+
+ public AMQPRoutingKey(String eventName, String keyName) {
+ this.eventName = eventName;
+ this.keyName = keyName;
+ }
+
+ /**
+ * Get associated event name.
+ *
+ * @return Event name.
+ */
+ public String getEventName() {
+ return eventName;
+ }
+
+ /**
+ * Set associated event name.
+ *
+ * @param eventName Event name.
+ */
+ public void setEventName(String eventName) {
+ this.eventName = eventName;
+ }
+
+ /**
+ * Get name of key.
+ *
+ * @return Key name.
+ */
+ public String getKeyName() {
+ return keyName;
+ }
+
+ /**
+ * Set name of key.
+ *
+ * @param keyName Key name.
+ */
+ public void setKeyName(String keyName) {
+ this.keyName = keyName;
+ }
+
+ /**
+ * Check if a segment already exists.
+ *
+ * @param name Name of the segment.
+ * @return true if exists or false otherwise.
+ */
+ boolean containsSegment(String name) {
+ boolean found = false;
+
+ for (Segment segment : segments) {
+ if (segment.getName().equals(name)) {
+ found = true;
+ break;
+ }
+ }
+
+ return found;
+ }
+
+ /**
+ * Add a segment.
+ *
+ * @param name Name of the segment.
+ * @param value Value of the segment.
+ * @throws AMQPException on duplicate segment.
+ */
+ public void addSegment(String name, String value) throws AMQPException {
+ segments.add(new Segment(name, value));
+ }
+
+ /**
+ * Add an evaluatable element segment.
+ *
+ * @param name Name of the element.
+ * @param expression Expression that needs evaluating to retrieve the value of element.
+ * @throws AMQPException on duplicate element.
+ */
+ public void addEvaluatableElementSegment(String name, String expression) throws AMQPException {
+ try {
+ segments.add(new EvaluatableElementSegment(name, expression));
+ } catch (JaxenException e) {
+ throw new AMQPException(e);
+ }
+ }
+
+ /**
+ * Add an evaluatable attribute segment.
+ *
+ * @param name Name of the attribute.
+ * @param expression Expression that needs evaluating to retrieve the value of attribute.
+ * @throws AMQPException on duplicate attribute.
+ */
+ public void addEvaluatableAttributeSegment(String name, String expression) throws AMQPException {
+ try {
+ segments.add(new EvaluatableAttributeSegment(name, expression));
+ } catch (JaxenException e) {
+ throw new AMQPException(e);
+ }
+ }
+
+ /**
+ * Generate native AMQP key using the segments.
+ *
+ * @return Routing key in native(AMQP) format.
+ */
+ public String getNativeKey() {
+ String routingKey = !eventName.isEmpty() ? eventName : "*";
+
+ boolean segmentsGiven = false;
+ for (Segment segment : segments) {
+
+ String segmentValue = segment.getValue().trim();
+ if (!segmentValue.isEmpty()) {
+ routingKey += ".";
+ routingKey += segment.getValue();
+
+ segmentsGiven = true;
+ }
+ }
+
+ if (!segmentsGiven) {
+ routingKey += ".";
+ routingKey += "#";
+ }
+
+ return routingKey;
+ }
+
+ /**
+ * Evaluate the routing key for a given message.
+ *
+ * @param message Message for which the routing key is required.
+ * @return Routing key.
+ * @throws JaxenException on expression evaluation error.
+ */
+ public String evaluate(OMElement message) throws JaxenException {
+ String routingKey = eventName;
+
+ for (Segment segment : segments) {
+
+ if (segment instanceof EvaluatableSegment) {
+ routingKey += ".";
+ routingKey += ((EvaluatableSegment)segment).evaluate(message);
+ }
+ }
+
+ return routingKey;
+ }
+
+ /**
+ * Segment provides a base implementation for segments. This class could be extended
+ * by a particular type of segment(element/attribute) based on its specific requirements.
+ */
+ private class Segment {
+
+ private String name = "";
+ private String value = "";
+
+ /**
+ * Create an instance of segment.
+ *
+ * @param name Name of segment.
+ */
+ public Segment(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Create an instance of segment.
+ *
+ * @param name Name of segment.
+ * @param value Value of segment.
+ */
+ public Segment(String name, String value) {
+ this.name = name;
+ this.value = value;
+ }
+
+ /**
+ * Get name of segment.
+ *
+ * @return Name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Set name of segment.
+ *
+ * @param name Name to be set.
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get value of segment.
+ *
+ * @return Value.
+ */
+ public String getValue() {
+ return value;
+ }
+
+ /**
+ * Set value of segment.
+ *
+ * @param value Value to be set.
+ */
+ public void setValue(String value) {
+ this.value = value;
+ }
+ }
+
+ /**
+ * EvaluatableSegment provides a base implementation for segments that are evaluated on the fly
+ * based on an incoming event. This class could be extended by a particular type of segment(element/attribute)
+ * based on its specific requirements.
+ */
+ private abstract class EvaluatableSegment extends Segment {
+
+ private static final String NAMESPACE_PREFIX = "ns";
+ private static final String NAMESPACE_URL = "http://airavata.apache.org/schemas/wft/2011/08";
+
+ protected AXIOMXPath xpathProcessor = null;
+
+ /**
+ * Create an instance of EvaluatableSegment.
+ *
+ * @param name Name of segment.
+ * @param expression Expression that needs evaluating to retrieve the value of segment.
+ * @throws JaxenException on expression evalution error.
+ */
+ protected EvaluatableSegment(String name, String expression) throws JaxenException {
+ super(name);
+
+ xpathProcessor = new AXIOMXPath(getNormalizedExpression(expression));
+ xpathProcessor.addNamespace(NAMESPACE_PREFIX, NAMESPACE_URL);
+ }
+
+ /**
+ * Normalize an expression to include namespace.
+ *
+ * @param expression Expression to be normalized.
+ * @return Normalized expression.
+ */
+ private String getNormalizedExpression(String expression) {
+ try {
+ StringBuffer normalizedExpression = new StringBuffer();
+ normalizedExpression.append(NAMESPACE_PREFIX);
+ normalizedExpression.append(":");
+
+ expression = expression.trim();
+ for (int i = 0; i < expression.length(); i++) {
+ char c = expression.charAt(i);
+
+ normalizedExpression.append(c);
+ if (((c == '/') && (expression.charAt(i + 1) != '@')) || (c == '@')) {
+ normalizedExpression.append(NAMESPACE_PREFIX);
+ normalizedExpression.append(":");
+ }
+ }
+
+ return normalizedExpression.toString();
+ } catch (ArrayIndexOutOfBoundsException e) {
+ return "";
+ }
+ }
+
+ /**
+ * Returns value of segment.
+ *
+ * @param message Message from which the data is extracted.
+ * @return Value of segment.
+ * @throws JaxenException on error.
+ */
+ public abstract String evaluate(OMElement message) throws JaxenException;
+ }
+
+ /**
+ * EvaluatableElementSegment is the EvaluatableSegment extension for event elements.
+ */
+ private class EvaluatableElementSegment extends EvaluatableSegment {
+
+ public EvaluatableElementSegment(String name, String expression) throws JaxenException {
+ super(name, expression);
+ }
+
+ @Override
+ public String evaluate(OMElement message) throws JaxenException {
+ String value = "";
+
+ OMElement element = (OMElement)xpathProcessor.selectSingleNode(message);
+ if (element != null) {
+ value = element.getText();
+ }
+
+ return value;
+ }
+ }
+
+ /**
+ * EvaluatableAttributeSegment is the EvaluatableSegment extension for event attributes.
+ */
+ private class EvaluatableAttributeSegment extends EvaluatableSegment {
+
+ public EvaluatableAttributeSegment(String name, String expression) throws JaxenException {
+ super(name, expression);
+ }
+
+ @Override
+ public String evaluate(OMElement message) throws JaxenException {
+ String value = "";
+
+ OMAttribute attribute = (OMAttribute)xpathProcessor.selectSingleNode(message);
+ if (attribute != null) {
+ value = attribute.getAttributeValue();
+ }
+
+ return value;
+ }
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.axiom.om.OMElement;
+
+/**
+ * AMQPSender defines an interface that should be implemented by a message sender that
+ * sends messages to one or more clients that receive messages selectively, based on a
+ * routing key. The routing key is formed by a set of fields in the message.
+ */
+public interface AMQPSender {
+
+ /**
+ * Send a message.
+ *
+ * @param message Message to be delivered.
+ * @throws AMQPException on error.
+ */
+ public void Send(OMElement message) throws AMQPException;
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+/**
+ * AMQPTopicReceiver defines an interface that should be implemented by a message receiver that
+ * receives messages from a topic. A message would only get delivered to a topic subscriber
+ * if and only if the routing key of message satisfies the topic.
+ */
+public interface AMQPTopicReceiver {
+
+ /**
+ * Subscribe to a topic.
+ *
+ * @param topic Topic that needs to be subscribed to.
+ * @throws AMQPException on error.
+ */
+ public void Subscribe(AMQPRoutingKey topic) throws AMQPException;
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.axiom.om.OMElement;
+
+/**
+ * AMQPTopicSender defines an interface that should be implemented by a message sender that
+ * sends messages to one or more consumers that have subscribed to topics. A message
+ * would only be delivered to a topic subscriber if and only if the routing key of message
+ * satisfies the topic.
+ */
+public interface AMQPTopicSender {
+
+ /**
+ * Send a message.
+ *
+ * @param message Message to be delivered.
+ * @throws AMQPException on error.
+ */
+ public void Send(OMElement message) throws AMQPException;
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import java.io.File;
+import java.net.URL;
+
+/**
+ * AMQPUtil provides common utilities required for the AMQP transport implementation.
+ */
+public class AMQPUtil {
+
+ public static final String CONFIG_AMQP_ENABLE = "amqp.notification.enable";
+
+ public static final String CONFIG_AMQP_PROVIDER_HOST = "amqp.broker.host";
+ public static final String CONFIG_AMQP_PROVIDER_PORT = "amqp.broker.port";
+ public static final String CONFIG_AMQP_PROVIDER_USERNAME = "amqp.broker.username";
+ public static final String CONFIG_AMQP_PROVIDER_PASSWORD = "amqp.broker.password";
+
+ public static final String CONFIG_AMQP_SENDER = "amqp.sender";
+ public static final String CONFIG_AMQP_TOPIC_SENDER = "amqp.topic.sender";
+ public static final String CONFIG_AMQP_BROADCAST_SENDER = "amqp.broadcast.sender";
+
+ public static final String EXCHANGE_NAME_DIRECT = "ws-messenger-direct";
+ public static final String EXCHANGE_TYPE_DIRECT = "direct";
+ public static final String EXCHANGE_NAME_TOPIC = "ws-messenger-topic";
+ public static final String EXCHANGE_TYPE_TOPIC = "topic";
+ public static final String EXCHANGE_NAME_FANOUT = "ws-messenger-fanout";
+ public static final String EXCHANGE_TYPE_FANOUT = "fanout";
+
+ private static final String ROUTING_KEY_FILENAME = "amqp-routing-keys.xml";
+
+ /**
+ * Load routing keys from configuration file.
+ *
+ * @return Root element of routing key object model.
+ * @throws AMQPException on error.
+ */
+ public static Element loadRoutingKeys() throws AMQPException {
+ try {
+ URL resource = AMQPUtil.class.getClassLoader().getResource(ROUTING_KEY_FILENAME);
+ DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
+ Document document = docBuilder.parse(new File(resource.getPath()));
+
+ return document.getDocumentElement();
+ } catch (Exception e) {
+ throw new AMQPException(e);
+ }
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.wsmg.client.amqp.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * AMQPBroadcastReceiverImpl class provides functionality to consume a broadcast message feed.
+ */
+public class AMQPBroadcastReceiverImpl extends AMQPClient implements AMQPBroadcastReceiver {
+ private static final Logger log = LoggerFactory.getLogger(AMQPBroadcastReceiverImpl.class);
+
+ private AMQPCallback callback = null;
+
+ public AMQPBroadcastReceiverImpl(Properties properties, AMQPCallback callback) {
+ super(properties);
+
+ this.callback = callback;
+ }
+
+ public void Subscribe() throws AMQPException {
+ if (callback != null) {
+ try {
+ Connection connection = connectionFactory.newConnection();
+
+ Channel channel = connection.createChannel();
+ channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_FANOUT, AMQPUtil.EXCHANGE_TYPE_FANOUT);
+
+ String queueName = channel.queueDeclare().getQueue();
+ channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_FANOUT, "");
+
+ QueueingConsumer consumer = new QueueingConsumer(channel);
+ channel.basicConsume(queueName, true, consumer);
+
+ while (true) {
+ QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+ String message = new String(delivery.getBody());
+
+ callback.onMessage(message);
+ }
+ } catch (Exception e) {
+ throw new AMQPException(e);
+ }
+ }
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.airavata.wsmg.client.amqp.AMQPBroadcastSender;
+import org.apache.airavata.wsmg.client.amqp.AMQPException;
+import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient;
+import org.apache.airavata.wsmg.client.amqp.AMQPUtil;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * AMQPBroadcastSenderImpl provides functionality to produce a broadcast message feed.
+ */
+public class AMQPBroadcastSenderImpl extends AMQPRoutingAwareClient implements AMQPBroadcastSender {
+ private static final Logger log = LoggerFactory.getLogger(AMQPBroadcastSenderImpl.class);
+
+ public AMQPBroadcastSenderImpl(Properties properties) {
+ super(properties);
+ }
+
+ public void Send(OMElement message) throws AMQPException {
+ try {
+ if (isRoutable(message)) {
+ Connection connection = connectionFactory.newConnection();
+ Channel channel = connection.createChannel();
+ channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_FANOUT, AMQPUtil.EXCHANGE_TYPE_FANOUT);
+
+ channel.basicPublish(AMQPUtil.EXCHANGE_NAME_FANOUT, "", null, message.toString().getBytes());
+
+ channel.close();
+ connection.close();
+ }
+ } catch (IOException e) {
+ throw new AMQPException(e);
+ }
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.wsmg.client.amqp.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * AMQPReceiverImpl class provides functionality to receive messages selectively based on a unique routing key.
+ */
+public class AMQPReceiverImpl extends AMQPRoutingAwareClient implements AMQPReceiver {
+ private static final Logger log = LoggerFactory.getLogger(AMQPReceiverImpl.class);
+
+ private AMQPCallback callback = null;
+
+ public AMQPReceiverImpl(Properties properties, AMQPCallback callback) {
+ super(properties);
+
+ this.callback = callback;
+ }
+
+ public void Subscribe(AMQPRoutingKey key) throws AMQPException {
+ if (callback != null) {
+ try {
+ Connection connection = connectionFactory.newConnection();
+
+ Channel channel = connection.createChannel();
+ channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_DIRECT, AMQPUtil.EXCHANGE_TYPE_DIRECT);
+
+ String queueName = channel.queueDeclare().getQueue();
+ channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_DIRECT, key.getNativeKey());
+
+ QueueingConsumer consumer = new QueueingConsumer(channel);
+ channel.basicConsume(queueName, true, consumer);
+
+ while (true) {
+ QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+ String message = new String(delivery.getBody());
+
+ callback.onMessage(message);
+ }
+ } catch (Exception e) {
+ throw new AMQPException(e);
+ }
+ }
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.airavata.wsmg.client.amqp.AMQPException;
+import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient;
+import org.apache.airavata.wsmg.client.amqp.AMQPSender;
+import org.apache.airavata.wsmg.client.amqp.AMQPUtil;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * AMQPSenderImpl class provides functionality to send messages with a unique routing key
+ * so that a receiver can consume them selectively.
+ */
+public class AMQPSenderImpl extends AMQPRoutingAwareClient implements AMQPSender {
+ private static final Logger log = LoggerFactory.getLogger(AMQPSenderImpl.class);
+
+ public AMQPSenderImpl(Properties properties) {
+ super(properties);
+ }
+
+ public void Send(OMElement message) throws AMQPException {
+ try {
+ if (isRoutable(message)) {
+ Connection connection = connectionFactory.newConnection();
+ Channel channel = connection.createChannel();
+ channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_DIRECT, AMQPUtil.EXCHANGE_TYPE_DIRECT);
+
+ List<String> routingKeys = new ArrayList<String>();
+ getRoutingKeys(message, routingKeys);
+
+ for (String routingKey : routingKeys) {
+ channel.basicPublish(
+ AMQPUtil.EXCHANGE_NAME_DIRECT, routingKey, null, message.toString().getBytes());
+ }
+
+ channel.close();
+ connection.close();
+ }
+ } catch (IOException e) {
+ throw new AMQPException(e);
+ }
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.wsmg.client.amqp.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * AMQPTopicReceiver class provides functionality to receive messages based on a pattern.
+ * These patterns are also called Topics.
+ */
+public class AMQPTopicReceiverImpl extends AMQPRoutingAwareClient implements AMQPTopicReceiver {
+ private static final Logger log = LoggerFactory.getLogger(AMQPTopicReceiverImpl.class);
+
+ private AMQPCallback callback = null;
+
+ public AMQPTopicReceiverImpl(Properties properties, AMQPCallback callback) {
+ super(properties);
+
+ this.callback = callback;
+ }
+
+ public void Subscribe(AMQPRoutingKey topic) throws AMQPException {
+ if (callback != null) {
+ try {
+ Connection connection = connectionFactory.newConnection();
+
+ Channel channel = connection.createChannel();
+ channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_TOPIC, AMQPUtil.EXCHANGE_TYPE_TOPIC);
+
+ String queueName = channel.queueDeclare().getQueue();
+ channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_TOPIC, topic.getNativeKey());
+
+ QueueingConsumer consumer = new QueueingConsumer(channel);
+ channel.basicConsume(queueName, true, consumer);
+
+ while (true) {
+ QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+ String message = new String(delivery.getBody());
+
+ callback.onMessage(message);
+ }
+ } catch (Exception e) {
+ throw new AMQPException(e);
+ }
+ }
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.airavata.wsmg.client.amqp.AMQPException;
+import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient;
+import org.apache.airavata.wsmg.client.amqp.AMQPTopicSender;
+import org.apache.airavata.wsmg.client.amqp.AMQPUtil;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * AMQPTopicSenderImpl class provides functionality to send messages that can be consumed
+ * based on a pattern. These patterns are also called Topics.
+ */
+public class AMQPTopicSenderImpl extends AMQPRoutingAwareClient implements AMQPTopicSender {
+ private static final Logger log = LoggerFactory.getLogger(AMQPTopicSenderImpl.class);
+
+ public AMQPTopicSenderImpl(Properties properties) {
+ super(properties);
+ }
+
+ public void Send(OMElement message) throws AMQPException {
+ try {
+ if (isRoutable(message)) {
+ Connection connection = connectionFactory.newConnection();
+ Channel channel = connection.createChannel();
+ channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_TOPIC, AMQPUtil.EXCHANGE_TYPE_TOPIC);
+
+ List<String> routingKeys = new ArrayList<String>();
+ getRoutingKeys(message, routingKeys);
+
+ for (String routingKey : routingKeys) {
+ channel.basicPublish(
+ AMQPUtil.EXCHANGE_NAME_TOPIC, routingKey, null, message.toString().getBytes());
+ }
+
+ channel.close();
+ connection.close();
+ }
+ } catch (IOException e) {
+ throw new AMQPException(e);
+ }
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml Sat Jul 20 15:39:17 2013
@@ -0,0 +1,40 @@
+<?xml version='1.0' encoding='utf-8' ?>
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
+ the Apache License, Version 2.0 (theÃÂ "License"); you may not use this file except in compliance with the License. You may
+ obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
+ in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+ ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under
+ the License. -->
+<routingKeys>
+ <!-- Routing keys for "workflowInvoked" event -->
+ <event name="workflowInvoked">
+ <key name="primaryKey">
+ <segment name="serviceID">notificationSource/@serviceID</segment>
+ </key>
+ </event>
+ <!-- Routing keys for "invokingService" event -->
+ <event name="invokingService">
+ <key name="primaryKey">
+ <segment name="serviceID">notificationSource/@serviceID</segment>
+ </key>
+ </event>
+ <!-- Routing keys for "receivedResult" event -->
+ <event name="receivedResult">
+ <key name="primaryKey">
+ <segment name="serviceID">notificationSource/@serviceID</segment>
+ </key>
+ </event>
+ <!-- Routing keys for "sendingResult" event -->
+ <event name="sendingResult">
+ <key name="primaryKey">
+ <segment name="serviceID">notificationSource/@serviceID</segment>
+ </key>
+ </event>
+ <!-- Routing keys for "workflowTerminated" event -->
+ <event name="workflowTerminated">
+ <key name="primaryKey">
+ <segment name="serviceID">notificationSource/@serviceID</segment>
+ </key>
+ </event>
+</routingKeys>
\ No newline at end of file
Added: airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastReceiverImpl;
+import java.util.Properties;
+
+public class BroadcastSubscriber {
+ public static void main(String args[]) throws AMQPException {
+ String host = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
+ String port = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
+ String username = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
+ String password = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");
+
+ Properties properties = new Properties();
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, host);
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, port);
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, username);
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, password);
+
+ MessageConsumer consumer = new MessageConsumer();
+ AMQPBroadcastReceiver receiver = new AMQPBroadcastReceiverImpl(properties, consumer);
+ System.out.println("Waiting for broadcast messages : \n");
+
+ receiver.Subscribe();
+ }
+
+ public static class MessageConsumer implements AMQPCallback {
+ public void onMessage(String message) {
+ System.out.println("Received : " + message);
+ }
+ }
+}
Added: airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicReceiverImpl;
+import java.util.Properties;
+
+public class TopicSubscriber {
+ public static void main(String args[]) throws AMQPException {
+ String host = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
+ String port = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
+ String username = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
+ String password = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");
+
+ Properties properties = new Properties();
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, host);
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, port);
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, username);
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, password);
+
+ MessageConsumer consumer = new MessageConsumer();
+ AMQPTopicReceiver receiver = new AMQPTopicReceiverImpl(properties, consumer);
+ System.out.println("Waiting for topic messages : \n");
+
+ AMQPRoutingKey key = new AMQPRoutingKey("workflowTerminated", "");
+ receiver.Subscribe(key);
+ }
+
+ public static class MessageConsumer implements AMQPCallback {
+ public void onMessage(String message) {
+ System.out.println("Received : " + message);
+ }
+ }
+}
Modified: airavata/trunk/modules/ws-messenger/messagebroker/pom.xml
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/messagebroker/pom.xml?rev=1505154&r1=1505153&r2=1505154&view=diff
==============================================================================
--- airavata/trunk/modules/ws-messenger/messagebroker/pom.xml (original)
+++ airavata/trunk/modules/ws-messenger/messagebroker/pom.xml Sat Jul 20 15:39:17 2013
@@ -62,6 +62,11 @@
<artifactId>airavata-common-utils</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.airavata</groupId>
+ <artifactId>airavata-messenger-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>edu.berkeley</groupId>
@@ -100,12 +105,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.airavata</groupId>
- <artifactId>airavata-messenger-client</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
</dependencies>
<properties>
Modified: airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java?rev=1505154&r1=1505153&r2=1505154&view=diff
==============================================================================
--- airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java (original)
+++ airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java Sat Jul 20 15:39:17 2013
@@ -21,13 +21,12 @@
package org.apache.airavata.wsmg.broker;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;
+import org.apache.airavata.wsmg.broker.amqp.AMQPNotificationProcessor;
import org.apache.airavata.wsmg.broker.context.ContextParameters;
import org.apache.airavata.wsmg.broker.context.ProcessingContext;
import org.apache.airavata.wsmg.commons.NameSpaceConstants;
@@ -44,6 +43,7 @@ import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMException;
import org.apache.axiom.om.OMFactory;
import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.om.xpath.AXIOMXPath;
import org.apache.axis2.AxisFault;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,8 +61,11 @@ public class NotificationProcessor {
private OutGoingQueue outgoingQueue;
+ private AMQPNotificationProcessor amqpNotificationProcessor = new AMQPNotificationProcessor();
+
public NotificationProcessor(WsmgConfigurationContext config) {
init(config);
+ amqpNotificationProcessor.init();
}
private void init(WsmgConfigurationContext config) {
@@ -91,6 +94,8 @@ public class NotificationProcessor {
.getSoapAction(), ctx.getMessageContext().getMessageID());
additionalMessageContent.setTrackId(trackId);
+ handleExtendedNotifications(ctx, protocolNs);
+
if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
onWSNTMsg(ctx, additionalMessageContent);
@@ -98,9 +103,8 @@ public class NotificationProcessor {
} else { // WSE Notifications No specific namespace
onWSEMsg(ctx, trackId, additionalMessageContent);
- setResponseMsg(ctx, trackId, protocolNs);
+ setResponseMsg(ctx, trackId, protocolNs);
}
-
}
/**
@@ -302,4 +306,8 @@ public class NotificationProcessor {
logger.info(additionalMessageContent.getTrackId() + ": putIn Outgoing queue.");
}
+ private void handleExtendedNotifications(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
+ // AMQP
+ amqpNotificationProcessor.notify(ctx, protocolNs);
+ }
}
Added: airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java (added)
+++ airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.amqp;
+
+import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.airavata.wsmg.client.amqp.*;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMNamespace;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Element;
+
+import javax.xml.namespace.QName;
+
+/**
+ * AMQPNotificationProcessor handles AMQP-specific notification processing.
+ */
+public class AMQPNotificationProcessor {
+
+ private static final Logger logger = LoggerFactory.getLogger(AMQPNotificationProcessor.class);
+
+ private boolean amqpEnabled = false;
+ private AMQPSender amqpSender = null;
+ private AMQPTopicSender amqpTopicSender = null;
+ private AMQPBroadcastSender amqpBroadcastSender = null;
+
+ public void init() {
+ String amqpEnabledAppSetting = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_ENABLE, "");
+ if (!amqpEnabledAppSetting.isEmpty() && (1 == Integer.parseInt(amqpEnabledAppSetting))) {
+ try {
+ String host = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
+ String port = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
+ String username = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
+ String password = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");
+
+ Properties properties = new Properties();
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, host);
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, port);
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, username);
+ properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, password);
+
+ String className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_SENDER, "");
+ Class clazz = Class.forName(className);
+ amqpSender = (AMQPSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
+
+ className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_TOPIC_SENDER, "");
+ clazz = Class.forName(className);
+ amqpTopicSender = (AMQPTopicSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
+
+ className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_BROADCAST_SENDER, "");
+ clazz = Class.forName(className);
+ amqpBroadcastSender = (AMQPBroadcastSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
+
+ Element routingKeys = AMQPUtil.loadRoutingKeys();
+ if (routingKeys != null) {
+ ((AMQPRoutingAwareClient)amqpSender).init(routingKeys);
+ ((AMQPRoutingAwareClient)amqpTopicSender).init(routingKeys);
+ ((AMQPRoutingAwareClient)amqpBroadcastSender).init(routingKeys);
+ }
+
+ amqpEnabled = true;
+ } catch (Exception ex) {
+ logger.error(ex.getMessage());
+ }
+ }
+ }
+
+ public void notify(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
+ if (amqpEnabled) {
+ // Extract messages
+ List<OMElement> messages = new ArrayList<OMElement>();
+ if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
+ // WSNT
+ OMElement messageElements = ctx.getSoapBody().getFirstElement();
+ for (Iterator<OMElement> ite = messageElements.getChildrenWithLocalName("NotificationMessage"); ite.hasNext(); ) {
+ try {
+ OMElement messageElement = ite.next();
+ OMElement message = messageElement.getFirstChildWithName(
+ new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message")).getFirstElement();
+ messages.add(message);
+ } catch (NullPointerException e) {
+ throw new OMException(e);
+ }
+ }
+ } else {
+ // WSE
+ OMElement message = ctx.getSoapBody().getFirstElement();
+ if (message != null) {
+ messages.add(message);
+ }
+ }
+
+ // Dispatch messages
+ try {
+ for (OMElement message : messages) {
+ amqpBroadcastSender.Send(message);
+ amqpTopicSender.Send(message);
+ amqpSender.Send(message);
+ }
+ } catch (AMQPException e) {
+ logger.warn("Failed to send AMQP notification.[Reason=" + e.getMessage() + "]");
+ }
+ }
+ }
+}