You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2013/07/20 17:39:18 UTC

svn commit: r1505154 - in /airavata/trunk/modules: distribution/airavata-server/src/main/resources/conf/ ws-messenger/client/ ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/ ws-messenger/client/src/main/java/org/apache/airavata/...

Author: smarru
Date: Sat Jul 20 15:39:17 2013
New Revision: 1505154

URL: http://svn.apache.org/r1505154
Log:
applying AMQP patches, thanks Danushka for the contribution

Added:
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java
    airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java
    airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml
    airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/
    airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java
    airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java
    airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/
    airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
Modified:
    airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
    airavata/trunk/modules/ws-messenger/client/pom.xml
    airavata/trunk/modules/ws-messenger/messagebroker/pom.xml
    airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java

Modified: airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties?rev=1505154&r1=1505153&r2=1505154&view=diff
==============================================================================
--- airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties (original)
+++ airavata/trunk/modules/distribution/airavata-server/src/main/resources/conf/airavata-server.properties Sat Jul 20 15:39:17 2013
@@ -216,6 +216,21 @@ messagePerservationIntervalMinutes=0
 class.registry.accessor=org.apache.airavata.persistance.registry.jpa.impl.AiravataJPARegistry
 #class.registry.accessor=org.apache.airavata.rest.client.RegistryClient
 
+###########################################################################
+# AMQP Notification Configuration
+###########################################################################
+amqp.notification.enable=1
+ 
+amqp.broker.host=localhost
+amqp.broker.port=5672
+amqp.broker.username=guest
+amqp.broker.password=guest
+ 
+amqp.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPSenderImpl
+amqp.topic.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicSenderImpl
+amqp.broadcast.sender=org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastSenderImpl
+
+
 ###---------------------------Computational Middleware Configurations---------------------------###
 
 #enable.application.job.status.history=true

