You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2014/09/25 17:14:58 UTC
git commit: adding code to send the events through rabbitmq
Repository: airavata
Updated Branches:
refs/heads/messaging_framework 7ed2e3479 -> 61214dbfa
adding code to send the events through rabbitmq
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/61214dbf
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/61214dbf
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/61214dbf
Branch: refs/heads/messaging_framework
Commit: 61214dbfaa66ead4a40ad786a04d90f1375f8224
Parents: 7ed2e34
Author: Chathuri Wimalasena <ka...@gmail.com>
Authored: Thu Sep 25 10:28:03 2014 -0400
Committer: Chathuri Wimalasena <ka...@gmail.com>
Committed: Thu Sep 25 11:14:02 2014 -0400
----------------------------------------------------------------------
.../airavata/common/utils/ThriftUtils.java | 37 ++++++++
.../airavata/messaging/core/MessageContext.java | 44 +++++++++
.../airavata/messaging/core/Publisher.java | 24 ++++-
.../messaging/core/PublisherFactory.java | 50 +++++++++++
.../core/impl/AiravataRabbitMQPublisher.java | 42 ---------
.../messaging/core/impl/RabbitMQProducer.java | 23 ++---
.../messaging/core/impl/RabbitMQPublisher.java | 93 ++++++++++++++++++++
7 files changed, 260 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java
new file mode 100644
index 0000000..ee86f74
--- /dev/null
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ThriftUtils.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.common.utils;
+
+import org.apache.thrift.TBase;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+
+/**
+ * Static helpers that convert Thrift objects to and from their serialized
+ * byte form.
+ */
+public class ThriftUtils {
+
+    /**
+     * Serializes a Thrift object with a newly created {@link TSerializer}.
+     *
+     * @param object the Thrift object to serialize
+     * @return the bytes produced by the serializer
+     * @throws TException if the serializer reports a failure
+     */
+    public static byte[] serializeThriftObject(TBase object) throws TException {
+        TSerializer serializer = new TSerializer();
+        byte[] serialized = serializer.serialize(object);
+        return serialized;
+    }
+
+    /**
+     * Fills {@code object} from {@code bytes} with a newly created
+     * {@link TDeserializer}. The result is written into the passed-in
+     * instance; nothing is returned.
+     *
+     * @param bytes  the serialized form to read
+     * @param object the Thrift object to populate
+     * @throws TException if the deserializer reports a failure
+     */
+    public static void createThriftFromBytes(byte[] bytes, TBase object) throws TException {
+        TDeserializer deserializer = new TDeserializer();
+        deserializer.deserialize(object, bytes);
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
new file mode 100644
index 0000000..48fff59
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageContext.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.thrift.TBase;
+
+/**
+ * Carries a Thrift event together with the {@link MessageType} it was
+ * created with. Both references are fixed at construction time.
+ */
+public class MessageContext {
+    private final MessageType type;
+    private final TBase event;
+
+    /**
+     * @param message the Thrift event to carry
+     * @param type    the message type recorded alongside the event
+     */
+    public MessageContext(TBase message, MessageType type) {
+        this.type = type;
+        this.event = message;
+    }
+
+    /**
+     * @return the message type given to the constructor
+     */
+    public MessageType getType() {
+        return this.type;
+    }
+
+    /**
+     * @return the Thrift event given to the constructor
+     */
+    public TBase getEvent() {
+        return this.event;
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
index 24c8e2a..4452856 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
@@ -1,7 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
package org.apache.airavata.messaging.core;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.model.messaging.event.*;
public interface Publisher {
- public void publish(Message message);
+ public void publish(MessageContext message) throws AiravataException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
new file mode 100644
index 0000000..116e9b4
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates the {@link Publisher} implementation named by the activity
+ * publisher server setting.
+ */
+public class PublisherFactory {
+    private static final Logger log = LoggerFactory.getLogger(PublisherFactory.class);
+
+    /**
+     * Loads the class returned by {@link ServerSettings#getActivityPublisher()}
+     * and instantiates it through its no-argument constructor.
+     *
+     * @return a new instance of the configured publisher class
+     * @throws AiravataException if no publisher class is configured, or if the
+     *                           class cannot be loaded, is not a
+     *                           {@link Publisher}, or cannot be instantiated
+     */
+    public Publisher createPublisher() throws AiravataException {
+        String activityPublisher = ServerSettings.getActivityPublisher();
+
+        if (activityPublisher == null) {
+            String s = "Activity publisher is not specified";
+            log.error(s);
+            throw new AiravataException(s);
+        }
+
+        try {
+            Class<? extends Publisher> aPublisher =
+                    Class.forName(activityPublisher).asSubclass(Publisher.class);
+            // Class.newInstance() is deprecated and rethrows checked constructor
+            // exceptions undeclared; going through the Constructor wraps them in
+            // InvocationTargetException, which the handler below reports.
+            return aPublisher.getDeclaredConstructor().newInstance();
+        } catch (Exception e) {
+            String msg = "Failed to load the publisher from the publisher class property: " + activityPublisher;
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
deleted file mode 100644
index 0fda442..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/AiravataRabbitMQPublisher.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.model.messaging.event.*;
-
-public class AiravataRabbitMQPublisher implements Publisher {
- private String brokerUrl;
- private String routingKey;
- private String exchangeName;
- private int prefetchCount;
- private boolean isRequeueOnFail;
-
- public AiravataRabbitMQPublisher() {
-
- RabbitMQProducer rabbitMQProducer = new RabbitMQProducer(brokerUrl, routingKey, exchangeName, prefetchCount, isRequeueOnFail);
- }
-
- public void publish(Message message) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
index e52161f..0ad8705 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProducer.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
public class RabbitMQProducer {
+ public static final int DEFAULT_PRE_FETCH = 64;
+
private static Logger log = LoggerFactory.getLogger(RabbitMQProducer.class);
private Connection connection;
@@ -40,23 +42,25 @@ public class RabbitMQProducer {
private String exchangeName;
- private String routingKey;
-
- private int prefetchCount;
+ private int prefetchCount = DEFAULT_PRE_FETCH;
private boolean isReQueueOnFail = false;
private String url;
- public RabbitMQProducer(String url, String routingKey, String exchangeName,
- int prefetchCount, boolean isReQueueOnFail) {
- this.prefetchCount = prefetchCount;
- this.isReQueueOnFail = isReQueueOnFail;
+ public RabbitMQProducer(String url, String exchangeName) {
this.exchangeName = exchangeName;
- this.routingKey = routingKey;
this.url = url;
}
+ public void setPrefetchCount(int prefetchCount) {
+ this.prefetchCount = prefetchCount;
+ }
+
+ public void setReQueueOnFail(boolean isReQueueOnFail) {
+ this.isReQueueOnFail = isReQueueOnFail;
+ }
+
private void reset() {
consumerTag = null;
}
@@ -109,7 +113,7 @@ public class RabbitMQProducer {
}
}
- public void send(byte []message) throws Exception {
+ public void send(byte []message, String routingKey) throws Exception {
try {
channel.basicPublish(exchangeName, routingKey, null, message);
} catch (IOException e) {
@@ -125,7 +129,6 @@ public class RabbitMQProducer {
connectionFactory.setUri(url);
Connection connection = connectionFactory.newConnection();
connection.addShutdownListener(new ShutdownListener() {
- @Override
public void shutdownCompleted(ShutdownSignalException cause) {
}
});
http://git-wip-us.apache.org/repos/asf/airavata/blob/61214dbf/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
new file mode 100644
index 0000000..d9ad7e4
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQPublisher.java
@@ -0,0 +1,93 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link Publisher} that serializes Thrift status-change events and hands
+ * them to a {@link RabbitMQProducer}, using a routing key built from the
+ * identifiers carried by the event.
+ */
+public class RabbitMQPublisher implements Publisher {
+    /** Server setting key read for the broker URL. */
+    public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
+    /** Server setting key read for the exchange name. */
+    public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
+    private static final Logger log = LoggerFactory.getLogger(RabbitMQPublisher.class);
+
+    private RabbitMQProducer rabbitMQProducer;
+
+    /**
+     * Reads the broker URL and exchange name from {@link ServerSettings},
+     * creates the producer and opens it.
+     *
+     * @throws Exception an {@link AiravataException} when a required setting
+     *                   cannot be read, or whatever {@code open()} throws
+     */
+    public RabbitMQPublisher() throws Exception {
+        String brokerUrl;
+        String exchangeName;
+        try {
+            brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
+            exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
+        } catch (ApplicationSettingsException e) {
+            String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+            log.error(message, e);
+            throw new AiravataException(message, e);
+        }
+        rabbitMQProducer = new RabbitMQProducer(brokerUrl, exchangeName);
+        rabbitMQProducer.open();
+    }
+
+    /**
+     * Serializes the event held by {@code msgCtx} and sends the resulting
+     * bytes as the message body, routed by the key derived from the event.
+     *
+     * @param msgCtx the event and its message type
+     * @throws AiravataException if the event cannot be serialized, the message
+     *                           type has no routing key rule, or sending fails
+     */
+    public void publish(MessageContext msgCtx) throws AiravataException {
+        try {
+            byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
+            String routingKey = resolveRoutingKey(msgCtx);
+            rabbitMQProducer.send(body, routingKey);
+        } catch (TException e) {
+            // Only the serialization call above can raise a TException here.
+            String msg = "Error while serializing the object";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        } catch (AiravataException e) {
+            // Raised and logged by resolveRoutingKey; pass through without re-wrapping.
+            throw e;
+        } catch (Exception e) {
+            String msg = "Error while sending to rabbitmq";
+            log.error(msg, e);
+            throw new AiravataException(msg, e);
+        }
+    }
+
+    /**
+     * Builds the dot-separated routing key for the event in {@code msgCtx}.
+     * A null or unrecognised type is rejected so that a message is never
+     * handed to the producer with a null routing key.
+     */
+    private String resolveRoutingKey(MessageContext msgCtx) throws AiravataException {
+        MessageType type = msgCtx.getType();
+        if (MessageType.EXPERIMENT.equals(type)) {
+            ExperimentStatusChangeEvent event = (ExperimentStatusChangeEvent) msgCtx.getEvent();
+            return event.getExperimentId();
+        }
+        if (MessageType.TASK.equals(type)) {
+            TaskStatusChangeEvent event = (TaskStatusChangeEvent) msgCtx.getEvent();
+            return event.getTaskIdentity().getExperimentId() + "."
+                    + event.getTaskIdentity().getWorkflowNodeId() + "."
+                    + event.getTaskIdentity().getTaskId();
+        }
+        if (MessageType.WORKFLOWNODE.equals(type)) {
+            WorkflowNodeStatusChangeEvent event = (WorkflowNodeStatusChangeEvent) msgCtx.getEvent();
+            WorkflowIdentity workflowNodeIdentity = event.getWorkflowNodeIdentity();
+            return workflowNodeIdentity.getExperimentId() + "."
+                    + workflowNodeIdentity.getWorkflowNodeId();
+        }
+        if (MessageType.JOB.equals(type)) {
+            JobStatusChangeEvent event = (JobStatusChangeEvent) msgCtx.getEvent();
+            JobIdentity identity = event.getJobIdentity();
+            return identity.getExperimentId() + "."
+                    + identity.getWorkflowNodeId() + "."
+                    + identity.getTaskId() + "."
+                    + identity.getJobId();
+        }
+        String msg = "Unsupported message type, cannot derive a routing key: " + type;
+        log.error(msg);
+        throw new AiravataException(msg);
+    }
+}