You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2016/02/08 17:48:46 UTC

[04/50] [abbrv] airavata git commit: publishing experiment statuses

publishing experiment statuses


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/20fe7b44
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/20fe7b44
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/20fe7b44

Branch: refs/heads/master
Commit: 20fe7b44a1353b9b5ee060bcf0820b1265951e13
Parents: 70358df
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Tue Jan 19 16:43:57 2016 -0500
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Tue Jan 19 16:43:57 2016 -0500

----------------------------------------------------------------------
 .../server/OrchestratorServerHandler.java       | 19 +++++-------------
 .../orchestrator/util/OrchestratorUtils.java    | 21 ++++++++++++++++----
 2 files changed, 22 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/20fe7b44/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 66861dd..f04fdae 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -27,7 +27,6 @@ import org.apache.airavata.common.utils.AiravataUtils;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.ThriftUtils;
 import org.apache.airavata.common.utils.ZkConstants;
-import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.gfac.core.GFacUtils;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
 import org.apache.airavata.messaging.core.MessageContext;
@@ -44,7 +43,6 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfil
 import org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.experiment.ExperimentModel;
 import org.apache.airavata.model.experiment.ExperimentType;
-import org.apache.airavata.model.experiment.UserConfigurationDataModel;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.ProcessIdentifier;
@@ -180,15 +178,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
                 ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED);
                 status.setReason("submitted all processes");
                 status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                OrchestratorUtils.updageExperimentStatus(experimentId, status);
+                OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId);
                 log.info("expId: {}, Launched experiment ", experimentId);
-	            ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED,
-			            experimentId,
-			            gatewayId);
-	            String messageId = AiravataUtils.getId("EXPERIMENT");
-	            MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId);
-	            messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-	            publisher.publish(messageContext);
                 OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(new SingleAppExperimentRunner(experimentId, token, gatewayId));
             } else if (executionType == ExperimentType.WORKFLOW) {
                 //its a workflow execution experiment
@@ -368,7 +359,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 					ExperimentStatus status = new ExperimentStatus(ExperimentState.CANCELING);
 					status.setReason("Experiment cancel request processed");
 					status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-					OrchestratorUtils.updageExperimentStatus(experimentId, status);
+					OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId);
 					log.info("expId : " + experimentId + " :- Experiment status updated to " + status.getState());
 					return true;
 				}
@@ -423,12 +414,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 //				ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED);
 //				status.setReason("submitted all processes");
 //				status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-//				OrchestratorUtils.updageExperimentStatus(experimentId, status);
+//				OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status);
 //				log.info("expId: {}, Launched experiment ", experimentId);
 			} catch (Exception e) {
 	            ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED);
 	            status.setReason("Error while updating task status");
-	            OrchestratorUtils.updageExperimentStatus(experimentId, status);
+	            OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId);
 	            log.error("expId: " + experimentId + ", Error while updating task status, hence updated experiment status to " +
 			            ExperimentState.FAILED, e);
                 ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.FAILED,
@@ -547,7 +538,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 					}
 					if (status.getState() != null) {
 						status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-						OrchestratorUtils.updageExperimentStatus(processIdentity.getExperimentId(), status);
+						OrchestratorUtils.updageAndPublishExperimentStatus(processIdentity.getExperimentId(), status, publisher, gatewayName);
 						log.info("expId : " + processIdentity.getExperimentId() + " :- Experiment status updated to " +
 								status.getState());
 					}

http://git-wip-us.apache.org/repos/asf/airavata/blob/20fe7b44/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
index 834d3b6..0a9617d 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
@@ -20,8 +20,12 @@
  */
 package org.apache.airavata.orchestrator.util;
 
+import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.model.status.ExperimentState;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.status.ExperimentStatus;
 import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
@@ -32,14 +36,23 @@ import org.slf4j.LoggerFactory;
 public class OrchestratorUtils {
 	private static final Logger log = LoggerFactory.getLogger(OrchestratorUtils.class);
 
-	public static void updageExperimentStatus(String experimentId, ExperimentStatus status) {
+	public static void updageAndPublishExperimentStatus(String experimentId, ExperimentStatus status, Publisher publisher, String gatewayId) {
 		try {
 			RegistryFactory.getDefaultExpCatalog().update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status,
 					experimentId);
+            ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(status.getState(),
+                    experimentId,
+                    gatewayId);
+            String messageId = AiravataUtils.getId("EXPERIMENT");
+            MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId);
+            messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+            publisher.publish(messageContext);
 		} catch (RegistryException e) {
 			log.error("expId : " + experimentId + " Error while updating experiment status to " + status.toString(), e);
-		}
-	}
+		} catch (AiravataException e) {
+            log.error("expId : " + experimentId + " Error while publishing experiment status to " + status.toString(), e);
+        }
+    }
 
 	public static ExperimentStatus getExperimentStatus(String experimentId) throws RegistryException {
 		return ((ExperimentStatus) RegistryFactory.getDefaultExpCatalog().get(ExperimentCatalogModelType