You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:30 UTC
[03/15] airavata git commit: adding consumers to gfac without leader
election
adding consumers to gfac without leader election
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/0149c1af
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/0149c1af
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/0149c1af
Branch: refs/heads/master
Commit: 0149c1afd477ab1d86d19374bcb9824520d3a3bc
Parents: 88d27d9
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Feb 11 15:48:40 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Feb 11 15:48:40 2015 -0500
----------------------------------------------------------------------
.../airavata/common/utils/ServerSettings.java | 21 +-
.../main/resources/airavata-server.properties | 7 +-
.../main/resources/airavata-server.properties | 5 +-
.../airavata/gfac/server/GfacServerHandler.java | 75 ++++-
.../messaging/core/MessagingConstants.java | 3 +-
.../messaging/core/PublisherFactory.java | 2 +-
.../airavata/messaging/core/TestClient.java | 5 +-
.../messaging/core/impl/RabbitMQConsumer.java | 258 -----------------
.../core/impl/RabbitMQStatusConsumer.java | 274 +++++++++++++++++++
.../core/impl/RabbitMQStatusPublisher.java | 2 +-
.../core/impl/RabbitMQTaskLaunchConsumer.java | 239 ++++++++++++++++
.../core/impl/RabbitMQTaskLaunchPublisher.java | 12 +-
.../core/utils/OrchestratorConstants.java | 1 -
.../airavata/xbaya/messaging/Monitor.java | 5 +-
14 files changed, 614 insertions(+), 295 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 4ea0b44..b076e6a 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -28,7 +28,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
public class ServerSettings extends ApplicationSettings {
- private static final String DEFAULT_USER = "default.registry.user";
+ private static final String DEFAULT_USER = "default.registry.user";
private static final String DEFAULT_USER_PASSWORD = "default.registry.password";
private static final String DEFAULT_USER_GATEWAY = "default.registry.gateway";
@@ -51,13 +51,15 @@ public class ServerSettings extends ApplicationSettings {
private static final String MY_PROXY_USER = "myproxy.user";
private static final String MY_PROXY_PASSWORD = "myproxy.password";
private static final String MY_PROXY_LIFETIME = "myproxy.life";
- private static final String ACTIVITY_PUBLISHER = "activity.publisher";
+ private static final String STATUS_PUBLISHER = "status.publisher";
private static final String TASK_LAUNCH_PUBLISHER = "task.launch.publisher";
private static final String ACTIVITY_LISTENERS = "activity.listeners";
public static final String PUBLISH_RABBITMQ = "publish.rabbitmq";
public static final String JOB_NOTIFICATION_ENABLE = "job.notification.enable";
public static final String JOB_NOTIFICATION_EMAILIDS = "job.notification.emailids";
public static final String JOB_NOTIFICATION_FLAGS = "job.notification.flags";
+ public static final String GFAC_PASSIVE = "gfac.passive"; // by default this is desabled
+
private static boolean stopAllThreads = false;
@@ -73,7 +75,7 @@ public class ServerSettings extends ApplicationSettings {
return getSetting(DEFAULT_USER_GATEWAY);
}
- public static String getServerContextRoot() {
+ public static String getServerContextRoot() {
return getSetting(SERVER_CONTEXT_ROOT, "axis2");
}
@@ -151,19 +153,24 @@ public class ServerSettings extends ApplicationSettings {
return getSetting(ACTIVITY_LISTENERS).split(",");
}
- public static String getActivityPublisher() throws ApplicationSettingsException{
- return getSetting(ACTIVITY_PUBLISHER);
+ public static String getStatusPublisher() throws ApplicationSettingsException {
+ return getSetting(STATUS_PUBLISHER);
}
- public static String getTaskLaunchPublisher() throws ApplicationSettingsException{
+ public static String getTaskLaunchPublisher() throws ApplicationSettingsException {
return getSetting(TASK_LAUNCH_PUBLISHER);
}
- public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException{
+ public static boolean isRabbitMqPublishEnabled() throws ApplicationSettingsException {
String setting = getSetting(PUBLISH_RABBITMQ);
return Boolean.parseBoolean(setting);
}
+ public static boolean isGFacPassiveMode()throws ApplicationSettingsException {
+ String setting = getSetting(GFAC_PASSIVE);
+ return Boolean.parseBoolean(setting);
+ }
+
public static boolean isEmbeddedZK() {
return Boolean.parseBoolean(getSetting(EMBEDDED_ZK, "true"));
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index c90fab1..e309901 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -137,6 +137,7 @@ myproxy.password=
myproxy.life=3600
# XSEDE Trusted certificates can be downloaded from https://software.xsede.org/security/xsede-certs.tar.gz
trusted.cert.location=/Users/lahirugunathilake/Downloads/certificates
+gfac.passive=false
# SSH PKI key pair or ssh password can be used SSH based authentication is used.
# if user specify both password authentication gets the higher preference
@@ -215,10 +216,12 @@ connection.name=xsede
#publisher
activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
publish.rabbitmq=false
-activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
+status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher
rabbitmq.broker.url=amqp://localhost:5672
-rabbitmq.exchange.name=airavata_rabbitmq_exchange
+rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
+rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
+
###########################################################################
# Orchestrator module Configuration
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
index d6be51a..2ecdeb6 100644
--- a/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
+++ b/modules/credential-store-service/credential-store-webapp/src/main/resources/airavata-server.properties
@@ -200,10 +200,11 @@ connection.name=xsede
#publisher
activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.listener.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
publish.rabbitmq=false
-activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQPublisher
+status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchPublisher
rabbitmq.broker.url=amqp://localhost:5672
-rabbitmq.exchange.name=airavata_rabbitmq_exchange
+rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
+rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
###########################################################################
# Orchestrator module Configuration
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 583ec07..d428d9c 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -27,28 +27,34 @@ import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.logger.AiravataLogger;
import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.common.utils.*;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
import org.apache.airavata.gfac.core.utils.InputHandlerWorker;
import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.Future;
@@ -80,6 +86,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
private List<Future> inHandlerFutures;
+ private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+
public GfacServerHandler() throws Exception{
// registering with zk
try {
@@ -102,7 +110,13 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
BetterGfacImpl.startDaemonHandlers();
BetterGfacImpl.startStatusUpdators(registry, zk, publisher);
inHandlerFutures = new ArrayList<Future>();
- } catch (ApplicationSettingsException e) {
+
+ if(ServerSettings.isGFacPassiveMode()) {
+ rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
+ }
+
+
+ } catch (ApplicationSettingsException e) {
logger.error("Error initialising GFAC", e);
throw new Exception("Error initialising GFAC", e);
} catch (InterruptedException e) {
@@ -277,4 +291,49 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
}
}
+ private class NotificationMessageHandler implements MessageHandler {
+ private String experimentId;
+
+ private NotificationMessageHandler(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+ public Map<String, Object> getProperties() {
+ Map<String, Object> props = new HashMap<String, Object>();
+ List<String> routingKeys = new ArrayList<String>();
+ routingKeys.add(experimentId);
+ routingKeys.add(experimentId + ".*");
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
+ return props;
+ }
+
+ public void onMessage(MessageContext message) {
+ if (message.getType().equals(MessageType.LAUNCHTASK)){
+ try {
+ TaskSubmitEvent event = new TaskSubmitEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
+ }
+ }else if(message.getType().equals(MessageType.TERMINATETASK)){
+ try {
+ TaskTerminateEvent event = new TaskTerminateEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ cancelJob(event.getExperimentId(), event.getTaskId());
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
+ }
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
index 7458d81..07b39e7 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
@@ -23,7 +23,8 @@ package org.apache.airavata.messaging.core;
public abstract class MessagingConstants {
public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
- public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
+ public static final String RABBITMQ_STATUS_EXCHANGE_NAME = "rabbitmq.status.exchange.name";
+ public static final String RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME = "rabbitmq.task.launch.exchange.name";
public static final String RABBIT_ROUTING_KEY = "routingKey";
public static final String RABBIT_QUEUE= "queue";
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
index 59cdbdf..2e560a3 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/PublisherFactory.java
@@ -30,7 +30,7 @@ public class PublisherFactory {
private static Logger log = LoggerFactory.getLogger(PublisherFactory.class);
public static Publisher createActivityPublisher() throws AiravataException {
- String activityPublisher = ServerSettings.getActivityPublisher();
+ String activityPublisher = ServerSettings.getStatusPublisher();
if (activityPublisher == null) {
String s = "Activity publisher is not specified";
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index 362f3f2..aea561f 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -25,9 +25,8 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
-import org.apache.airavata.model.messaging.event.Message;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
@@ -51,7 +50,7 @@ public class TestClient {
AiravataUtils.setExecutionAsServer();
String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
- RabbitMQConsumer consumer = new RabbitMQConsumer(brokerUrl, exchangeName);
+ RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
consumer.listen(new MessageHandler() {
@Override
public Map<String, Object> getProperties() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
deleted file mode 100644
index 1f13496..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQConsumer.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core.impl;
-
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.Consumer;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class RabbitMQConsumer implements Consumer {
- private static Logger log = LoggerFactory.getLogger(RabbitMQConsumer.class);
-
- private String exchangeName;
- private String url;
- private Connection connection;
- private Channel channel;
- private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
-
- public RabbitMQConsumer() throws AiravataException {
- try {
- url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
-
- createConnection();
- } catch (ApplicationSettingsException e) {
- String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
- log.error(message, e);
- throw new AiravataException(message, e);
- }
- }
-
- public RabbitMQConsumer(String brokerUrl, String exchangeName) throws AiravataException {
- this.exchangeName = exchangeName;
- this.url = brokerUrl;
-
- createConnection();
- }
-
- private void createConnection() throws AiravataException {
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setUri(url);
- connection = connectionFactory.newConnection();
- connection.addShutdownListener(new ShutdownListener() {
- public void shutdownCompleted(ShutdownSignalException cause) {
- }
- });
- log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
-
- channel = connection.createChannel();
- channel.exchangeDeclare(exchangeName, "topic", false);
-
- } catch (Exception e) {
- String msg = "could not open channel for exchange " + exchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
- public String listen(final MessageHandler handler) throws AiravataException {
- try {
- Map<String, Object> props = handler.getProperties();
- final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
- if (routing == null) {
- throw new IllegalArgumentException("The routing key must be present");
- }
-
- List<String> keys = new ArrayList<String>();
- if (routing instanceof List) {
- for (Object o : (List)routing) {
- keys.add(o.toString());
- }
- } else if (routing instanceof String) {
- keys.add((String) routing);
- }
-
- String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
- String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
- if (queueName == null) {
- if (!channel.isOpen()) {
- channel = connection.createChannel();
- channel.exchangeDeclare(exchangeName, "topic", false);
- }
- queueName = channel.queueDeclare().getQueue();
- } else {
- channel.queueDeclare(queueName, true, false, false, null);
- }
-
- final String id = getId(keys, queueName);
- if (queueDetailsMap.containsKey(id)) {
- throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
- "cannot define the same subscriber twice");
- }
-
- if (consumerTag == null) {
- consumerTag = "default";
- }
-
- // bind all the routing keys
- for (String routingKey : keys) {
- channel.queueBind(queueName, exchangeName, routingKey);
- }
-
- channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) {
- Message message = new Message();
-
- try {
- ThriftUtils.createThriftFromBytes(body, message);
- TBase event = null;
- String gatewayId = null;
- if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
- ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' with status " +
- experimentStatusChangeEvent.getState());
- event = experimentStatusChangeEvent;
- gatewayId = experimentStatusChangeEvent.getGatewayId();
- } else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)) {
- WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' with status " +
- wfnStatusChangeEvent.getState());
- event = wfnStatusChangeEvent;
- gatewayId = wfnStatusChangeEvent.getWorkflowNodeIdentity().getGatewayId();
- } else if (message.getMessageType().equals(MessageType.TASK)) {
- TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' with status " +
- taskStatusChangeEvent.getState());
- event = taskStatusChangeEvent;
- gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
- } else if (message.getMessageType().equals(MessageType.JOB)) {
- JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' with status " +
- jobStatusChangeEvent.getState());
- event = jobStatusChangeEvent;
- gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
- }
- MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
- messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
- handler.onMessage(messageContext);
- } catch (TException e) {
- String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
- log.warn(msg, e);
- }
- }
- });
- // save the name for deleting the queue
- queueDetailsMap.put(id, new QueueDetails(queueName, keys));
- return id;
- } catch (Exception e) {
- String msg = "could not open channel for exchange " + exchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
- public void stopListen(final String id) throws AiravataException {
- QueueDetails details = queueDetailsMap.get(id);
- if (details != null) {
- try {
- for (String key : details.getRoutingKeys()) {
- channel.queueUnbind(details.getQueueName(), exchangeName, key);
- }
- channel.queueDelete(details.getQueueName(), true, true);
- } catch (IOException e) {
- String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
- log.debug(msg);
- }
- }
- }
-
- /**
- * Private class for holding some information about the consumers registered
- */
- private class QueueDetails {
- String queueName;
-
- List<String> routingKeys;
-
- private QueueDetails(String queueName, List<String> routingKeys) {
- this.queueName = queueName;
- this.routingKeys = routingKeys;
- }
-
- public String getQueueName() {
- return queueName;
- }
-
- public List<String> getRoutingKeys() {
- return routingKeys;
- }
- }
-
- private String getId(List<String> routingKeys, String queueName) {
- String id = "";
- for (String key : routingKeys) {
- id = id + "_" + key;
- }
- return id + "_" + queueName;
- }
-
- public void close() {
- if (connection != null) {
- try {
- connection.close();
- } catch (IOException ignore) {
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
new file mode 100644
index 0000000..d5e8c72
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusConsumer.java
@@ -0,0 +1,274 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.messaging.core.impl;
+
+
+import com.rabbitmq.client.*;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.Consumer;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RabbitMQStatusConsumer implements Consumer {
+ private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
+
+ private String exchangeName;
+ private String url;
+ private Connection connection;
+ private Channel channel;
+ private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+
+ public RabbitMQStatusConsumer() throws AiravataException {
+ try {
+ url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+ exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
+
+ createConnection();
+ } catch (ApplicationSettingsException e) {
+ String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+ log.error(message, e);
+ throw new AiravataException(message, e);
+ }
+ }
+
+ public RabbitMQStatusConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+ this.exchangeName = exchangeName;
+ this.url = brokerUrl;
+
+ createConnection();
+ }
+
+ private void createConnection() throws AiravataException {
+ try {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ connectionFactory.setUri(url);
+ connection = connectionFactory.newConnection();
+ connection.addShutdownListener(new ShutdownListener() {
+ public void shutdownCompleted(ShutdownSignalException cause) {
+ }
+ });
+ log.info("connected to rabbitmq: " + connection + " for " + exchangeName);
+
+ channel = connection.createChannel();
+ channel.exchangeDeclare(exchangeName, "topic", false);
+
+ } catch (Exception e) {
+ String msg = "could not open channel for exchange " + exchangeName;
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+
+ public String listen(final MessageHandler handler) throws AiravataException {
+ try {
+ Map<String, Object> props = handler.getProperties();
+ final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+ if (routing == null) {
+ throw new IllegalArgumentException("The routing key must be present");
+ }
+
+ List<String> keys = new ArrayList<String>();
+ if (routing instanceof List) {
+ for (Object o : (List)routing) {
+ keys.add(o.toString());
+ }
+ } else if (routing instanceof String) {
+ keys.add((String) routing);
+ }
+
+ String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+ String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+ if (queueName == null) {
+ if (!channel.isOpen()) {
+ channel = connection.createChannel();
+ channel.exchangeDeclare(exchangeName, "topic", false);
+ }
+ queueName = channel.queueDeclare().getQueue();
+ } else {
+ channel.queueDeclare(queueName, true, false, false, null);
+ }
+
+ final String id = getId(keys, queueName);
+ if (queueDetailsMap.containsKey(id)) {
+ throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
+ "cannot define the same subscriber twice");
+ }
+
+ if (consumerTag == null) {
+ consumerTag = "default";
+ }
+
+ // bind all the routing keys
+ for (String routingKey : keys) {
+ channel.queueBind(queueName, exchangeName, routingKey);
+ }
+
+ channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body) {
+ Message message = new Message();
+
+ try {
+ ThriftUtils.createThriftFromBytes(body, message);
+ TBase event = null;
+ String gatewayId = null;
+ if (message.getMessageType().equals(MessageType.EXPERIMENT)) {
+ ExperimentStatusChangeEvent experimentStatusChangeEvent = new ExperimentStatusChangeEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), experimentStatusChangeEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' with status " +
+ experimentStatusChangeEvent.getState());
+ event = experimentStatusChangeEvent;
+ gatewayId = experimentStatusChangeEvent.getGatewayId();
+ } else if (message.getMessageType().equals(MessageType.WORKFLOWNODE)) {
+ WorkflowNodeStatusChangeEvent wfnStatusChangeEvent = new WorkflowNodeStatusChangeEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), wfnStatusChangeEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' with status " +
+ wfnStatusChangeEvent.getState());
+ event = wfnStatusChangeEvent;
+ gatewayId = wfnStatusChangeEvent.getWorkflowNodeIdentity().getGatewayId();
+ } else if (message.getMessageType().equals(MessageType.TASK)) {
+ TaskStatusChangeEvent taskStatusChangeEvent = new TaskStatusChangeEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), taskStatusChangeEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' with status " +
+ taskStatusChangeEvent.getState());
+ event = taskStatusChangeEvent;
+ gatewayId = taskStatusChangeEvent.getTaskIdentity().getGatewayId();
+ } else if (message.getMessageType().equals(MessageType.JOB)) {
+ JobStatusChangeEvent jobStatusChangeEvent = new JobStatusChangeEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), jobStatusChangeEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' with status " +
+ jobStatusChangeEvent.getState());
+ event = jobStatusChangeEvent;
+ gatewayId = jobStatusChangeEvent.getJobIdentity().getGatewayId();
+ }else if(message.getMessageType().equals(MessageType.LAUNCHTASK)) {
+ TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
+ taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
+ event = taskSubmitEvent;
+ gatewayId = taskSubmitEvent.getGatewayId();
+ }else if(message.getMessageType().equals(MessageType.TERMINATETASK)) {
+ TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
+ taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
+ event = taskTerminateEvent;
+ gatewayId = null;
+ }
+ MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
+ messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+ handler.onMessage(messageContext);
+ } catch (TException e) {
+ String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+ log.warn(msg, e);
+ }
+ }
+ });
+ // save the name for deleting the queue
+ queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+ return id;
+ } catch (Exception e) {
+ String msg = "could not open channel for exchange " + exchangeName;
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+
+ public void stopListen(final String id) throws AiravataException {
+ QueueDetails details = queueDetailsMap.get(id);
+ if (details != null) {
+ try {
+ for (String key : details.getRoutingKeys()) {
+ channel.queueUnbind(details.getQueueName(), exchangeName, key);
+ }
+ channel.queueDelete(details.getQueueName(), true, true);
+ } catch (IOException e) {
+ String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + exchangeName;
+ log.debug(msg);
+ }
+ }
+ }
+
+ /**
+ * Private class for holding some information about the consumers registered
+ */
+ private class QueueDetails {
+ String queueName;
+
+ List<String> routingKeys;
+
+ private QueueDetails(String queueName, List<String> routingKeys) {
+ this.queueName = queueName;
+ this.routingKeys = routingKeys;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public List<String> getRoutingKeys() {
+ return routingKeys;
+ }
+ }
+
+ private String getId(List<String> routingKeys, String queueName) {
+ String id = "";
+ for (String key : routingKeys) {
+ id = id + "_" + key;
+ }
+ return id + "_" + queueName;
+ }
+
+ public void close() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException ignore) {
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
index a4b4d1a..fe06ed7 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQStatusPublisher.java
@@ -45,7 +45,7 @@ public class RabbitMQStatusPublisher implements Publisher {
String exchangeName;
try {
brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
+ exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
} catch (ApplicationSettingsException e) {
String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
log.error(message, e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
new file mode 100644
index 0000000..056dcac
--- /dev/null
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchConsumer.java
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.messaging.core.impl;
+
+import com.rabbitmq.client.*;
+import org.apache.airavata.common.exception.AiravataException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.common.utils.ThriftUtils;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.MessageHandler;
+import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.model.messaging.event.*;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RabbitMQTaskLaunchConsumer {
+ private final static Logger logger = LoggerFactory.getLogger(RabbitMQTaskLaunchConsumer.class);
+ private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
+
+ private String taskLaunchExchangeName;
+ private String url;
+ private Connection connection;
+ private Channel channel;
+ private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+
+ public RabbitMQTaskLaunchConsumer() throws AiravataException {
+ try {
+ url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+ taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
+ createConnection();
+ } catch (ApplicationSettingsException e) {
+ String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+ log.error(message, e);
+ throw new AiravataException(message, e);
+ }
+ }
+
+ public RabbitMQTaskLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+ this.taskLaunchExchangeName = exchangeName;
+ this.url = brokerUrl;
+
+ createConnection();
+ }
+
+ private void createConnection() throws AiravataException {
+ try {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ connectionFactory.setUri(url);
+ connection = connectionFactory.newConnection();
+ connection.addShutdownListener(new ShutdownListener() {
+ public void shutdownCompleted(ShutdownSignalException cause) {
+ }
+ });
+ log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
+
+ channel = connection.createChannel();
+ channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+
+ } catch (Exception e) {
+ String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+
+ public String listen(final MessageHandler handler) throws AiravataException {
+ try {
+ Map<String, Object> props = handler.getProperties();
+ final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+ if (routing == null) {
+ throw new IllegalArgumentException("The routing key must be present");
+ }
+
+ List<String> keys = new ArrayList<String>();
+ if (routing instanceof List) {
+ for (Object o : (List)routing) {
+ keys.add(o.toString());
+ }
+ } else if (routing instanceof String) {
+ keys.add((String) routing);
+ }
+
+ String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+ String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+ if (queueName == null) {
+ if (!channel.isOpen()) {
+ channel = connection.createChannel();
+ channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+ }
+ queueName = channel.queueDeclare().getQueue();
+ } else {
+ channel.queueDeclare(queueName, true, false, false, null);
+ }
+
+ final String id = getId(keys, queueName);
+ if (queueDetailsMap.containsKey(id)) {
+ throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
+ "cannot define the same subscriber twice");
+ }
+
+ if (consumerTag == null) {
+ consumerTag = "default";
+ }
+
+ // bind all the routing keys
+ for (String routingKey : keys) {
+ channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+ }
+
+ channel.basicConsume(queueName, true, consumerTag, new DefaultConsumer(channel) {
+ @Override
+ public void handleDelivery(String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body) {
+ Message message = new Message();
+
+ try {
+ ThriftUtils.createThriftFromBytes(body, message);
+ TBase event = null;
+ String gatewayId = null;
+ if(message.getMessageType().equals(MessageType.LAUNCHTASK)) {
+ TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), taskSubmitEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
+ taskSubmitEvent.getExperimentId() + "and taskId: " + taskSubmitEvent.getTaskId());
+ event = taskSubmitEvent;
+ gatewayId = taskSubmitEvent.getGatewayId();
+ }else if(message.getMessageType().equals(MessageType.TERMINATETASK)) {
+ TaskTerminateEvent taskTerminateEvent = new TaskTerminateEvent();
+ ThriftUtils.createThriftFromBytes(message.getEvent(), taskTerminateEvent);
+ log.debug(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getMessageType() + "' for experimentId: " +
+ taskTerminateEvent.getExperimentId() + "and taskId: " + taskTerminateEvent.getTaskId());
+ event = taskTerminateEvent;
+ gatewayId = null;
+ }
+ MessageContext messageContext = new MessageContext(event, message.getMessageType(), message.getMessageId(), gatewayId);
+ messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+ handler.onMessage(messageContext);
+ } catch (TException e) {
+ String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+ log.warn(msg, e);
+ }
+ }
+ });
+ // save the name for deleting the queue
+ queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+ return id;
+ } catch (Exception e) {
+ String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+ log.error(msg);
+ throw new AiravataException(msg, e);
+ }
+ }
+
+ public void stopListen(final String id) throws AiravataException {
+ QueueDetails details = queueDetailsMap.get(id);
+ if (details != null) {
+ try {
+ for (String key : details.getRoutingKeys()) {
+ channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
+ }
+ channel.queueDelete(details.getQueueName(), true, true);
+ } catch (IOException e) {
+ String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
+ log.debug(msg);
+ }
+ }
+ }
+
+ /**
+ * Private class for holding some information about the consumers registered
+ */
+ private class QueueDetails {
+ String queueName;
+
+ List<String> routingKeys;
+
+ private QueueDetails(String queueName, List<String> routingKeys) {
+ this.queueName = queueName;
+ this.routingKeys = routingKeys;
+ }
+
+ public String getQueueName() {
+ return queueName;
+ }
+
+ public List<String> getRoutingKeys() {
+ return routingKeys;
+ }
+ }
+
+ private String getId(List<String> routingKeys, String queueName) {
+ String id = "";
+ for (String key : routingKeys) {
+ id = id + "_" + key;
+ }
+ return id + "_" + queueName;
+ }
+
+ public void close() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (IOException ignore) {
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
index 8029a0c..fe58042 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQTaskLaunchPublisher.java
@@ -44,7 +44,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
String exchangeName;
try {
brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_EXCHANGE_NAME);
+ exchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_STATUS_EXCHANGE_NAME);
} catch (ApplicationSettingsException e) {
String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
log.error(message, e);
@@ -56,7 +56,7 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
public void publish(MessageContext msgCtx) throws AiravataException {
try {
- log.info("Publishing to lauch queue ...");
+ log.info("Publishing to launch queue ...");
byte[] body = ThriftUtils.serializeThriftObject(msgCtx.getEvent());
Message message = new Message();
message.setEvent(body);
@@ -65,13 +65,9 @@ public class RabbitMQTaskLaunchPublisher implements Publisher{
message.setUpdatedTime(msgCtx.getUpdatedTime().getTime());
String routingKey = null;
if (msgCtx.getType().equals(MessageType.LAUNCHTASK)){
- TaskSubmitEvent event = (TaskSubmitEvent) msgCtx.getEvent();
- routingKey = LAUNCH_TASK + "."+event.getExperimentId() + "." +
- event.getTaskId() + "." + event.getGatewayId();
+ routingKey = LAUNCH_TASK;
}else if(msgCtx.getType().equals(MessageType.TERMINATETASK)){
- TaskTerminateEvent event = (TaskTerminateEvent) msgCtx.getEvent();
- routingKey = TERMINATE_TASK + "."+event.getExperimentId() + "." +
- event.getTaskId();
+ routingKey = TERMINATE_TASK;
}
byte[] messageBody = ThriftUtils.serializeThriftObject(message);
rabbitMQProducer.send(messageBody, routingKey);
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
index 0e0e425..97b85bc 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/utils/OrchestratorConstants.java
@@ -25,7 +25,6 @@ package org.apache.airavata.orchestrator.core.utils;
*
*/
public class OrchestratorConstants {
- private static final String SUBMITTER_PROPERTY = "job.submitter";
public static final String AIRAVATA_PROPERTIES = "airavata-server.properties";
public static final int hotUpdateInterval=1000;
public static final String JOB_SUBMITTER = "job.submitter";
http://git-wip-us.apache.org/repos/asf/airavata/blob/0149c1af/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
index 896b248..6ee1111 100644
--- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/messaging/Monitor.java
@@ -22,12 +22,11 @@
package org.apache.airavata.xbaya.messaging;
import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.messaging.core.Consumer;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQConsumer;
+import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.workflow.model.exceptions.WorkflowException;
@@ -101,7 +100,7 @@ public class Monitor extends EventProducer {
getEventDataRepository().triggerListenerForPreMonitorStart();
try {
// AiravataUtils.setExecutionAsServer();
- this.messageClient = new RabbitMQConsumer("amqp://localhost:5672", "airavata_rabbitmq_exchange");
+ this.messageClient = new RabbitMQStatusConsumer("amqp://localhost:5672", "airavata_rabbitmq_exchange");
} catch (AiravataException e) {
String msg = "Failed to start the consumer";
logger.error(msg, e);