You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2016/08/16 19:37:19 UTC

airavata git commit: Fixed experiment cancellation issues

Repository: airavata
Updated Branches:
  refs/heads/develop bc3733487 -> 309a9ff83


Fixed experiment cancellation issues


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/309a9ff8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/309a9ff8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/309a9ff8

Branch: refs/heads/develop
Commit: 309a9ff83945b59930d9b37cff52e4fb6e405b5c
Parents: bc37334
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Tue Aug 16 15:37:14 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Tue Aug 16 15:37:14 2016 -0400

----------------------------------------------------------------------
 .../server/handler/AiravataServerHandler.java   |  6 ++-
 .../server/OrchestratorServerHandler.java       | 56 +++++++++++++++-----
 2 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/309a9ff8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 8ce1c65..e489b43 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -1458,13 +1458,15 @@ public class AiravataServerHandler implements Airavata.Iface {
             switch (experimentStatus.getState()) {
                 case COMPLETED: case CANCELED: case FAILED: case CANCELING:
                     logger.warn("Can't terminate already {} experiment", experimentStatus.getState().name());
+                    break;
                 case CREATED:
                     logger.warn("Experiment termination is only allowed for launched experiments.");
+                    break;
                 default:
                     submitCancelExperiment(airavataExperimentId, gatewayId);
-
+                    logger.debug("Airavata cancelled experiment with experiment id : " + airavataExperimentId);
+                    break;
             }
-            logger.debug("Airavata cancelled experiment with experiment id : " + airavataExperimentId);
         } catch (RegistryServiceException | AiravataException e) {
             logger.error(airavataExperimentId, "Error while cancelling the experiment...", e);
             AiravataSystemException exception = new AiravataSystemException();

http://git-wip-us.apache.org/repos/asf/airavata/blob/309a9ff8/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index b425c5e..17bceb4 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -90,6 +90,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.BiConsumer;
 
 public class OrchestratorServerHandler implements OrchestratorService.Iface {
 	private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -612,25 +613,56 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 
 		@Override
 		public void onMessage(MessageContext messageContext) {
-			if (messageContext.getType() != MessageType.EXPERIMENT) {
-				experimentSubscriber.sendAck(messageContext.getDeliveryTag());
-				log.error("Orchestrator got un-support message type : " + messageContext.getType());
+
+			switch (messageContext.getType()) {
+				case EXPERIMENT:
+					launchExperiment(messageContext);
+					break;
+				case EXPERIMENT_CANCEL:
+                    cancelExperiment(messageContext);
+					break;
+				default:
+					experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+					log.error("Orchestrator got un-support message type : " + messageContext.getType());
+					break;
 			}
+		}
+
+		private void cancelExperiment(MessageContext messageContext) {
 			try {
 				byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
 				ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
 				ThriftUtils.createThriftFromBytes(bytes, expEvent);
-				if (messageContext.isRedeliver()) {
-                    // TODO - handle redelivery scenario
-                    experimentSubscriber.sendAck(messageContext.getDeliveryTag());
-                } else {
-                    launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
-                    experimentSubscriber.sendAck(messageContext.getDeliveryTag());
-                }
+				terminateExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
 			} catch (TException e) {
-				log.error("Experiment launch failed due to Thrift conversion error", e);
-                experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+				log.error("Experiment cancellation failed due to Thrift conversion error", e);
+			}finally {
+				experimentSubscriber.sendAck(messageContext.getDeliveryTag());
 			}
+
+		}
+	}
+
+	private void launchExperiment(MessageContext messageContext) {
+		try {
+            byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+            ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
+            ThriftUtils.createThriftFromBytes(bytes, expEvent);
+            if (messageContext.isRedeliver()) {
+				ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.
+						get(ExperimentCatalogModelType.EXPERIMENT, expEvent.getExperimentId());
+				if (experimentModel.getExperimentStatus().getState() == ExperimentState.CREATED) {
+					launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
+				}
+            } else {
+                launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
+            }
+		} catch (TException e) {
+            log.error("Experiment launch failed due to Thrift conversion error", e);
+		} catch (RegistryException e) {
+			log.error("Experiment launch failed due to registry access issue", e);
+		}finally {
+			experimentSubscriber.sendAck(messageContext.getDeliveryTag());
 		}
 	}