You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/09 22:19:41 UTC
[3/4] airavata git commit: Refactored messaging module to remove
duplicate code and support multiple subscribers
Refactored messaging module to remove duplicate code and support multiple subscribers
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e247b00d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e247b00d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e247b00d
Branch: refs/heads/develop
Commit: e247b00d0b4d65a684f5e5551549c329241492d7
Parents: a6670f8
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Tue Aug 9 18:19:09 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Tue Aug 9 18:19:09 2016 -0400
----------------------------------------------------------------------
.../airavata/common/utils/ServerSettings.java | 42 +-
.../main/resources/airavata-server.properties | 12 +-
.../org/apache/airavata/gfac/impl/Factory.java | 26 +-
.../apache/airavata/gfac/impl/GFacWorker.java | 3 +-
.../airavata/gfac/server/GfacServerHandler.java | 82 +--
.../messaging/client/RabbitMQListener.java | 258 +++++----
.../airavata/messaging/core/Consumer.java | 40 --
.../airavata/messaging/core/MessageHandler.java | 4 +-
.../messaging/core/MessagingConstants.java | 3 +-
.../airavata/messaging/core/Metadata.java | 25 -
.../airavata/messaging/core/Publisher.java | 1 +
.../airavata/messaging/core/TestClient.java | 56 +-
.../impl/RabbitMQProcessLaunchConsumer.java | 574 +++++++++----------
.../impl/RabbitMQProcessLaunchPublisher.java | 2 +-
.../core/impl/RabbitMQStatusConsumer.java | 286 ---------
.../server/OrchestratorServerHandler.java | 47 +-
.../ExperimentExecution.java | 391 +++++++------
.../workflow/core/WorkflowEnactmentService.java | 60 +-
.../workflow/core/WorkflowInterpreter.java | 21 +-
19 files changed, 785 insertions(+), 1148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
----------------------------------------------------------------------
diff --git a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
index 5699e9a..2459658 100644
--- a/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
+++ b/modules/commons/src/main/java/org/apache/airavata/common/utils/ServerSettings.java
@@ -84,8 +84,15 @@ public class ServerSettings extends ApplicationSettings {
public static final String JOB_NOTIFICATION_ENABLE = "job.notification.enable";
public static final String JOB_NOTIFICATION_EMAILIDS = "job.notification.emailids";
public static final String JOB_NOTIFICATION_FLAGS = "job.notification.flags";
- public static final String LAUNCH_QUEUE_NAME = "launch.queue.name";
- public static final String CANCEL_QUEUE_NAME = "cancel.queue.name";
+
+ public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
+ public static final String RABBITMQ_STATUS_EXCHANGE_NAME = "rabbitmq.status.exchange.name";
+ public static final String RABBITMQ_PROCESS_EXCHANGE_NAME = "rabbitmq.process.exchange.name";
+ public static final String RABBITMQ_EXPERIMENT_EXCHANGE_NAME = "rabbitmq.experiment.exchange.name";
+ public static final String RABBITMQ_PROCESS_LAUNCH_QUEUE_NAME = "process.launch.queue.name";
+ public static final String RABBITMQ_EXPERIMENT_LAUNCH_QUEUE_NAME = "experiment.launch.queue.name";
+ public static final String RABBITMQ_DURABLE_QUEUE="durable.queue";
+ public static final String RABBITMQ_PREFETCH_COUNT="prefetch.count";
// Workflow Enactment Service component configuration.
@@ -110,13 +117,36 @@ public class ServerSettings extends ApplicationSettings {
return getSetting(DEFAULT_USER);
}
- public static String getLaunchQueueName() {
- return getSetting(LAUNCH_QUEUE_NAME, "launch.queue");
+ public static String getRabbitmqProcessLaunchQueueName() {
+ return getSetting(RABBITMQ_PROCESS_LAUNCH_QUEUE_NAME, "process.launch.queue");
+ }
+
+ public static String getRabbitmqExperimentLaunchQueueName() {
+ return getSetting(RABBITMQ_EXPERIMENT_EXCHANGE_NAME, "experiment.launch.queue");
+ }
+
+ public static String getRabbitmqBrokerUrl() throws ApplicationSettingsException {
+ return getSetting(RABBITMQ_BROKER_URL);
+ }
+
+ public static String getRabbitmqStatusExchangeName(){
+ return getSetting(RABBITMQ_STATUS_EXCHANGE_NAME, "status_exchange");
+ }
+
+ public static String getRabbitmqProcessExchangeName(){
+ return getSetting(RABBITMQ_PROCESS_EXCHANGE_NAME, "process_exchange");
}
+ public static String getRabbitmqExperimentExchangeName() {
+ return getSetting(RABBITMQ_EXPERIMENT_EXCHANGE_NAME, "experiment_exchange");
+ }
+
+ public static boolean getRabbitmqDurableQueue(){
+ return Boolean.valueOf(getSetting(RABBITMQ_DURABLE_QUEUE, "false"));
+ }
- public static String getCancelQueueName() {
- return getSetting(CANCEL_QUEUE_NAME, "cancel.queue");
+ public static int getRabbitmqPrefetchCount(){
+ return Integer.valueOf(getSetting(RABBITMQ_PREFETCH_COUNT, "200"));
}
public static String getDefaultUserPassword() throws ApplicationSettingsException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 4fcdd03..29b256f 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -246,16 +246,14 @@ rabbitmq.broker.url=amqp://localhost:5672
#for production scenarios, give url as amqp://userName:password@hostName:portNumber/virtualHost, create user, virtualhost
# and give permissions, refer: http://blog.dtzq.com/2012/06/rabbitmq-users-and-virtual-hosts.html
#rabbitmq.broker.url=amqp://airavata:airavata@localhost:5672/messaging
-status.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
-task.launch.publisher=org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchPublisher
-rabbitmq.status.exchange.name=airavata_rabbitmq_exchange
-rabbitmq.task.launch.exchange.name=airavata_task_launch_rabbitmq_exchange
+rabbitmq.status.exchange.name=status_exchange
+rabbitmq.process.exchange.name=process_exchange
+rabbitmq.experiment.exchange.name=experiment_exchange
durable.queue=false
prefetch.count=200
-launch.queue.name=launch.queue
-cancel.queue.name=cancel.queue
+process.launch.queue.name=process.launch.queue
+experiment.launch.queue.name=experiment.launch.queue
activity.publisher=org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher
-rabbitmq.exchange.name=airavata_rabbitmq_exchange
###########################################################################
# Zookeeper Server Configuration
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
index fbeb1d8..d105c18 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/Factory.java
@@ -37,7 +37,6 @@ import org.apache.airavata.gfac.core.GFacEngine;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.JobManagerConfiguration;
-import org.apache.airavata.gfac.core.SSHApiException;
import org.apache.airavata.gfac.core.authentication.AuthenticationInfo;
import org.apache.airavata.gfac.core.authentication.SSHKeyAuthentication;
import org.apache.airavata.gfac.core.cluster.OutputParser;
@@ -55,15 +54,23 @@ import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.watcher.CancelRequestWatcher;
import org.apache.airavata.gfac.core.watcher.RedeliveryRequestWatcher;
-import org.apache.airavata.gfac.impl.job.*;
+import org.apache.airavata.gfac.impl.job.ForkJobConfiguration;
+import org.apache.airavata.gfac.impl.job.LSFJobConfiguration;
+import org.apache.airavata.gfac.impl.job.PBSJobConfiguration;
+import org.apache.airavata.gfac.impl.job.SlurmJobConfiguration;
+import org.apache.airavata.gfac.impl.job.UGEJobConfiguration;
import org.apache.airavata.gfac.impl.task.ArchiveTask;
import org.apache.airavata.gfac.impl.watcher.CancelRequestWatcherImpl;
import org.apache.airavata.gfac.impl.watcher.RedeliveryRequestWatcherImpl;
import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
+import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
+import org.apache.airavata.messaging.core.Subscriber;
import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.MonitorMode;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManagerType;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
import org.apache.airavata.model.appcatalog.gatewayprofile.StoragePreference;
import org.apache.airavata.model.data.movement.DataMovementProtocol;
@@ -81,6 +88,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -106,7 +114,7 @@ public abstract class Factory {
private static Map<DataMovementProtocol, Task> dataMovementTask = new HashMap<>();
private static Map<ResourceJobManagerType, ResourceConfig> resources = new HashMap<>();
private static Map<MonitorMode, JobMonitor> jobMonitorServices = new HashMap<>();
- private static RabbitMQProcessLaunchConsumer processLaunchConsumer;
+ private static Subscriber processLaunchSubscriber;
private static Map<String, Session> sessionMap = new HashMap<>();
public static GFacEngine getGFacEngine() throws GFacException {
@@ -159,11 +167,11 @@ public abstract class Factory {
return curatorClient;
}
- public static RabbitMQProcessLaunchConsumer getProcessLaunchConsumer() throws AiravataException {
- if (processLaunchConsumer == null) {
- processLaunchConsumer = new RabbitMQProcessLaunchConsumer();
+ public static synchronized Subscriber getProcessLaunchSubscriber() throws AiravataException {
+ if (processLaunchSubscriber == null) {
+ processLaunchSubscriber = MessagingFactory.getSubscriber(message -> {}, new ArrayList<>(), Subscriber.Type.PROCESS_LAUNCH);
}
- return processLaunchConsumer;
+ return processLaunchSubscriber;
}
public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws GFacException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index 6dc4c1d..001b679 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -29,7 +29,6 @@ import org.apache.airavata.gfac.core.context.ProcessContext;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
-import org.apache.airavata.registry.core.experiment.catalog.model.Process;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,7 +239,7 @@ public class GFacWorker implements Runnable {
try {
long processDeliveryTag = GFacUtils.getProcessDeliveryTag(processContext.getCuratorClient(),
processContext.getExperimentId(), processId);
- Factory.getProcessLaunchConsumer().sendAck(processDeliveryTag);
+ Factory.getProcessLaunchSubscriber().sendAck(processDeliveryTag);
processContext.setAcknowledge(true);
log.info("expId: {}, processId: {} :- Sent ack for deliveryTag {}", processContext.getExperimentId(),
processId, processDeliveryTag);
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index c59e199..a490d91 100644
--- a/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -28,7 +28,6 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.ZkConstants;
import org.apache.airavata.common.utils.listener.AbstractActivityListener;
-import org.apache.airavata.gfac.core.GFac;
import org.apache.airavata.gfac.core.GFacException;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.cpi.GfacService;
@@ -37,11 +36,15 @@ import org.apache.airavata.gfac.impl.Factory;
import org.apache.airavata.gfac.impl.GFacWorker;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
+import org.apache.airavata.messaging.core.MessagingFactory;
import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQProcessLaunchConsumer;
+import org.apache.airavata.messaging.core.Subscriber;
import org.apache.airavata.messaging.core.impl.RabbitMQStatusPublisher;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.ProcessIdentifier;
+import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.ProcessSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
import org.apache.airavata.registry.cpi.AppCatalog;
@@ -60,16 +63,14 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Calendar;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class GfacServerHandler implements GfacService.Iface {
private final static Logger log = LoggerFactory.getLogger(GfacServerHandler.class);
- private RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer;
+ private Subscriber processLaunchSubscriber;
private static int requestCount=0;
private ExperimentCatalog experimentCatalog;
private AppCatalog appCatalog;
@@ -88,7 +89,6 @@ public class GfacServerHandler implements GfacService.Iface {
initZkDataStructure();
initAMQPClient();
executorService = Executors.newFixedThreadPool(ServerSettings.getGFacThreadPoolSize());
- startStatusUpdators(experimentCatalog, curatorClient, statusPublisher, rabbitMQProcessLaunchConsumer);
} catch (Exception e) {
throw new AiravataStartupException("Gfac Server Initialization error ", e);
}
@@ -96,9 +96,10 @@ public class GfacServerHandler implements GfacService.Iface {
private void initAMQPClient() throws AiravataException {
// init process consumer
- rabbitMQProcessLaunchConsumer = Factory.getProcessLaunchConsumer();
- rabbitMQProcessLaunchConsumer.listen(new ProcessLaunchMessageHandler());
- // init status publisher
+ List<String> routingKeys = new ArrayList<>();
+ routingKeys.add(ServerSettings.getRabbitmqProcessLaunchQueueName());
+ processLaunchSubscriber = MessagingFactory.getSubscriber(new ProcessLaunchMessageHandler(),routingKeys, Subscriber.Type.PROCESS_LAUNCH);
+ // init status publisher
statusPublisher = new RabbitMQStatusPublisher();
}
@@ -173,25 +174,6 @@ public class GfacServerHandler implements GfacService.Iface {
return false;
}
- public static void startStatusUpdators(ExperimentCatalog experimentCatalog, CuratorFramework curatorClient, Publisher publisher,
-
- RabbitMQProcessLaunchConsumer rabbitMQProcessLaunchConsumer) {
- /* try {
- String[] listenerClassList = ServerSettings.getActivityListeners();
- Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
- for (String listenerClass : listenerClassList) {
- Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
- AbstractActivityListener abstractActivityListener = aClass.newInstance();
- activityListeners.add(abstractActivityListener);
- abstractActivityListener.setup(statusPublisher, experimentCatalog, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
- log.info("Registering listener: " + listenerClass);
- statusPublisher.registerListener(abstractActivityListener);
- }
- } catch (Exception e) {
- log.error("Error loading the listener classes configured in airavata-server.properties", e);
- }*/
- }
-
private class ProcessLaunchMessageHandler implements MessageHandler {
private String experimentNode;
private String gfacServerName;
@@ -201,15 +183,6 @@ public class GfacServerHandler implements GfacService.Iface {
gfacServerName = ServerSettings.getGFacServerName();
}
- public Map<String, Object> getProperties() {
- Map<String, Object> props = new HashMap<String, Object>();
- ArrayList<String> keys = new ArrayList<String>();
- keys.add(ServerSettings.getLaunchQueueName());
- keys.add(ServerSettings.getCancelQueueName());
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, keys);
- props.put(MessagingConstants.RABBIT_QUEUE, ServerSettings.getLaunchQueueName());
- return props;
- }
public void onMessage(MessageContext message) {
log.info(" Message Received with message id '" + message.getMessageId()
@@ -232,7 +205,7 @@ public class GfacServerHandler implements GfacService.Iface {
} catch (Exception e) {
log.error("Error while updating delivery tag for redelivery message , messageId : " +
message.getMessageId(), e);
- rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+ processLaunchSubscriber.sendAck(message.getDeliveryTag());
}
} else {
// read process status from registry
@@ -264,7 +237,7 @@ public class GfacServerHandler implements GfacService.Iface {
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
Factory.getDefaultExpCatalog().update(ExperimentCatalogModelType.PROCESS_STATUS, status, event.getProcessId());
publishProcessStatus(event, status);
- rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+ processLaunchSubscriber.sendAck(message.getDeliveryTag());
return;
} else {
setCancelData(event.getExperimentId(),event.getProcessId());
@@ -273,7 +246,7 @@ public class GfacServerHandler implements GfacService.Iface {
submitProcess(event.getProcessId(), event.getGatewayId(), event.getTokenId());
} catch (Exception e) {
log.error(e.getMessage(), e);
- rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
+ processLaunchSubscriber.sendAck(message.getDeliveryTag());
}
} catch (TException e) {
log.error(e.getMessage(), e); //nobody is listening so nothing to throw
@@ -283,31 +256,6 @@ public class GfacServerHandler implements GfacService.Iface {
log.error("Error while publishing process status", e);
}
}
- // TODO - Now there is no process termination type messages, use zookeeper instead of rabbitmq to do that. it is safe to remove this else part.
- else if (message.getType().equals(MessageType.TERMINATEPROCESS)) {
- ProcessTerminateEvent event = new ProcessTerminateEvent();
- TBase messageEvent = message.getEvent();
- try {
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
- boolean success = GFacUtils.setExperimentCancelRequest(event.getProcessId(), curatorClient,
- message.getDeliveryTag());
- if (success) {
- log.info("processId:{} - Process cancel request save successfully", event.getProcessId());
- }
- } catch (Exception e) {
- log.error("processId:" + event.getProcessId() + " - Process cancel reqeust failed", e);
- }finally {
- try {
- if (!rabbitMQProcessLaunchConsumer.isOpen()) {
- rabbitMQProcessLaunchConsumer.reconnect();
- }
- rabbitMQProcessLaunchConsumer.sendAck(message.getDeliveryTag());
- } catch (AiravataException e) {
- log.error("processId: " + event.getProcessId() + " - Failed to send acknowledgement back to cancel request.", e);
- }
- }
- }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
----------------------------------------------------------------------
diff --git a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
index 301934b..984aa59 100644
--- a/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
+++ b/modules/messaging/client/src/main/java/org/apache/airavata/messaging/client/RabbitMQListener.java
@@ -22,25 +22,32 @@
package org.apache.airavata.messaging.client;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.commons.cli.*;
+import org.apache.airavata.messaging.core.MessagingFactory;
+import org.apache.airavata.messaging.core.Subscriber;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.JobStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskStatusChangeEvent;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
public class RabbitMQListener {
@@ -48,12 +55,9 @@ public class RabbitMQListener {
public static final String RABBITMQ_EXCHANGE_NAME = "rabbitmq.exchange.name";
private final static Logger logger = LoggerFactory.getLogger(RabbitMQListener.class);
private static String gatewayId = "*";
- private static boolean gatewayLevelMessages = false;
- private static boolean experimentLevelMessages = false;
- private static boolean jobLevelMessages = false;
private static String experimentId = "*";
private static String jobId = "*";
- private static boolean allMessages = false;
+ private static LEVEL level = LEVEL.ALL;
public static void main(String[] args) {
File file = new File("/tmp/latency_client");
@@ -64,66 +68,39 @@ public class RabbitMQListener {
String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
System.out.println("broker url " + brokerUrl);
final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
- RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
- consumer.listen(new MessageHandler() {
- @Override
- public Map<String, Object> getProperties() {
- Map<String, Object> props = new HashMap<String, Object>();
- List<String> routingKeys = new ArrayList<String>();
- if (allMessages){
- routingKeys.add("*");
- routingKeys.add("*.*");
- routingKeys.add("*.*.*");
- routingKeys.add("*.*.*.*");
- routingKeys.add("*.*.*.*.*");
- }else {
- if (gatewayLevelMessages){
- routingKeys.add(gatewayId);
- routingKeys.add(gatewayId + ".*");
- routingKeys.add(gatewayId + ".*.*");
- routingKeys.add(gatewayId + ".*.*.*");
- routingKeys.add(gatewayId + ".*.*.*.*");
- }else if (experimentLevelMessages){
- routingKeys.add(gatewayId);
- routingKeys.add(gatewayId + "." + experimentId);
- routingKeys.add(gatewayId + "." + experimentId+ ".*");
- routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
- routingKeys.add(gatewayId + "." + experimentId+ ".*.*.*");
- }else if (jobLevelMessages){
- routingKeys.add(gatewayId);
- routingKeys.add(gatewayId + "." + experimentId);
- routingKeys.add(gatewayId + "." + experimentId+ ".*");
- routingKeys.add(gatewayId + "." + experimentId+ ".*.*");
- routingKeys.add(gatewayId + "." + experimentId+ ".*." + jobId);
- }
- }
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
- return props;
- }
+ List<String> routingKeys = getRoutingKeys(level);
+ Subscriber subscriber = MessagingFactory.getSubscriber(null, routingKeys, Subscriber.Type.STATUS);
+ } catch (ApplicationSettingsException e) {
+ logger.error("Error reading airavata server properties", e);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ }
- @Override
- public void onMessage(MessageContext message) {
- try {
- long latency = System.currentTimeMillis() - message.getUpdatedTime().getTime();
- bw.write(message.getMessageId() + " :" + latency);
- bw.newLine();
- bw.flush();
- } catch (IOException e) {
- e.printStackTrace();
- }
- if (message.getType().equals(MessageType.EXPERIMENT)){
- try {
- ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
- TBase messageEvent = message.getEvent();
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
- " for Gateway " + event.getGatewayId());
- } catch (TException e) {
- logger.error(e.getMessage(), e);
- }
- }else if (message.getType().equals(MessageType.PROCESS)){
+ }
+
+ private static MessageHandler getMessageHandler(final BufferedWriter bw) {
+ return message -> {
+ try {
+ long latency = System.currentTimeMillis() - message.getUpdatedTime().getTime();
+ bw.write(message.getMessageId() + " :" + latency);
+ bw.newLine();
+ bw.flush();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (message.getType().equals(MessageType.EXPERIMENT)) {
+ try {
+ ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+ " for Gateway " + event.getGatewayId());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
+ }
+ } else if (message.getType().equals(MessageType.PROCESS)) {
/*try {
WorkflowNodeStatusChangeEvent event = new WorkflowNodeStatusChangeEvent();
TBase messageEvent = message.getEvent();
@@ -135,93 +112,132 @@ public class RabbitMQListener {
} catch (TException e) {
logger.error(e.getMessage(), e);
}*/
- }else if (message.getType().equals(MessageType.TASK)){
- try {
- TaskStatusChangeEvent event = new TaskStatusChangeEvent();
- TBase messageEvent = message.getEvent();
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
- " for Gateway " + event.getTaskIdentity().getGatewayId());
- } catch (TException e) {
- logger.error(e.getMessage(), e);
- }
- }else if (message.getType().equals(MessageType.JOB)){
- try {
- JobStatusChangeEvent event = new JobStatusChangeEvent();
- TBase messageEvent = message.getEvent();
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
- " for Gateway " + event.getJobIdentity().getGatewayId());
- } catch (TException e) {
- logger.error(e.getMessage(), e);
- }
- }
+ } else if (message.getType().equals(MessageType.TASK)) {
+ try {
+ TaskStatusChangeEvent event = new TaskStatusChangeEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+ " for Gateway " + event.getTaskIdentity().getGatewayId());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
}
- });
- } catch (ApplicationSettingsException e) {
- logger.error("Error reading airavata server properties", e);
- }catch (Exception e) {
- logger.error(e.getMessage(), e);
- }
+ } else if (message.getType().equals(MessageType.JOB)) {
+ try {
+ JobStatusChangeEvent event = new JobStatusChangeEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString() +
+ " for Gateway " + event.getJobIdentity().getGatewayId());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ };
}
+ private static List<String> getRoutingKeys(LEVEL level) {
+ List<String> routingKeys = new ArrayList<String>();
+ switch (level) {
+ case ALL:
+ routingKeys.add("*");
+ routingKeys.add("*.*");
+ routingKeys.add("*.*.*");
+ routingKeys.add("*.*.*.*");
+ routingKeys.add("*.*.*.*.*");
+ break;
+ case GATEWAY:
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + ".*");
+ routingKeys.add(gatewayId + ".*.*");
+ routingKeys.add(gatewayId + ".*.*.*");
+ routingKeys.add(gatewayId + ".*.*.*.*");
+ break;
+ case EXPERIMENT:
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + "." + experimentId);
+ routingKeys.add(gatewayId + "." + experimentId + ".*");
+ routingKeys.add(gatewayId + "." + experimentId + ".*.*");
+ routingKeys.add(gatewayId + "." + experimentId + ".*.*.*");
+ break;
+ case JOB:
+ routingKeys.add(gatewayId);
+ routingKeys.add(gatewayId + "." + experimentId);
+ routingKeys.add(gatewayId + "." + experimentId + ".*");
+ routingKeys.add(gatewayId + "." + experimentId + ".*.*");
+ routingKeys.add(gatewayId + "." + experimentId + ".*." + jobId);
+ break;
+ default:
+ break;
+ }
+ return routingKeys;
+ }
+
public static void parseArguments(String[] args) {
- try{
+ try {
Options options = new Options();
- options.addOption("gId", true , "Gateway ID");
+ options.addOption("gId", true, "Gateway ID");
options.addOption("eId", true, "Experiment ID");
options.addOption("jId", true, "Job ID");
options.addOption("a", false, "All Notifications");
CommandLineParser parser = new PosixParser();
- CommandLine cmd = parser.parse( options, args);
- if (cmd.getOptions() == null || cmd.getOptions().length == 0){
+ CommandLine cmd = parser.parse(options, args);
+ if (cmd.getOptions() == null || cmd.getOptions().length == 0) {
logger.info("You have not specified any options. We assume you need to listen to all the messages...");
- allMessages = true;
+ level = LEVEL.ALL;
gatewayId = "*";
}
- if (cmd.hasOption("a")){
+ if (cmd.hasOption("a")) {
logger.info("Listening to all the messages...");
- allMessages = true;
+ level = LEVEL.ALL;
gatewayId = "*";
- }else {
+ } else {
gatewayId = cmd.getOptionValue("gId");
- if (gatewayId == null){
+ if (gatewayId == null) {
gatewayId = "*";
logger.info("You have not specified a gateway id. We assume you need to listen to all the messages...");
} else {
- gatewayLevelMessages = true;
+ level = LEVEL.GATEWAY;
}
experimentId = cmd.getOptionValue("eId");
- if (experimentId == null && !gatewayId.equals("*")){
+ if (experimentId == null && !gatewayId.equals("*")) {
experimentId = "*";
logger.info("You have not specified a experiment id. We assume you need to listen to all the messages for the gateway with id " + gatewayId);
} else if (experimentId == null && gatewayId.equals("*")) {
experimentId = "*";
logger.info("You have not specified a experiment id and a gateway id. We assume you need to listen to all the messages...");
- }else {
- experimentLevelMessages = true;
+ } else {
+ level = LEVEL.EXPERIMENT;
}
jobId = cmd.getOptionValue("jId");
- if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")){
+ if (jobId == null && !gatewayId.equals("*") && !experimentId.equals("*")) {
jobId = "*";
logger.info("You have not specified a job id. We assume you need to listen to all the messages for the gateway with id " + gatewayId
- + " with experiment id : " + experimentId );
+ + " with experiment id : " + experimentId);
} else if (jobId == null && gatewayId.equals("*") && experimentId.equals("*")) {
jobId = "*";
logger.info("You have not specified a job Id or experiment Id or a gateway Id. We assume you need to listen to all the messages...");
- }else {
- jobLevelMessages = true;
+ } else {
+ level = LEVEL.JOB;
}
}
} catch (ParseException e) {
- logger.error("Error while reading command line parameters" , e);
+ logger.error("Error while reading command line parameters", e);
}
}
+
+ private enum LEVEL {
+ ALL,
+ GATEWAY,
+ EXPERIMENT,
+ JOB;
+ }
}
+
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
deleted file mode 100644
index eb557ac..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Consumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core;
-
-import org.apache.airavata.common.exception.AiravataException;
-
-/**
- * This is the basic consumer
- */
-public interface Consumer {
- /**
- * Start listening for messages, The binding properties are specified in the handler.
- * Returns and unique id to this Consumer. This id can be used to stop the listening
- * @param handler
- * @return
- * @throws AiravataException
- */
- public String listen(MessageHandler handler) throws AiravataException;
-
- public void stopListen(final String id) throws AiravataException;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
index c5f8b3d..23646da 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
@@ -21,10 +21,8 @@
package org.apache.airavata.messaging.core;
-import java.util.Map;
-
+@FunctionalInterface
public interface MessageHandler {
- Map<String, Object> getProperties();
void onMessage(MessageContext message);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
index 7fb77a9..a64d18d 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessagingConstants.java
@@ -21,10 +21,11 @@
package org.apache.airavata.messaging.core;
+@Deprecated
public abstract class MessagingConstants {
public static final String RABBITMQ_BROKER_URL = "rabbitmq.broker.url";
public static final String RABBITMQ_STATUS_EXCHANGE_NAME = "rabbitmq.status.exchange.name";
- public static final String RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME = "rabbitmq.task.launch.exchange.name";
+ public static final String RABBITMQ_TASK_EXCHANGE_NAME = "rabbitmq.task.exchange.name";
public static final String RABBIT_ROUTING_KEY = "routingKey";
public static final String RABBIT_QUEUE= "queue";
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java
deleted file mode 100644
index cbf41c1..0000000
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Metadata.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.messaging.core;
-
-public class Metadata {
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
index dea8b00..b8b586c 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/Publisher.java
@@ -26,6 +26,7 @@ import org.apache.airavata.common.exception.AiravataException;
/**
* This is the basic publisher interface.
*/
+@FunctionalInterface
public interface Publisher {
/**
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
index 89fe7ce..daa9886 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/TestClient.java
@@ -22,10 +22,8 @@
package org.apache.airavata.messaging.core;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.thrift.TBase;
@@ -34,9 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
public class TestClient {
@@ -47,36 +43,10 @@ public class TestClient {
public static void main(String[] args) {
try {
- String brokerUrl = ServerSettings.getSetting(RABBITMQ_BROKER_URL);
- final String exchangeName = ServerSettings.getSetting(RABBITMQ_EXCHANGE_NAME);
- RabbitMQStatusConsumer consumer = new RabbitMQStatusConsumer(brokerUrl, exchangeName);
- consumer.listen(new MessageHandler() {
- @Override
- public Map<String, Object> getProperties() {
- Map<String, Object> props = new HashMap<String, Object>();
- List<String> routingKeys = new ArrayList<String>();
- routingKeys.add(experimentId);
- routingKeys.add(experimentId + ".*");
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, routingKeys);
- return props;
- }
-
- @Override
- public void onMessage(MessageContext message) {
- if (message.getType().equals(MessageType.EXPERIMENT)){
- try {
- ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
- TBase messageEvent = message.getEvent();
- byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
- ThriftUtils.createThriftFromBytes(bytes, event);
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString());
- } catch (TException e) {
- logger.error(e.getMessage(), e);
- }
- }
- }
- });
+ List<String> routingKeys = new ArrayList<>();
+ routingKeys.add(experimentId);
+ routingKeys.add(experimentId + ".*");
+ MessagingFactory.getSubscriber(getMessageHandler(),routingKeys, Subscriber.Type.STATUS);
} catch (ApplicationSettingsException e) {
logger.error("Error reading airavata server properties", e);
}catch (Exception e) {
@@ -84,4 +54,22 @@ public class TestClient {
}
}
+
+
+ private static MessageHandler getMessageHandler(){
+ return message -> {
+ if (message.getType().equals(MessageType.EXPERIMENT)){
+ try {
+ ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent();
+ TBase messageEvent = message.getEvent();
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
+ ThriftUtils.createThriftFromBytes(bytes, event);
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType() + "' and with state : '" + event.getState().toString());
+ } catch (TException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
index 855ae27..456ca07 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchConsumer.java
@@ -1,288 +1,288 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.messaging.core.impl;
-
-import com.rabbitmq.client.*;
-import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.ThriftUtils;
-import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
-import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.model.messaging.event.*;
-import org.apache.thrift.TBase;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class RabbitMQProcessLaunchConsumer {
- private final static Logger logger = LoggerFactory.getLogger(RabbitMQProcessLaunchConsumer.class);
- private static Logger log = LoggerFactory.getLogger(RabbitMQStatusConsumer.class);
-
- private String taskLaunchExchangeName;
- private String url;
- private Connection connection;
- private Channel channel;
- private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
- private boolean durableQueue;
- private MessageHandler messageHandler;
- private int prefetchCount;
-
-
- public RabbitMQProcessLaunchConsumer() throws AiravataException {
- try {
- url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
- taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
- prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
- createConnection();
- } catch (ApplicationSettingsException e) {
- String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
- log.error(message, e);
- throw new AiravataException(message, e);
- }
- }
-
- public RabbitMQProcessLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
- this.taskLaunchExchangeName = exchangeName;
- this.url = brokerUrl;
-
- createConnection();
- }
-
- private void createConnection() throws AiravataException {
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setUri(url);
- connectionFactory.setAutomaticRecoveryEnabled(true);
- connection = connectionFactory.newConnection();
- connection.addShutdownListener(new ShutdownListener() {
- public void shutdownCompleted(ShutdownSignalException cause) {
- }
- });
- log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
-
- channel = connection.createChannel();
- channel.basicQos(prefetchCount);
-
-// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
-
- } catch (Exception e) {
- String msg = "could not open channel for exchange " + taskLaunchExchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
- public void reconnect() throws AiravataException{
- if(messageHandler!=null) {
- try {
- listen(messageHandler);
- } catch (AiravataException e) {
- String msg = "could not open channel for exchange " + taskLaunchExchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
-
- }
- }
- }
- public String listen(final MessageHandler handler) throws AiravataException {
- try {
- messageHandler = handler;
- Map<String, Object> props = handler.getProperties();
- final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
- if (routing == null) {
- throw new IllegalArgumentException("The routing key must be present");
- }
- List<String> keys = new ArrayList<String>();
- if (routing instanceof List) {
- for (Object o : (List)routing) {
- keys.add(o.toString());
- }
- } else if (routing instanceof String) {
- keys.add((String) routing);
- }
-
- String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
- String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
- if (queueName == null) {
- if (!channel.isOpen()) {
- channel = connection.createChannel();
- channel.basicQos(prefetchCount);
-// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
- }
- queueName = channel.queueDeclare().getQueue();
- } else {
-
- channel.queueDeclare(queueName, durableQueue, false, false, null);
- }
-
- final String id = getId(keys, queueName);
- if (queueDetailsMap.containsKey(id)) {
- throw new IllegalStateException("This subscriber is already defined for this Consumer, " +
- "cannot define the same subscriber twice");
- }
-
- if (consumerTag == null) {
- consumerTag = "default";
- }
-
- // bind all the routing keys
-// for (String routingKey : keys) {
-// channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.messaging.core.impl;
+//
+//import com.rabbitmq.client.*;
+//import org.apache.airavata.common.exception.AiravataException;
+//import org.apache.airavata.common.exception.ApplicationSettingsException;
+//import org.apache.airavata.common.utils.AiravataUtils;
+//import org.apache.airavata.common.utils.ServerSettings;
+//import org.apache.airavata.common.utils.ThriftUtils;
+//import org.apache.airavata.messaging.core.MessageContext;
+//import org.apache.airavata.messaging.core.MessageHandler;
+//import org.apache.airavata.messaging.core.MessagingConstants;
+//import org.apache.airavata.model.messaging.event.*;
+//import org.apache.thrift.TBase;
+//import org.apache.thrift.TException;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//public class RabbitMQProcessLaunchConsumer {
+// private final static Logger logger = LoggerFactory.getLogger(RabbitMQProcessLaunchConsumer.class);
+// private static Logger log = LoggerFactory.getLogger(RabbitMQStatusSubscriber.class);
+//
+// private String taskLaunchExchangeName;
+// private String url;
+// private Connection connection;
+// private Channel channel;
+// private Map<String, QueueDetails> queueDetailsMap = new HashMap<String, QueueDetails>();
+// private boolean durableQueue;
+// private MessageHandler messageHandler;
+// private int prefetchCount;
+//
+//
+// public RabbitMQProcessLaunchConsumer() throws AiravataException {
+// try {
+// url = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
+// durableQueue = Boolean.parseBoolean(ServerSettings.getSetting(MessagingConstants.DURABLE_QUEUE));
+// taskLaunchExchangeName = ServerSettings.getSetting(MessagingConstants.RABBITMQ_TASK_LAUNCH_EXCHANGE_NAME);
+// prefetchCount = Integer.valueOf(ServerSettings.getSetting(MessagingConstants.PREFETCH_COUNT, String.valueOf(64)));
+// createConnection();
+// } catch (ApplicationSettingsException e) {
+// String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
+// log.error(message, e);
+// throw new AiravataException(message, e);
+// }
+// }
+//
+// public RabbitMQProcessLaunchConsumer(String brokerUrl, String exchangeName) throws AiravataException {
+// this.taskLaunchExchangeName = exchangeName;
+// this.url = brokerUrl;
+//
+// createConnection();
+// }
+//
+// private void createConnection() throws AiravataException {
+// try {
+// ConnectionFactory connectionFactory = new ConnectionFactory();
+// connectionFactory.setUri(url);
+// connectionFactory.setAutomaticRecoveryEnabled(true);
+// connection = connectionFactory.newConnection();
+// connection.addShutdownListener(new ShutdownListener() {
+// public void shutdownCompleted(ShutdownSignalException cause) {
+// }
+// });
+// log.info("connected to rabbitmq: " + connection + " for " + taskLaunchExchangeName);
+//
+// channel = connection.createChannel();
+// channel.basicQos(prefetchCount);
+//
+//// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+//
+// } catch (Exception e) {
+// String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+// log.error(msg);
+// throw new AiravataException(msg, e);
+// }
+// }
+//
+// public void reconnect() throws AiravataException{
+// if(messageHandler!=null) {
+// try {
+// listen(messageHandler);
+// } catch (AiravataException e) {
+// String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+// log.error(msg);
+// throw new AiravataException(msg, e);
+//
// }
- // autoAck=false, we will ack after task is done
- channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body) {
- Message message = new Message();
-
- try {
- ThriftUtils.createThriftFromBytes(body, message);
- TBase event = null;
- String gatewayId = null;
- long deliveryTag = envelope.getDeliveryTag();
- if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
- ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
- ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
- log.debug(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getMessageType() + "' for experimentId:" +
- " " +
- processSubmitEvent.getProcessId());
- event = processSubmitEvent;
- gatewayId = processSubmitEvent.getGatewayId();
- MessageContext messageContext = new MessageContext(event, message.getMessageType(),
- message.getMessageId(), gatewayId, deliveryTag);
- messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
- messageContext.setIsRedeliver(envelope.isRedeliver());
- handler.onMessage(messageContext);
- } else {
- log.error("{} message type is not handle in ProcessLaunch Consumer. Sending ack for " +
- "delivery tag {} ", message.getMessageType().name(), deliveryTag);
- sendAck(deliveryTag);
- }
- } catch (TException e) {
- String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
- log.warn(msg, e);
- }
- }
-
- @Override
- public void handleCancel(String consumerTag) throws IOException {
- super.handleCancel(consumerTag);
- log.info("Consumer cancelled : " + consumerTag);
- }
- });
-
- // save the name for deleting the queue
- queueDetailsMap.put(id, new QueueDetails(queueName, keys));
- return id;
- } catch (Exception e) {
- String msg = "could not open channel for exchange " + taskLaunchExchangeName;
- log.error(msg);
- throw new AiravataException(msg, e);
- }
- }
-
- public void stopListen(final String id) throws AiravataException {
- QueueDetails details = queueDetailsMap.get(id);
- if (details != null) {
- try {
- for (String key : details.getRoutingKeys()) {
- channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
- }
- } catch (IOException e) {
- String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
- log.debug(msg);
- }
- }
- }
-
- /**
- * Private class for holding some information about the consumers registered
- */
- private class QueueDetails {
- String queueName;
-
- List<String> routingKeys;
-
- private QueueDetails(String queueName, List<String> routingKeys) {
- this.queueName = queueName;
- this.routingKeys = routingKeys;
- }
-
- public String getQueueName() {
- return queueName;
- }
-
- public List<String> getRoutingKeys() {
- return routingKeys;
- }
- }
-
- private String getId(List<String> routingKeys, String queueName) {
- String id = "";
- for (String key : routingKeys) {
- id = id + "_" + key;
- }
- return id + "_" + queueName;
- }
-
- public void close() {
- if (connection != null) {
- try {
- connection.close();
- } catch (IOException ignore) {
- }
- }
- }
- public boolean isOpen(){
- if(connection!=null){
- return connection.isOpen();
- }
- return false;
- }
-
- public void sendAck(long deliveryTag){
- try {
- if (channel.isOpen()){
- channel.basicAck(deliveryTag,false);
- }else {
- channel = connection.createChannel();
- channel.basicQos(prefetchCount);
- channel.basicAck(deliveryTag, false);
- }
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- }
- }
-}
+// }
+// }
+// public String listen(final MessageHandler handler) throws AiravataException {
+// try {
+// messageHandler = handler;
+// Map<String, Object> props = handler.getProperties();
+// final Object routing = props.get(MessagingConstants.RABBIT_ROUTING_KEY);
+// if (routing == null) {
+// throw new IllegalArgumentException("The routing key must be present");
+// }
+// List<String> keys = new ArrayList<String>();
+// if (routing instanceof List) {
+// for (Object o : (List)routing) {
+// keys.add(o.toString());
+// }
+// } else if (routing instanceof String) {
+// keys.add((String) routing);
+// }
+//
+// String queueName = (String) props.get(MessagingConstants.RABBIT_QUEUE);
+// String consumerTag = (String) props.get(MessagingConstants.RABBIT_CONSUMER_TAG);
+// if (queueName == null) {
+// if (!channel.isOpen()) {
+// channel = connection.createChannel();
+// channel.basicQos(prefetchCount);
+//// channel.exchangeDeclare(taskLaunchExchangeName, "fanout");
+// }
+// queueName = channel.queueDeclare().getQueue();
+// } else {
+//
+// channel.queueDeclare(queueName, durableQueue, false, false, null);
+// }
+//
+// final String id = getId(keys, queueName);
+// if (queueDetailsMap.containsKey(id)) {
+// throw new IllegalStateException("This subscriber is already defined for this Subscriber, " +
+// "cannot define the same subscriber twice");
+// }
+//
+// if (consumerTag == null) {
+// consumerTag = "default";
+// }
+//
+// // bind all the routing keys
+//// for (String routingKey : keys) {
+//// channel.queueBind(queueName, taskLaunchExchangeName, routingKey);
+//// }
+// // autoAck=false, we will ack after task is done
+// channel.basicConsume(queueName, false, consumerTag, new QueueingConsumer(channel) {
+// @Override
+// public void handleDelivery(String consumerTag,
+// Envelope envelope,
+// AMQP.BasicProperties properties,
+// byte[] body) {
+// Message message = new Message();
+//
+// try {
+// ThriftUtils.createThriftFromBytes(body, message);
+// TBase event = null;
+// String gatewayId = null;
+// long deliveryTag = envelope.getDeliveryTag();
+// if (message.getMessageType().equals(MessageType.LAUNCHPROCESS)) {
+// ProcessSubmitEvent processSubmitEvent = new ProcessSubmitEvent();
+// ThriftUtils.createThriftFromBytes(message.getEvent(), processSubmitEvent);
+// log.debug(" Message Received with message id '" + message.getMessageId()
+// + "' and with message type '" + message.getMessageType() + "' for experimentId:" +
+// " " +
+// processSubmitEvent.getProcessId());
+// event = processSubmitEvent;
+// gatewayId = processSubmitEvent.getGatewayId();
+// MessageContext messageContext = new MessageContext(event, message.getMessageType(),
+// message.getMessageId(), gatewayId, deliveryTag);
+// messageContext.setUpdatedTime(AiravataUtils.getTime(message.getUpdatedTime()));
+// messageContext.setIsRedeliver(envelope.isRedeliver());
+// handler.onMessage(messageContext);
+// } else {
+// log.error("{} message type is not handle in ProcessLaunch Subscriber. Sending ack for " +
+// "delivery tag {} ", message.getMessageType().name(), deliveryTag);
+// sendAck(deliveryTag);
+// }
+// } catch (TException e) {
+// String msg = "Failed to de-serialize the thrift message, from routing keys and queueName " + id;
+// log.warn(msg, e);
+// }
+// }
+//
+// @Override
+// public void handleCancel(String consumerTag) throws IOException {
+// super.handleCancel(consumerTag);
+// log.info("Subscriber cancelled : " + consumerTag);
+// }
+// });
+//
+// // save the name for deleting the queue
+// queueDetailsMap.put(id, new QueueDetails(queueName, keys));
+// return id;
+// } catch (Exception e) {
+// String msg = "could not open channel for exchange " + taskLaunchExchangeName;
+// log.error(msg);
+// throw new AiravataException(msg, e);
+// }
+// }
+//
+// public void stopListen(final String id) throws AiravataException {
+// QueueDetails details = queueDetailsMap.get(id);
+// if (details != null) {
+// try {
+// for (String key : details.getRoutingKeys()) {
+// channel.queueUnbind(details.getQueueName(), taskLaunchExchangeName, key);
+// }
+// } catch (IOException e) {
+// String msg = "could not un-bind queue: " + details.getQueueName() + " for exchange " + taskLaunchExchangeName;
+// log.debug(msg);
+// }
+// }
+// }
+//
+// /**
+// * Private class for holding some information about the consumers registered
+// */
+// private class QueueDetails {
+// String queueName;
+//
+// List<String> routingKeys;
+//
+// private QueueDetails(String queueName, List<String> routingKeys) {
+// this.queueName = queueName;
+// this.routingKeys = routingKeys;
+// }
+//
+// public String getQueueName() {
+// return queueName;
+// }
+//
+// public List<String> getRoutingKeys() {
+// return routingKeys;
+// }
+// }
+//
+// private String getId(List<String> routingKeys, String queueName) {
+// String id = "";
+// for (String key : routingKeys) {
+// id = id + "_" + key;
+// }
+// return id + "_" + queueName;
+// }
+//
+// public void close() {
+// if (connection != null) {
+// try {
+// connection.close();
+// } catch (IOException ignore) {
+// }
+// }
+// }
+// public boolean isOpen(){
+// if(connection!=null){
+// return connection.isOpen();
+// }
+// return false;
+// }
+//
+// public void sendAck(long deliveryTag){
+// try {
+// if (channel.isOpen()){
+// channel.basicAck(deliveryTag,false);
+// }else {
+// channel = connection.createChannel();
+// channel.basicQos(prefetchCount);
+// channel.basicAck(deliveryTag, false);
+// }
+// } catch (IOException e) {
+// logger.error(e.getMessage(), e);
+// }
+// }
+//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/e247b00d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
index e488f26..5cf960e 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/impl/RabbitMQProcessLaunchPublisher.java
@@ -42,7 +42,7 @@ public class RabbitMQProcessLaunchPublisher implements Publisher{
String brokerUrl;
try {
brokerUrl = ServerSettings.getSetting(MessagingConstants.RABBITMQ_BROKER_URL);
- launchTask = ServerSettings.getLaunchQueueName();
+ launchTask = ServerSettings.getRabbitmqProcessLaunchQueueName();
} catch (ApplicationSettingsException e) {
String message = "Failed to get read the required properties from airavata to initialize rabbitmq";
log.error(message, e);