You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/11 18:43:54 UTC

[2/2] airavata git commit: Orchestrator consumerls experiment submit events

Orchestrator consumerls experiment submit events


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/54f5c34d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/54f5c34d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/54f5c34d

Branch: refs/heads/api-orch-workqueue
Commit: 54f5c34d4eed40acd643a0388926c3c3b62be526
Parents: 4e3dc9a
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Thu Aug 11 14:42:48 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Thu Aug 11 14:42:48 2016 -0400

----------------------------------------------------------------------
 .../airavata/messaging/core/MessageHandler.java |  2 +-
 .../server/OrchestratorServerHandler.java       | 37 ++++++++++++++++++--
 2 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/54f5c34d/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
----------------------------------------------------------------------
diff --git a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
index 23646da..bc47e68 100644
--- a/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
+++ b/modules/messaging/core/src/main/java/org/apache/airavata/messaging/core/MessageHandler.java
@@ -24,5 +24,5 @@ package org.apache.airavata.messaging.core;
 @FunctionalInterface
 public interface MessageHandler {
 
-    void onMessage(MessageContext message);
+    void onMessage(MessageContext messageContext);
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/54f5c34d/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 03f6f8a..b425c5e 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -48,6 +48,7 @@ import org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.experiment.ExperimentType;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.ExperimentSubmitEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.ProcessIdentifier;
 import org.apache.airavata.model.messaging.event.ProcessStatusChangeEvent;
@@ -99,7 +100,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	private String airavataUserName;
 	private String gatewayName;
 	private Publisher publisher;
-	private Subscriber statusSubscribe;
+	private final Subscriber statusSubscribe;
+	private final Subscriber experimentSubscriber;
 	private CuratorFramework curatorClient;
 
     /**
@@ -112,7 +114,10 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	public OrchestratorServerHandler() throws OrchestratorException{
 		try {
 	        publisher = MessagingFactory.getPublisher(Type.STATUS);
-            setAiravataUserName(ServerSettings.getDefaultUser());
+			List<String> routingKeys = new ArrayList<>();
+			routingKeys.add(ServerSettings.getRabbitmqExperimentLaunchQueueName());
+			experimentSubscriber = MessagingFactory.getSubscriber(new ExperimentHandler(), routingKeys, Type.EXPERIMENT_LAUNCH);
+			setAiravataUserName(ServerSettings.getDefaultUser());
 		} catch (AiravataException e) {
             log.error(e.getMessage(), e);
             throw new OrchestratorException("Error while initializing orchestrator service", e);
@@ -601,4 +606,32 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 			}
 		}
 	}
+
+
+	private class ExperimentHandler implements MessageHandler {
+
+		@Override
+		public void onMessage(MessageContext messageContext) {
+			if (messageContext.getType() != MessageType.EXPERIMENT) {
+				experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+				log.error("Orchestrator got un-support message type : " + messageContext.getType());
+			}
+			try {
+				byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+				ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
+				ThriftUtils.createThriftFromBytes(bytes, expEvent);
+				if (messageContext.isRedeliver()) {
+                    // TODO - handle redelivery scenario
+                    experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+                } else {
+                    launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
+                    experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+                }
+			} catch (TException e) {
+				log.error("Experiment launch failed due to Thrift conversion error", e);
+                experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+			}
+		}
+	}
+
 }