You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/09/25 17:14:58 UTC

git commit: adding code to send the events through rabbitmq

Repository: airavata
Updated Branches:
  refs/heads/messaging_framework 7ed2e3479 -> 61214dbfa


adding code to send the events through rabbitmq


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/61214dbf
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/61214dbf
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/61214dbf

Branch: refs/heads/messaging_framework
Commit: 61214dbfaa66ead4a40ad786a04d90f1375f8224
Parents: 7ed2e34
Author: Chathuri Wimalasena <ka...@gmail.com>
Authored: Thu Sep 25 10:28:03 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Thu Sep 25 11:14:02 2014 -0400

----------------------------------------------------------------------
 .../airavata/common/utils/ThriftUtils.java      | 37 ++++++++
 .../airavata/messaging/core/MessageContext.java | 44 +++++++++
 .../airavata/messaging/core/Publisher.java      | 24 ++++-
 .../messaging/core/PublisherFactory.java        | 50 +++++++++++
 .../core/impl/AiravataRabbitMQPublisher.java    | 42 ---------
 .../messaging/core/impl/RabbitMQProducer.java   | 23 ++---
 .../messaging/core/impl/RabbitMQPublisher.java  | 93 ++++++++++++++++++++
 7 files changed, 260 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java
new file mode 100644
index 0000000..ee86f74
--- /dev/null
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.common.utils;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+public class ThriftUtils {
+    public static byte[] serializeThriftObject(TBase object) throws TException {
+        return new TSerializer().serialize(object);
+    }
+
+    public static void createThriftFromBytes(byte []bytes, TBase object) throws TException {
+        new TDeserializer().deserialize(object, bytes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
new file mode 100644
index 0000000..48fff59
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.thrift.TBase;
+
+public class MessageContext {
+    private final TBase event;
+
+    private final MessageType type;
+
+    public MessageContext(TBase message, MessageType type) {
+        this.event = message;
+        this.type = type;
+    }
+
+    public TBase getEvent() {
+        return event;
+    }
+
+    public MessageType getType() {
+        return type;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
index 24c8e2a..4452856 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
@@ -1,7 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
 package org.apache.airavata.messaging.core;
 
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.model.messaging.event.*;
 
 public interface Publisher {
-    public void publish(Message message);
+    public void publish(MessageContext message) throws AiravataException;
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
new file mode 100644
index 0000000..116e9b4
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PublisherFactory {
+    private static Logger log = LoggerFactory.getLogger(PublisherFactory.class);
+
+    public Publisher createPublisher() throws AiravataException {
+        String activityPublisher = ServerSettings.getActivityPublisher();
+
+        if (activityPublisher == null) {
+            String s = "Activity publisher is not specified";
+            log.error(s);
+            throw new AiravataException(s);
+        }
+
+        try {
+            Class<? extends Publisher> aPublisher = Class.forName(activityPublisher).asSubclass(Publisher.class);
+            return aPublisher.newInstance();
+        } catch (Exception e) {
+            String msg = "Failed to load the publisher from the publisher class property: " + activityPublisher;
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
deleted file mode 100644
index 0fda442..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
-
-public class AiravataRabbitMQPublisher implements Publisher {
-    private String brokerUrl;
-    private String routingKey;
-    private String exchangeName;
-    private int prefetchCount;
-    private boolean isRequeueOnFail;
-
-    public AiravataRabbitMQPublisher() {
-
-        RabbitMQProducer rabbitMQProducer = new RabbitMQProducer(brokerUrl, routingKey, exchangeName, prefetchCount, isRequeueOnFail);
-    }
-
-    public void publish(Message message) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index e52161f..0ad8705 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 
 public class RabbitMQProducer {
+    public static final int DEFAULT_PRE_FETCH = 64;
+
     private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class);
 
     private Connection connection;
@@ -40,23 +42,25 @@ public class RabbitMQProducer {
 
     private String exchangeName;
 
-    private String routingKey;
-
-    private int prefetchCount;
+    private int prefetchCount = DEFAULT_PRE_FETCH;
 
     private boolean isReQueueOnFail = false;
 
     private String url;
 
-    public RabbitMQProducer(String url, String routingKey, String exchangeName,
-                            int prefetchCount, boolean isReQueueOnFail) {
-        this.prefetchCount = prefetchCount;
-        this.isReQueueOnFail = isReQueueOnFail;
+    public RabbitMQProducer(String url, String exchangeName) {
         this.exchangeName = exchangeName;
-        this.routingKey = routingKey;
         this.url = url;
     }
 
+    public void setPrefetchCount(int prefetchCount) {
+        this.prefetchCount = prefetchCount;
+    }
+
+    public void setReQueueOnFail(boolean isReQueueOnFail) {
+        this.isReQueueOnFail = isReQueueOnFail;
+    }
+
     private void reset() {
         consumerTag = null;
     }
@@ -109,7 +113,7 @@ public class RabbitMQProducer {
         }
     }
 
-    public void send(byte []message) throws Exception {
+    public void send(byte []message, String routingKey) throws Exception {
         try {
             channel.basicPublish(exchangeName, routingKey, null, message);
         } catch (IOException e) {
@@ -125,7 +129,6 @@ public class RabbitMQProducer {
             connectionFactory.setUri(url);
             Connection connection = connectionFactory.newConnection();
             connection.addShutdownListener(new ShutdownListener() {
-                @Override
                 public void shutdownCompleted(ShutdownSignalException cause) {
                 }
             });

http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
new file mode 100644
index 0000000..d9ad7e4
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RabbitMQPublisher implements Publisher {
+    public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
+    public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
+    private static Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class);
+
+    private RabbitMQProducer rabbitMQProducer;
+
+    public RabbitMQPublisher() throws Exception {
+        String brokerUrl;
+        String exchangeName;
+        try {
+            brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
+            exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+        rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+        rabbitMQProducer.open();
+    }
+
+    public void publish(MessageContext msgCtx) throws AiravataException {
+        try {
+            byte []body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+            Message message = new Message();
+            message.setEvent(body);
+            String routingKey = null;
+            if (msgCtx.getType().equals(MessageType.EXPERIMENT)){
+                ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
+                routingKey = event.getExperimentId();
+            } else if (msgCtx.getType().equals(MessageType.TASK)) {
+                TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+                routingKey = event.getTaskIdentity().getExperimentId() + "." +
+                        event.getTaskIdentity().getWorkflowNodeId() + "." + event.getTaskIdentity().getTaskId();
+            }else if (msgCtx.getType().equals(MessageType.WORKFLOWNODE)){
+                WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent();
+                WorkflowIdentity workflowNodeIdentity = event.getWorkflowNodeIdentity();
+                routingKey = workflowNodeIdentity.getExperimentId() + "." + workflowNodeIdentity.getWorkflowNodeId();
+            }else if (msgCtx.getType().equals(MessageType.JOB)){
+                JobStatusChangeEvent event = (JobStatusChangeEvent)msgCtx.getEvent();
+                JobIdentity identity = event.getJobIdentity();
+                routingKey = identity.getExperimentId() + "." +
+                        identity.getWorkflowNodeId() + "." +
+                        identity.getTaskId() + "." +
+                        identity.getJobId();
+            }
+            rabbitMQProducer.send(body, routingKey);
+        } catch (TException e) {
+            String msg = "Error while deserializing the object";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        } catch (Exception e) {
+            String msg = "Error while sending to rabbitmq";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+}