You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2016/02/08 17:48:46 UTC
[04/50] [abbrv] airavata git commit: publishing experiment statuses
publishing experiment statuses
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/20fe7b44
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/20fe7b44
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/20fe7b44
Branch: refs/heads/master
Commit: 20fe7b44a1353b9b5ee060bcf0820b1265951e13
Parents: 70358df
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Tue Jan 19 16:43:57 2016 -0500
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Tue Jan 19 16:43:57 2016 -0500
----------------------------------------------------------------------
.../server/OrchestratorServerHandler.java | 19 +++++-------------
.../orchestrator/util/OrchestratorUtils.java | 21 ++++++++++++++++----
2 files changed, 22 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/20fe7b44/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 66861dd..f04fdae 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -27,7 +27,6 @@ import org.apache.airavata.common.utils.AiravataUtils;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.common.utils.ThriftUtils;
import org.apache.airavata.common.utils.ZkConstants;
-import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.gfac.core.GFacUtils;
import org.apache.airavata.gfac.core.scheduler.HostScheduler;
import org.apache.airavata.messaging.core.MessageContext;
@@ -44,7 +43,6 @@ import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfil
import org.apache.airavata.model.error.LaunchValidationException;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.experiment.ExperimentType;
-import org.apache.airavata.model.experiment.UserConfigurationDataModel;
import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.ProcessIdentifier;
@@ -180,15 +178,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED);
status.setReason("submitted all processes");
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- OrchestratorUtils.updageExperimentStatus(experimentId, status);
+ OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId);
log.info("expId: {}, Launched experiment ", experimentId);
- ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.LAUNCHED,
- experimentId,
- gatewayId);
- String messageId = AiravataUtils.getId("EXPERIMENT");
- MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId);
- messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- publisher.publish(messageContext);
OrchestratorServerThreadPoolExecutor.getCachedThreadPool().execute(new SingleAppExperimentRunner(experimentId, token, gatewayId));
} else if (executionType == ExperimentType.WORKFLOW) {
//its a workflow execution experiment
@@ -368,7 +359,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
ExperimentStatus status = new ExperimentStatus(ExperimentState.CANCELING);
status.setReason("Experiment cancel request processed");
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- OrchestratorUtils.updageExperimentStatus(experimentId, status);
+ OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId);
log.info("expId : " + experimentId + " :- Experiment status updated to " + status.getState());
return true;
}
@@ -423,12 +414,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
// ExperimentStatus status = new ExperimentStatus(ExperimentState.LAUNCHED);
// status.setReason("submitted all processes");
// status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-// OrchestratorUtils.updageExperimentStatus(experimentId, status);
+// OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status);
// log.info("expId: {}, Launched experiment ", experimentId);
} catch (Exception e) {
ExperimentStatus status = new ExperimentStatus(ExperimentState.FAILED);
status.setReason("Error while updating task status");
- OrchestratorUtils.updageExperimentStatus(experimentId, status);
+ OrchestratorUtils.updageAndPublishExperimentStatus(experimentId, status, publisher, gatewayId);
log.error("expId: " + experimentId + ", Error while updating task status, hence updated experiment status to " +
ExperimentState.FAILED, e);
ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(ExperimentState.FAILED,
@@ -547,7 +538,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
}
if (status.getState() != null) {
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- OrchestratorUtils.updageExperimentStatus(processIdentity.getExperimentId(), status);
+ OrchestratorUtils.updageAndPublishExperimentStatus(processIdentity.getExperimentId(), status, publisher, gatewayName);
log.info("expId : " + processIdentity.getExperimentId() + " :- Experiment status updated to " +
status.getState());
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/20fe7b44/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
index 834d3b6..0a9617d 100644
--- a/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
+++ b/modules/orchestrator/orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorUtils.java
@@ -20,8 +20,12 @@
*/
package org.apache.airavata.orchestrator.util;
+import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.model.status.ExperimentState;
+import org.apache.airavata.messaging.core.MessageContext;
+import org.apache.airavata.messaging.core.Publisher;
+import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
+import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.status.ExperimentStatus;
import org.apache.airavata.registry.core.experiment.catalog.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.ExperimentCatalogModelType;
@@ -32,14 +36,23 @@ import org.slf4j.LoggerFactory;
public class OrchestratorUtils {
private static final Logger log = LoggerFactory.getLogger(OrchestratorUtils.class);
- public static void updageExperimentStatus(String experimentId, ExperimentStatus status) {
+ public static void updageAndPublishExperimentStatus(String experimentId, ExperimentStatus status, Publisher publisher, String gatewayId) {
try {
RegistryFactory.getDefaultExpCatalog().update(ExperimentCatalogModelType.EXPERIMENT_STATUS, status,
experimentId);
+ ExperimentStatusChangeEvent event = new ExperimentStatusChangeEvent(status.getState(),
+ experimentId,
+ gatewayId);
+ String messageId = AiravataUtils.getId("EXPERIMENT");
+ MessageContext messageContext = new MessageContext(event, MessageType.EXPERIMENT, messageId, gatewayId);
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(messageContext);
} catch (RegistryException e) {
log.error("expId : " + experimentId + " Error while updating experiment status to " + status.toString(), e);
- }
- }
+ } catch (AiravataException e) {
+ log.error("expId : " + experimentId + " Error while publishing experiment status to " + status.toString(), e);
+ }
+ }
public static ExperimentStatus getExperimentStatus(String experimentId) throws RegistryException {
return ((ExperimentStatus) RegistryFactory.getDefaultExpCatalog().get(ExperimentCatalogModelType