You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2016/08/26 15:43:25 UTC
[30/50] [abbrv] airavata git commit: Fixed experiment cancellation issues
Fixed experiment cancellation issues
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/309a9ff8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/309a9ff8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/309a9ff8
Branch: refs/heads/lahiru/AIRAVATA-2057
Commit: 309a9ff83945b59930d9b37cff52e4fb6e405b5c
Parents: bc37334
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Tue Aug 16 15:37:14 2016 -0400
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Tue Aug 16 15:37:14 2016 -0400
----------------------------------------------------------------------
.../server/handler/AiravataServerHandler.java | 6 ++-
.../server/OrchestratorServerHandler.java | 56 +++++++++++++++-----
2 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/309a9ff8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 8ce1c65..e489b43 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -1458,13 +1458,15 @@ public class AiravataServerHandler implements Airavata.Iface {
switch (experimentStatus.getState()) {
case COMPLETED: case CANCELED: case FAILED: case CANCELING:
logger.warn("Can't terminate already {} experiment", experimentStatus.getState().name());
+ break;
case CREATED:
logger.warn("Experiment termination is only allowed for launched experiments.");
+ break;
default:
submitCancelExperiment(airavataExperimentId, gatewayId);
-
+ logger.debug("Airavata cancelled experiment with experiment id : " + airavataExperimentId);
+ break;
}
- logger.debug("Airavata cancelled experiment with experiment id : " + airavataExperimentId);
} catch (RegistryServiceException | AiravataException e) {
logger.error(airavataExperimentId, "Error while cancelling the experiment...", e);
AiravataSystemException exception = new AiravataSystemException();
http://git-wip-us.apache.org/repos/asf/airavata/blob/309a9ff8/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index b425c5e..17bceb4 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -90,6 +90,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.BiConsumer;
public class OrchestratorServerHandler implements OrchestratorService.Iface {
private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
@@ -612,25 +613,56 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
@Override
public void onMessage(MessageContext messageContext) {
- if (messageContext.getType() != MessageType.EXPERIMENT) {
- experimentSubscriber.sendAck(messageContext.getDeliveryTag());
- log.error("Orchestrator got un-support message type : " + messageContext.getType());
+
+ switch (messageContext.getType()) {
+ case EXPERIMENT:
+ launchExperiment(messageContext);
+ break;
+ case EXPERIMENT_CANCEL:
+ cancelExperiment(messageContext);
+ break;
+ default:
+ experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+ log.error("Orchestrator got un-support message type : " + messageContext.getType());
+ break;
}
+ }
+
+ private void cancelExperiment(MessageContext messageContext) {
try {
byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
ThriftUtils.createThriftFromBytes(bytes, expEvent);
- if (messageContext.isRedeliver()) {
- // TODO - handle redelivery scenario
- experimentSubscriber.sendAck(messageContext.getDeliveryTag());
- } else {
- launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
- experimentSubscriber.sendAck(messageContext.getDeliveryTag());
- }
+ terminateExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
} catch (TException e) {
- log.error("Experiment launch failed due to Thrift conversion error", e);
- experimentSubscriber.sendAck(messageContext.getDeliveryTag());
+ log.error("Experiment cancellation failed due to Thrift conversion error", e);
+ }finally {
+ experimentSubscriber.sendAck(messageContext.getDeliveryTag());
}
+
+ }
+ }
+
+ private void launchExperiment(MessageContext messageContext) {
+ try {
+ byte[] bytes = ThriftUtils.serializeThriftObject(messageContext.getEvent());
+ ExperimentSubmitEvent expEvent = new ExperimentSubmitEvent();
+ ThriftUtils.createThriftFromBytes(bytes, expEvent);
+ if (messageContext.isRedeliver()) {
+ ExperimentModel experimentModel = (ExperimentModel) experimentCatalog.
+ get(ExperimentCatalogModelType.EXPERIMENT, expEvent.getExperimentId());
+ if (experimentModel.getExperimentStatus().getState() == ExperimentState.CREATED) {
+ launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
+ }
+ } else {
+ launchExperiment(expEvent.getExperimentId(), expEvent.getGatewayId());
+ }
+ } catch (TException e) {
+ log.error("Experiment launch failed due to Thrift conversion error", e);
+ } catch (RegistryException e) {
+ log.error("Experiment launch failed due to registry access issue", e);
+ }finally {
+ experimentSubscriber.sendAck(messageContext.getDeliveryTag());
}
}