Modified: airavata/trunk/modules/ws-messenger/client/pom.xml
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/pom.xml?rev=1505154&r1=1505153&r2=1505154&view=diff
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/pom.xml (original)
+++ airavata/trunk/modules/ws-messenger/client/pom.xml Sat Jul 20 15:39:17 2013
@@ -48,6 +48,13 @@
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>
+    	</dependency>
+	
+	<!-- RabbitMQ client -->
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>3.1.2</version>
         </dependency>
     </dependencies>
 </project>

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastReceiver.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+/**
+ * AMQPBroadcastReceiver defines an interface that should be implemented by a message receiver that
+ * receives broadcast messages from the broker.
+ */
+public interface AMQPBroadcastReceiver {
+
+    /**
+     * Subscribe to the broadcast channel.
+     *
+     * @throws AMQPException
+     */
+    public void Subscribe() throws AMQPException;
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPBroadcastSender.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.axiom.om.OMElement;
+
+/**
+ * AMQPBroadcastSender defines an interface that should be implemented by a message sender that
+ * sends messages that can be consumed by any downstream client, irrespective of message routing key.
+ */
+public interface AMQPBroadcastSender {
+
+    /**
+     * Send a message.
+     *
+     * @param message Message to be delivered.
+     * @throws AMQPException
+     */
+    public void Send(OMElement message) throws AMQPException;
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPCallback.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+public interface AMQPCallback {
+
+    /**
+     * Gets called when a message is available on the receiver.
+     *
+     * @param message Message that is available.
+     */
+    public void onMessage(String message);
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPClient.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import com.rabbitmq.client.ConnectionFactory;
+
+import java.util.Properties;
+
+/**
+ * AMQPClient class takes care of establishing/terminating connections with the broker.
+ */
+public class AMQPClient {
+    protected final ConnectionFactory connectionFactory = new ConnectionFactory();
+
+    /**
+     * Create an instance of client.
+     *
+     * @param properties Connection properties.
+     */
+    public AMQPClient(Properties properties) {
+        connectionFactory.setHost(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST));
+        String port = properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT);
+        connectionFactory.setPort((port == null) ? 5672 : Integer.parseInt(port));
+        connectionFactory.setUsername(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME));
+        connectionFactory.setPassword(properties.getProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD));
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPException.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+/**
+ * AMQPException is an extension for AMQP-specific exception handling.
+ */
+public class AMQPException extends  Exception {
+    
+    public AMQPException() {
+        super();
+    }
+    
+    public AMQPException(String message) {
+        super(message);
+    }
+
+    public AMQPException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public AMQPException(Throwable cause) {
+        super(cause);
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPReceiver.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+/**
+ * AMQPReceiver defines an interface that should be implemented by a message receiver that
+ * receives messages selectively based on a unique routing key. A message would only get delivered
+ * to a subscriber if and only if the routing key of message satisfies the subscription key.
+ */
+public interface AMQPReceiver {
+
+    /**
+     * Subscribe to a channel.
+     *
+     * @param key Key that defines the channel binging.
+     * @throws AMQPException
+     */
+    public void Subscribe(AMQPRoutingKey key) throws AMQPException;
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingAwareClient.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.axiom.om.OMElement;
+import org.jaxen.JaxenException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * AMQPRoutingAwareClient class takes care of handling routing keys so that a derived class
+ * can only have the logic for sending/receiving messages based on its intended message flow pattern.
+ */
+public class AMQPRoutingAwareClient extends AMQPClient {
+    private static final Logger log = LoggerFactory.getLogger(AMQPClient.class);
+    
+    private static final String ELEMENT_EVENT = "event";
+    private static final String ELEMENT_KEY = "key";
+    private static final String ELEMENT_SEGMENT = "segment";
+    private static final String ATTRIBUTE_NAME = "name";
+
+    private HashMap<String, HashMap<String, AMQPRoutingKey>> eventRoutingKeys = new HashMap<String, HashMap<String, AMQPRoutingKey>>();
+
+    /**
+     * Create an instance of client.
+     *
+     * @param properties Connection properties.
+     */
+    public AMQPRoutingAwareClient(Properties properties) {
+        super(properties);
+    }
+
+    /**
+     * Initialize the client.
+     *
+     * @param routingKeys Routing key configuration.
+     * @throws AMQPException on error.
+     */
+    public void init(Element routingKeys) throws AMQPException {
+        if (routingKeys != null) {
+            NodeList events = routingKeys.getElementsByTagName(ELEMENT_EVENT);
+            for (int i = 0; i < events.getLength(); i++) {
+                // event
+                Element event = (Element)(events.item(i));
+                String eventName = event.getAttribute(ATTRIBUTE_NAME).trim();
+                if ((eventName == null) || (eventName.isEmpty()) || eventRoutingKeys.containsKey(eventName)) {
+                    continue;
+                }
+
+                HashMap<String, AMQPRoutingKey> eventKeys = new HashMap<String, AMQPRoutingKey>();
+                eventRoutingKeys.put(eventName, eventKeys);
+
+                // keys
+                NodeList keys = event.getElementsByTagName(ELEMENT_KEY);
+                for (int j = 0; j < keys.getLength(); j++) {
+                    Element key = (Element)(keys.item(j));
+                    String keyName = key.getAttribute(ATTRIBUTE_NAME).trim();
+                    if ((keyName == null) || (keyName.isEmpty()) || eventKeys.containsKey(keyName)) {
+                        continue;
+                    }
+
+                    AMQPRoutingKey routingKey = new AMQPRoutingKey(eventName, keyName);
+                    eventKeys.put(keyName, routingKey);
+
+                    // segments
+                    NodeList segments = key.getElementsByTagName(ELEMENT_SEGMENT);
+                    for (int k = 0; k < segments.getLength(); k++) {
+                        Element segment = (Element)(segments.item(k));
+                        String segmentName = segment.getAttribute(ATTRIBUTE_NAME).trim();
+                        if ((segmentName == null) || (segmentName.isEmpty()) || routingKey.containsSegment(segmentName)) {
+                            continue;
+                        }
+
+                        String segmentExpression = segment.getTextContent().trim();
+                        if (-1 != segmentExpression.indexOf('@')) {
+                            // Attribute
+                            routingKey.addEvaluatableAttributeSegment(segmentName, segmentExpression);
+                        } else {
+                            // Element
+                            routingKey.addEvaluatableElementSegment(segmentName, segmentExpression);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Initialize client. Load routing configuration on its own.
+     *
+     * @throws AMQPException on error.
+     */
+    public void init() throws AMQPException {
+        init(AMQPUtil.loadRoutingKeys());
+    }
+
+    /**
+     * Check if a given message is routable as per routing configuration.
+     *
+     * @param message Message to be routed.
+     * @return true if routable or false otherwise.
+     */
+    protected boolean isRoutable(OMElement message) {
+        return eventRoutingKeys.containsKey(message.getLocalName());
+    }
+
+    /**
+     * Evaluate the set of native routing keys for a given message.
+     *
+     * @param message Message for which the routing keys are required.
+     * @param routingKeys Possible set of routing keys.
+     */
+    protected void getRoutingKeys(OMElement message, List<String> routingKeys) {
+        HashMap<String, AMQPRoutingKey> eventKeys = eventRoutingKeys.get(message.getLocalName());
+        if (eventKeys != null) {
+
+            for (AMQPRoutingKey eventKey : eventKeys.values()) {
+                try {
+                    String routingKey = eventKey.evaluate(message);
+                    if (!routingKey.isEmpty()) {
+                        routingKeys.add(routingKey);
+                    }
+                } catch (JaxenException e) {
+                    // Do nothing. The erroneous key will be ignored.
+                }
+            }
+        }
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPRoutingKey.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,369 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.xpath.AXIOMXPath;
+import org.jaxen.JaxenException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * AMQPRoutingKey represents an AMQP routing key. A key would consist of one or more segments where
+ * a segment would be an element or an attribute of an event.
+ */
+public class AMQPRoutingKey {
+    private static final Logger log = LoggerFactory.getLogger(AMQPRoutingKey.class);
+
+    private String eventName = "";
+    private String keyName = "";
+    private List<Segment> segments = new ArrayList<Segment>();
+    
+    public AMQPRoutingKey(String eventName, String keyName) {
+        this.eventName = eventName;
+        this.keyName = keyName;
+    }
+
+    /**
+     * Get associated event name.
+     *
+     * @return Event name.
+     */
+    public String getEventName() {
+        return eventName;
+    }
+
+    /**
+     * Set associated event name.
+     *
+     * @param eventName Event name.
+     */
+    public void setEventName(String eventName) {
+        this.eventName = eventName;
+    }
+
+    /**
+     * Get name of key.
+     *
+     * @return Key name.
+     */
+    public String getKeyName() {
+        return keyName;
+    }
+
+    /**
+     * Set name of key.
+     *
+     * @param keyName Key name.
+     */
+    public void setKeyName(String keyName) {
+        this.keyName = keyName;
+    }
+
+    /**
+     * Check if a segment already exists.
+     *
+     * @param name Name of the segment.
+     * @return true if exists or false otherwise.
+     */
+    boolean containsSegment(String name) {
+        boolean found = false;
+
+        for (Segment segment : segments) {
+            if (segment.getName().equals(name)) {
+                found = true;
+                break;
+            }
+        }
+
+        return found;
+    }
+
+    /**
+     * Add a segment.
+     *
+     * @param name Name of the segment.
+     * @param value Value of the segment.
+     * @throws AMQPException on duplicate segment.
+     */
+    public void addSegment(String name, String value) throws AMQPException {
+        segments.add(new Segment(name, value));
+    }
+
+    /**
+     * Add an evaluatable element segment.
+     *
+     * @param name Name of the element.
+     * @param expression Expression that needs evaluating to retrieve the value of element.
+     * @throws AMQPException on duplicate element.
+     */
+    public void addEvaluatableElementSegment(String name, String expression) throws AMQPException {
+        try {
+            segments.add(new EvaluatableElementSegment(name, expression));
+        } catch (JaxenException e) {
+            throw new AMQPException(e);
+        }
+    }
+
+    /**
+     * Add an evaluatable attribute segment.
+     *
+     * @param name Name of the attribute.
+     * @param expression Expression that needs evaluating to retrieve the value of attribute.
+     * @throws AMQPException on duplicate attribute.
+     */
+    public void addEvaluatableAttributeSegment(String name, String expression) throws AMQPException {
+        try {
+            segments.add(new EvaluatableAttributeSegment(name, expression));
+        } catch (JaxenException e) {
+            throw new AMQPException(e);
+        }
+    }
+
+    /**
+     * Generate native AMQP key using the segments.
+     *
+     * @return Routing key in native(AMQP) format.
+     */
+    public String getNativeKey() {
+        String routingKey = !eventName.isEmpty() ? eventName : "*";
+
+        boolean segmentsGiven = false;
+        for (Segment segment : segments) {
+
+            String segmentValue = segment.getValue().trim();
+            if (!segmentValue.isEmpty()) {
+                routingKey += ".";
+                routingKey += segment.getValue();
+
+                segmentsGiven = true;
+            }
+        }
+
+        if (!segmentsGiven) {
+            routingKey += ".";
+            routingKey += "#";
+        }
+
+        return routingKey;
+    }
+
+    /**
+     * Evaluate the routing key for a given message.
+     *
+     * @param message Message for which the routing key is required.
+     * @return Routing key.
+     * @throws JaxenException on expression evaluation error.
+     */
+    public String evaluate(OMElement message) throws JaxenException {
+        String routingKey = eventName;
+
+        for (Segment segment : segments) {
+
+            if (segment instanceof EvaluatableSegment) {
+                routingKey += ".";
+                routingKey += ((EvaluatableSegment)segment).evaluate(message);
+            }
+        }
+
+        return routingKey;
+    }
+
+    /**
+     * Segment provides a base implementation for segments. This class could be extended
+     * by a particular type of segment(element/attribute) based on its specific requirements.
+     */
+    private class Segment {
+
+        private String name = "";
+        private String value = "";
+
+        /**
+         * Create an instance of segment.
+         *
+         * @param name Name of segment.
+         */
+        public Segment(String name) {
+            this.name = name;
+        }
+
+        /**
+         * Create an instance of segment.
+         *
+         * @param name Name of segment.
+         * @param value Value of segment.
+         */
+        public Segment(String name, String value) {
+            this.name = name;
+            this.value = value;
+        }
+
+        /**
+         * Get name of segment.
+         *
+         * @return  Name.
+         */
+        public String getName() {
+            return name;
+        }
+
+        /**
+         * Set name of segment.
+         *
+         * @param name Name to be set.
+         */
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        /**
+         * Get value of segment.
+         *
+         * @return  Value.
+         */
+        public String getValue() {
+            return value;
+        }
+
+        /**
+         * Set value of segment.
+         *
+         * @param value Value to be set.
+         */
+        public void setValue(String value) {
+            this.value = value;
+        }
+    }
+
+    /**
+     * EvaluatableSegment provides a base implementation for segments that are evaluated on the fly
+     * based on an incoming event. This class could be extended by a particular type of segment(element/attribute)
+     * based on its specific requirements.
+     */
+    private abstract class EvaluatableSegment extends Segment {
+
+        private static final String NAMESPACE_PREFIX = "ns";
+        private static final String NAMESPACE_URL = "http://airavata.apache.org/schemas/wft/2011/08";
+
+        protected AXIOMXPath xpathProcessor = null;
+
+        /**
+         * Create an instance of EvaluatableSegment.
+         *
+         * @param name Name of segment.
+         * @param expression Expression that needs evaluating to retrieve the value of segment.
+         * @throws JaxenException on expression evalution error.
+         */
+        protected EvaluatableSegment(String name, String expression) throws JaxenException {
+            super(name);
+
+            xpathProcessor = new AXIOMXPath(getNormalizedExpression(expression));
+            xpathProcessor.addNamespace(NAMESPACE_PREFIX, NAMESPACE_URL);
+        }
+
+        /**
+         * Normalize an expression to include namespace.
+         *
+         * @param expression Expression to be normalized.
+         * @return Normalized expression.
+         */
+        private String getNormalizedExpression(String expression) {
+            try {
+                StringBuffer normalizedExpression = new StringBuffer();
+                normalizedExpression.append(NAMESPACE_PREFIX);
+                normalizedExpression.append(":");
+
+                expression = expression.trim();
+                for (int i = 0; i < expression.length(); i++) {
+                    char c = expression.charAt(i);
+
+                    normalizedExpression.append(c);
+                    if (((c == '/') && (expression.charAt(i + 1) != '@')) || (c == '@')) {
+                        normalizedExpression.append(NAMESPACE_PREFIX);
+                        normalizedExpression.append(":");
+                    }
+                }
+
+                return normalizedExpression.toString();
+            } catch (ArrayIndexOutOfBoundsException e) {
+                return "";
+            }
+        }
+
+        /**
+         * Returns value of segment.
+         *
+         * @param message Message from which the data is extracted.
+         * @return Value of segment.
+         * @throws JaxenException on error.
+         */
+        public abstract String evaluate(OMElement message) throws JaxenException;
+    }
+
+    /**
+     * EvaluatableElementSegment is the EvaluatableSegment extension for event elements.
+     */
+    private class EvaluatableElementSegment extends EvaluatableSegment {
+
+        public EvaluatableElementSegment(String name, String expression) throws JaxenException {
+            super(name, expression);
+        }
+
+        @Override
+        public String evaluate(OMElement message) throws JaxenException {
+            String value = "";
+            
+            OMElement element = (OMElement)xpathProcessor.selectSingleNode(message);
+            if (element != null) {
+                value = element.getText();
+            }
+
+            return value;
+        }
+    }
+
+    /**
+     * EvaluatableAttributeSegment is the EvaluatableSegment extension for event attributes.
+     */
+    private class EvaluatableAttributeSegment extends EvaluatableSegment {
+
+        public EvaluatableAttributeSegment(String name, String expression) throws JaxenException {
+            super(name, expression);
+        }
+
+        @Override
+        public String evaluate(OMElement message) throws JaxenException {
+            String value = "";
+            
+            OMAttribute attribute = (OMAttribute)xpathProcessor.selectSingleNode(message);
+            if (attribute != null) {
+                value = attribute.getAttributeValue();
+            }
+            
+            return value;
+        }
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPSender.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.axiom.om.OMElement;
+
+/**
+ * AMQPSender defines an interface that should be implemented by a message sender that
+ * sends messages to one or more clients that receive messages selectively, based on a
+ * routing key. The routing key is formed by a set of fields in the message.
+ */
+public interface AMQPSender {
+
+    /**
+     * Send a message.
+     *
+     * @param message Message to be delivered.
+     * @throws AMQPException on error.
+     */
+    public void Send(OMElement message) throws AMQPException;
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicReceiver.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+/**
+ * AMQPTopicReceiver defines an interface that should be implemented by a message receiver that
+ * receives messages from a topic. A message would only get delivered to a topic subscriber
+ * if and only if the routing key of message satisfies the topic.
+ */
+public interface AMQPTopicReceiver {
+
+    /**
+     * Subscribe to a topic.
+     *
+     * @param topic Topic that needs to be subscribed to.
+     * @throws AMQPException on error.
+     */
+    public void Subscribe(AMQPRoutingKey topic) throws AMQPException;
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPTopicSender.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,41 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.axiom.om.OMElement;
+
+/**
+ * AMQPTopicSender defines an interface that should be implemented by a message sender that
+ * sends messages to one or more consumers that have subscribed to topics. A message
+ * would only be delivered to a topic subscriber if and only if the routing key of message
+ * satisfies the topic.
+ */
+public interface AMQPTopicSender {
+
+    /**
+     * Send a message.
+     *
+     * @param message Message to be delivered.
+     * @throws AMQPException on error.
+     */
+    public void Send(OMElement message) throws AMQPException;
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/AMQPUtil.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import java.io.File;
+import java.net.URL;
+
+/**
+ *  AMQPUtil provides common utilities required for the AMQP transport implementation.
+ */
+public class AMQPUtil {
+    
+    public static final String CONFIG_AMQP_ENABLE = "amqp.notification.enable";
+
+    public static final String CONFIG_AMQP_PROVIDER_HOST = "amqp.broker.host";
+    public static final String CONFIG_AMQP_PROVIDER_PORT = "amqp.broker.port";
+    public static final String CONFIG_AMQP_PROVIDER_USERNAME = "amqp.broker.username";
+    public static final String CONFIG_AMQP_PROVIDER_PASSWORD = "amqp.broker.password";
+
+    public static final String CONFIG_AMQP_SENDER = "amqp.sender";
+    public static final String CONFIG_AMQP_TOPIC_SENDER = "amqp.topic.sender";
+    public static final String CONFIG_AMQP_BROADCAST_SENDER = "amqp.broadcast.sender";
+
+    public static final String EXCHANGE_NAME_DIRECT = "ws-messenger-direct";
+    public static final String EXCHANGE_TYPE_DIRECT = "direct";
+    public static final String EXCHANGE_NAME_TOPIC = "ws-messenger-topic";
+    public static final String EXCHANGE_TYPE_TOPIC = "topic";
+    public static final String EXCHANGE_NAME_FANOUT = "ws-messenger-fanout";
+    public static final String EXCHANGE_TYPE_FANOUT = "fanout";
+
+    private static final String ROUTING_KEY_FILENAME = "amqp-routing-keys.xml";
+
+    /**
+     * Load routing keys from configuration file.
+     *
+     * @return Root element of routing key object model.
+     * @throws AMQPException on error.
+     */
+    public static Element loadRoutingKeys() throws AMQPException {
+        try {
+            URL resource = AMQPUtil.class.getClassLoader().getResource(ROUTING_KEY_FILENAME);
+            DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
+            DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
+            Document document = docBuilder.parse(new File(resource.getPath()));
+
+            return document.getDocumentElement();
+        } catch (Exception e) {
+            throw new AMQPException(e);
+        }
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastReceiverImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.wsmg.client.amqp.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * AMQPBroadcastReceiverImpl class provides functionality to consume a broadcast message feed.
+ */
+public class AMQPBroadcastReceiverImpl extends AMQPClient implements AMQPBroadcastReceiver {
+    private static final Logger log = LoggerFactory.getLogger(AMQPBroadcastReceiverImpl.class);
+    
+    private AMQPCallback callback = null;
+
+    public AMQPBroadcastReceiverImpl(Properties properties, AMQPCallback callback) {
+        super(properties);
+
+        this.callback = callback;
+    }
+
+    public void Subscribe() throws AMQPException {
+        if (callback != null) {
+            try {
+                Connection connection = connectionFactory.newConnection();
+
+                Channel channel = connection.createChannel();
+                channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_FANOUT, AMQPUtil.EXCHANGE_TYPE_FANOUT);
+
+                String queueName = channel.queueDeclare().getQueue();
+                channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_FANOUT, "");
+
+                QueueingConsumer consumer = new QueueingConsumer(channel);
+                channel.basicConsume(queueName, true, consumer);
+
+                while (true) {
+                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+                    String message = new String(delivery.getBody());
+
+                    callback.onMessage(message);
+                }
+            } catch (Exception e) {
+                throw new AMQPException(e);
+            }
+        }
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPBroadcastSenderImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.airavata.wsmg.client.amqp.AMQPBroadcastSender;
+import org.apache.airavata.wsmg.client.amqp.AMQPException;
+import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient;
+import org.apache.airavata.wsmg.client.amqp.AMQPUtil;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * AMQPBroadcastSenderImpl provides functionality to produce a broadcast message feed.
+ */
+public class AMQPBroadcastSenderImpl extends AMQPRoutingAwareClient implements AMQPBroadcastSender {
+    private static final Logger log = LoggerFactory.getLogger(AMQPBroadcastSenderImpl.class);
+
+    public AMQPBroadcastSenderImpl(Properties properties) {
+        super(properties);
+    }
+
+    public void Send(OMElement message) throws AMQPException {
+        try {
+            if (isRoutable(message)) {
+                Connection connection = connectionFactory.newConnection();
+                Channel channel = connection.createChannel();
+                channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_FANOUT, AMQPUtil.EXCHANGE_TYPE_FANOUT);
+
+                channel.basicPublish(AMQPUtil.EXCHANGE_NAME_FANOUT, "", null, message.toString().getBytes());
+
+                channel.close();
+                connection.close();
+            }
+        } catch (IOException e) {
+            throw new AMQPException(e);
+        }
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPReceiverImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.wsmg.client.amqp.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * AMQPReceiverImpl class provides functionality to receive messages selectively based on a unique routing key.
+ */
+public class AMQPReceiverImpl extends AMQPRoutingAwareClient implements AMQPReceiver {
+    private static final Logger log = LoggerFactory.getLogger(AMQPReceiverImpl.class);
+
+    private AMQPCallback callback = null;
+
+    public AMQPReceiverImpl(Properties properties, AMQPCallback callback) {
+        super(properties);
+
+        this.callback = callback;
+    }
+
+    public void Subscribe(AMQPRoutingKey key) throws AMQPException {
+        if (callback != null) {
+            try {
+                Connection connection = connectionFactory.newConnection();
+
+                Channel channel = connection.createChannel();
+                channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_DIRECT, AMQPUtil.EXCHANGE_TYPE_DIRECT);
+
+                String queueName = channel.queueDeclare().getQueue();
+                channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_DIRECT, key.getNativeKey());
+
+                QueueingConsumer consumer = new QueueingConsumer(channel);
+                channel.basicConsume(queueName, true, consumer);
+
+                while (true) {
+                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+                    String message = new String(delivery.getBody());
+
+                    callback.onMessage(message);
+                }
+            } catch (Exception e) {
+                throw new AMQPException(e);
+            }
+        }
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPSenderImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.airavata.wsmg.client.amqp.AMQPException;
+import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient;
+import org.apache.airavata.wsmg.client.amqp.AMQPSender;
+import org.apache.airavata.wsmg.client.amqp.AMQPUtil;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * AMQPSenderImpl class provides functionality to send messages with a unique routing key
+ * so that a receiver can consume them selectively.
+ */
+public class AMQPSenderImpl extends AMQPRoutingAwareClient implements AMQPSender {
+    private static final Logger log = LoggerFactory.getLogger(AMQPSenderImpl.class);
+
+    public AMQPSenderImpl(Properties properties) {
+        super(properties);
+    }
+
+    public void Send(OMElement message) throws AMQPException {
+        try {
+            if (isRoutable(message)) {
+                Connection connection = connectionFactory.newConnection();
+                Channel channel = connection.createChannel();
+                channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_DIRECT, AMQPUtil.EXCHANGE_TYPE_DIRECT);
+
+                List<String> routingKeys = new ArrayList<String>();
+                getRoutingKeys(message, routingKeys);
+
+                for (String routingKey : routingKeys) {
+                    channel.basicPublish(
+                            AMQPUtil.EXCHANGE_NAME_DIRECT, routingKey, null, message.toString().getBytes());
+                }
+                
+                channel.close();
+                connection.close();
+            }
+        } catch (IOException e) {
+            throw new AMQPException(e);
+        }
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicReceiverImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.QueueingConsumer;
+import org.apache.airavata.wsmg.client.amqp.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+/**
+ * AMQPTopicReceiver class provides functionality to receive messages based on a pattern.
+ * These patterns are also called Topics.
+ */
+public class AMQPTopicReceiverImpl extends AMQPRoutingAwareClient implements AMQPTopicReceiver {
+    private static final Logger log = LoggerFactory.getLogger(AMQPTopicReceiverImpl.class);
+    
+    private AMQPCallback callback = null;
+
+    public AMQPTopicReceiverImpl(Properties properties, AMQPCallback callback) {
+        super(properties);
+        
+        this.callback = callback;
+    }
+
+    public void Subscribe(AMQPRoutingKey topic) throws AMQPException {
+        if (callback != null) {
+            try {
+                Connection connection = connectionFactory.newConnection();
+
+                Channel channel = connection.createChannel();
+                channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_TOPIC, AMQPUtil.EXCHANGE_TYPE_TOPIC);
+
+                String queueName = channel.queueDeclare().getQueue();
+                channel.queueBind(queueName, AMQPUtil.EXCHANGE_NAME_TOPIC, topic.getNativeKey());
+
+                QueueingConsumer consumer = new QueueingConsumer(channel);
+                channel.basicConsume(queueName, true, consumer);
+
+                while (true) {
+                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
+                    String message = new String(delivery.getBody());
+
+                    callback.onMessage(message);
+                }
+            } catch (Exception e) {
+                throw new AMQPException(e);
+            }
+        }
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/java/org/apache/airavata/wsmg/client/amqp/rabbitmq/AMQPTopicSenderImpl.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp.rabbitmq;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import org.apache.airavata.wsmg.client.amqp.AMQPException;
+import org.apache.airavata.wsmg.client.amqp.AMQPRoutingAwareClient;
+import org.apache.airavata.wsmg.client.amqp.AMQPTopicSender;
+import org.apache.airavata.wsmg.client.amqp.AMQPUtil;
+import org.apache.axiom.om.OMElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * AMQPTopicSenderImpl class provides functionality to send messages that can be consumed
+ * based on a pattern. These patterns are also called Topics.
+ */
+public class AMQPTopicSenderImpl extends AMQPRoutingAwareClient implements AMQPTopicSender {
+    private static final Logger log = LoggerFactory.getLogger(AMQPTopicSenderImpl.class);
+
+    public AMQPTopicSenderImpl(Properties properties) {
+        super(properties);
+    }
+
+    public void Send(OMElement message) throws AMQPException {
+        try {
+            if (isRoutable(message)) {
+                Connection connection = connectionFactory.newConnection();
+                Channel channel = connection.createChannel();
+                channel.exchangeDeclare(AMQPUtil.EXCHANGE_NAME_TOPIC, AMQPUtil.EXCHANGE_TYPE_TOPIC);
+
+                List<String> routingKeys = new ArrayList<String>();
+                getRoutingKeys(message, routingKeys);
+
+                for (String routingKey : routingKeys) {
+                    channel.basicPublish(
+                            AMQPUtil.EXCHANGE_NAME_TOPIC, routingKey, null, message.toString().getBytes());
+                }
+
+                channel.close();
+                connection.close();
+            }
+        } catch (IOException e) {
+            throw new AMQPException(e);
+        }
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml (added)
+++ airavata/trunk/modules/ws-messenger/client/src/main/resources/amqp-routing-keys.xml Sat Jul 20 15:39:17 2013
@@ -0,0 +1,40 @@
+<?xml version='1.0' encoding='utf-8' ?>
+<!--Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file
+	distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under
+	the Apache License, Version 2.0 (theÏ "License"); you may not use this file except in compliance with the License. You may
+	obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to
+	in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+	ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under
+	the License. -->
+<routingKeys>
+    <!-- Routing keys for "workflowInvoked" event -->
+    <event name="workflowInvoked">
+        <key name="primaryKey">
+            <segment name="serviceID">notificationSource/@serviceID</segment>
+        </key>
+    </event>
+    <!-- Routing keys for "invokingService" event -->
+    <event name="invokingService">
+        <key name="primaryKey">
+            <segment name="serviceID">notificationSource/@serviceID</segment>
+        </key>
+    </event>
+    <!-- Routing keys for "receivedResult" event -->
+    <event name="receivedResult">
+        <key name="primaryKey">
+            <segment name="serviceID">notificationSource/@serviceID</segment>
+        </key>
+    </event>
+    <!-- Routing keys for "sendingResult" event -->
+    <event name="sendingResult">
+        <key name="primaryKey">
+            <segment name="serviceID">notificationSource/@serviceID</segment>
+        </key>
+    </event>
+    <!-- Routing keys for "workflowTerminated" event -->
+    <event name="workflowTerminated">
+        <key name="primaryKey">
+            <segment name="serviceID">notificationSource/@serviceID</segment>
+        </key>
+    </event>
+</routingKeys>
\ No newline at end of file

Added: airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/BroadcastSubscriber.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPBroadcastReceiverImpl;
+import java.util.Properties;
+
+public class BroadcastSubscriber {
+    public static void main(String args[]) throws AMQPException {
+        String host = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
+        String port = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
+        String username = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
+        String password = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");
+
+        Properties properties = new Properties();
+        properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, host);
+        properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, port);
+        properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, username);
+        properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, password);
+
+        MessageConsumer consumer = new MessageConsumer();
+        AMQPBroadcastReceiver receiver = new AMQPBroadcastReceiverImpl(properties, consumer);
+        System.out.println("Waiting for broadcast messages : \n");
+
+        receiver.Subscribe();
+    }
+
+    public static class MessageConsumer implements AMQPCallback {
+        public void onMessage(String message) {
+            System.out.println("Received : " + message);
+        }
+    }
+}

Added: airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java (added)
+++ airavata/trunk/modules/ws-messenger/client/src/test/java/org/apache/airavata/wsmg/client/amqp/TopicSubscriber.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.client.amqp;
+
+import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.airavata.wsmg.client.amqp.rabbitmq.AMQPTopicReceiverImpl;
+import java.util.Properties;
+
+public class TopicSubscriber {
+    public static void main(String args[]) throws AMQPException {
+        String host = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
+        String port = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
+        String username = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
+        String password = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");
+
+        Properties properties = new Properties();
+        properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, host);
+        properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, port);
+        properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, username);
+        properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, password);
+
+        MessageConsumer consumer = new MessageConsumer();
+        AMQPTopicReceiver receiver = new AMQPTopicReceiverImpl(properties, consumer);
+        System.out.println("Waiting for topic messages : \n");
+
+        AMQPRoutingKey key = new AMQPRoutingKey("workflowTerminated", "");
+        receiver.Subscribe(key);
+    }
+
+    public static class MessageConsumer implements AMQPCallback {
+        public void onMessage(String message) {
+            System.out.println("Received : " + message);
+        }
+    }
+}

Modified: airavata/trunk/modules/ws-messenger/messagebroker/pom.xml
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/messagebroker/pom.xml?rev=1505154&r1=1505153&r2=1505154&view=diff
==============================================================================
--- airavata/trunk/modules/ws-messenger/messagebroker/pom.xml (original)
+++ airavata/trunk/modules/ws-messenger/messagebroker/pom.xml Sat Jul 20 15:39:17 2013
@@ -62,6 +62,11 @@
             <artifactId>airavata-common-utils</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.airavata</groupId>
+            <artifactId>airavata-messenger-client</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         
         <dependency>
             <groupId>edu.berkeley</groupId>
@@ -100,12 +105,6 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.airavata</groupId>
-            <artifactId>airavata-messenger-client</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
 
     </dependencies>
     <properties>

Modified: airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java?rev=1505154&r1=1505153&r2=1505154&view=diff
==============================================================================
--- airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java (original)
+++ airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/NotificationProcessor.java Sat Jul 20 15:39:17 2013
@@ -21,13 +21,12 @@
 
 package org.apache.airavata.wsmg.broker;
 
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 
 import javax.xml.namespace.QName;
 import javax.xml.stream.XMLStreamException;
 
+import org.apache.airavata.wsmg.broker.amqp.AMQPNotificationProcessor;
 import org.apache.airavata.wsmg.broker.context.ContextParameters;
 import org.apache.airavata.wsmg.broker.context.ProcessingContext;
 import org.apache.airavata.wsmg.commons.NameSpaceConstants;
@@ -44,6 +43,7 @@ import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMException;
 import org.apache.axiom.om.OMFactory;
 import org.apache.axiom.om.OMNamespace;
+import org.apache.axiom.om.xpath.AXIOMXPath;
 import org.apache.axis2.AxisFault;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,8 +61,11 @@ public class NotificationProcessor {
 
     private OutGoingQueue outgoingQueue;
 
+    private AMQPNotificationProcessor amqpNotificationProcessor = new AMQPNotificationProcessor();
+
     public NotificationProcessor(WsmgConfigurationContext config) {
         init(config);
+        amqpNotificationProcessor.init();
     }
 
     private void init(WsmgConfigurationContext config) {
@@ -91,6 +94,8 @@ public class NotificationProcessor {
                 .getSoapAction(), ctx.getMessageContext().getMessageID());
         additionalMessageContent.setTrackId(trackId);
 
+        handleExtendedNotifications(ctx, protocolNs);
+
         if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
 
             onWSNTMsg(ctx, additionalMessageContent);
@@ -98,9 +103,8 @@ public class NotificationProcessor {
         } else { // WSE Notifications No specific namespace
 
             onWSEMsg(ctx, trackId, additionalMessageContent);
-            setResponseMsg(ctx, trackId, protocolNs);
+           setResponseMsg(ctx, trackId, protocolNs);
         }
-
     }
 
     /**
@@ -302,4 +306,8 @@ public class NotificationProcessor {
             logger.info(additionalMessageContent.getTrackId() + ": putIn Outgoing queue.");
     }
 
+    private void handleExtendedNotifications(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
+        // AMQP
+        amqpNotificationProcessor.notify(ctx, protocolNs);
+    }
 }

Added: airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java?rev=1505154&view=auto
==============================================================================
--- airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java (added)
+++ airavata/trunk/modules/ws-messenger/messagebroker/src/main/java/org/apache/airavata/wsmg/broker/amqp/AMQPNotificationProcessor.java Sat Jul 20 15:39:17 2013
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.wsmg.broker.amqp;
+
+import org.apache.airavata.common.utils.ApplicationSettings;
+import org.apache.airavata.wsmg.client.amqp.*;
+import org.apache.airavata.wsmg.commons.NameSpaceConstants;
+import org.apache.airavata.wsmg.broker.context.ProcessingContext;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMException;
+import org.apache.axiom.om.OMNamespace;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.w3c.dom.Element;
+
+import javax.xml.namespace.QName;
+
+/**
+ * AMQPNotificationProcessor handles AMQP-specific notification processing.
+ */
+public class AMQPNotificationProcessor {
+
+    private static final Logger logger = LoggerFactory.getLogger(AMQPNotificationProcessor.class);
+    
+    private boolean amqpEnabled = false;
+    private AMQPSender amqpSender = null;
+    private AMQPTopicSender amqpTopicSender = null;
+    private AMQPBroadcastSender amqpBroadcastSender = null;
+
+    public void init() {
+        String amqpEnabledAppSetting = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_ENABLE, "");
+        if (!amqpEnabledAppSetting.isEmpty() && (1 == Integer.parseInt(amqpEnabledAppSetting))) {
+            try {
+                String host = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, "localhost");
+                String port = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, "5672");
+                String username = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, "guest");
+                String password = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, "guest");
+
+                Properties properties = new Properties();
+                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_HOST, host);
+                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PORT, port);
+                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_USERNAME, username);
+                properties.setProperty(AMQPUtil.CONFIG_AMQP_PROVIDER_PASSWORD, password);
+
+                String className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_SENDER, "");
+                Class clazz = Class.forName(className);
+                amqpSender = (AMQPSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
+
+                className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_TOPIC_SENDER, "");
+                clazz = Class.forName(className);
+                amqpTopicSender = (AMQPTopicSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
+
+                className = ApplicationSettings.getSetting(AMQPUtil.CONFIG_AMQP_BROADCAST_SENDER, "");
+                clazz = Class.forName(className);
+                amqpBroadcastSender = (AMQPBroadcastSender)clazz.getDeclaredConstructor(Properties.class).newInstance(properties);
+
+                Element routingKeys = AMQPUtil.loadRoutingKeys();
+                if (routingKeys != null) {
+                    ((AMQPRoutingAwareClient)amqpSender).init(routingKeys);
+                    ((AMQPRoutingAwareClient)amqpTopicSender).init(routingKeys);
+                    ((AMQPRoutingAwareClient)amqpBroadcastSender).init(routingKeys);
+                }
+
+                amqpEnabled = true;
+            } catch (Exception ex) {
+                logger.error(ex.getMessage());
+            }
+        }
+    }
+
+    public void notify(ProcessingContext ctx, OMNamespace protocolNs) throws OMException {
+        if (amqpEnabled) {
+            // Extract messages
+            List<OMElement> messages = new ArrayList<OMElement>();
+            if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {
+                // WSNT
+                OMElement messageElements = ctx.getSoapBody().getFirstElement();
+                for (Iterator<OMElement> ite = messageElements.getChildrenWithLocalName("NotificationMessage"); ite.hasNext(); ) {
+                    try {
+                        OMElement messageElement = ite.next();
+                        OMElement message = messageElement.getFirstChildWithName(
+                                new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message")).getFirstElement();
+                        messages.add(message);
+                    } catch (NullPointerException e) {
+                        throw new OMException(e);
+                    }
+                }
+            } else {
+                // WSE
+                OMElement message = ctx.getSoapBody().getFirstElement();
+                if (message != null) {
+                    messages.add(message);
+                }
+            }
+
+            // Dispatch messages
+            try {
+                for (OMElement message : messages) {
+                    amqpBroadcastSender.Send(message);
+                    amqpTopicSender.Send(message);
+                    amqpSender.Send(message);
+                }
+            } catch (AMQPException e) {
+                logger.warn("Failed to send AMQP notification.[Reason=" + e.getMessage() + "]");
+            }
+        }
+    }
+}