You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/11 19:06:58 UTC
[2/3] airavata git commit: Orchestrator consumerls experiment submit
events
Orchestrator consumerls experiment submit events
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/54f5c34d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/54f5c34d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/54f5c34d
Branch: refs/heads/develop
Commit: 54f5c34d4eed40acd643a0388926c3c3b62be526
Parents: 4e3dc9a
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Thu Aug 11 14:42:48 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Thu Aug 11 14:42:48 2016 -0400
----------------------------------------------------------------------
.../airavata/messaging/core/MessageHandler.java | 2 +-
.../server/OrchestratorServerHandler.java | 37 ++++++++++++++++++--
2 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/54f5c34d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
index 23646da..bc47e68 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
@@ -24,5 +24,5 @@ package org.apache.airavata.messaging.core;
@FunctionalInterface
public interface MessageHandler {
- void onMessage(MessageContext message);
+ void onMessage(MessageContext messageContext);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/54f5c34d/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 03f6f8a..b425c5e 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -48,6 +48,7 @@ import org.apache.airavata.model.error.LaunchValidationException;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.experiment.ExperimentType;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessIdentifier;
import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
@@ -99,7 +100,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
private String airavataUserName;
private String gatewayName;
private Publisher publisher;
- private Subscriber statusSubscribe;
+ private final Subscriber statusSubscribe;
+ private final Subscriber experimentSubscriber;
private CuratorFramework curatorClient;
/**
@@ -112,7 +114,10 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
public OrchestratorServerHandler() throws OrchestratorException{
try {
publisher = MessagingFactory.getPublisher(Type.STATUS);
- setAiravataUserName(ServerSettings.getDefaultUser());
+ List<String> routingKeys = new ArrayList<>();
+ routingKeys.add(ServerSettings.getRabbitmqExperimentLaunchQueueName());
+ experimentSubscriber = MessagingFactory.getSubscriber(new ExperimentHandler(), routingKeys, Type.EXPERIMENT_LAUNCH);
+ setAiravataUserName(ServerSettings.getDefaultUser());
} catch (AiravataException e) {
log.error(e.getMessage(), e);
throw new OrchestratorException("Error while initializing orchestrator service", e);
@@ -601,4 +606,32 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
}
}
+
+
+ private class ExperimentHandler implements MessageHandler {
+
+ @Override
+ public void onMessage(MessageContext messageContext) {
+ if (messageContext.getType() != MessageType.EXPERIMENT) {
+ experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+ log.error("Orchestrator got un-support message type : " + messageContext.getType());
+ }
+ try {
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+ ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
+ ThriftUtils.createThriftFromBytes(bytes, expEvent);
+ if (messageContext.isRedeliver()) {
+ // TODO - handle redelivery scenario
+ experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+ } else {
+ launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
+ experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+ }
+ } catch (TException e) {
+ log.error("Experiment launch failed due to Thrift conversion error", e);
+ experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+ }
+ }
+ }
+
}