You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/04/23 04:08:24 UTC
[1/2] git commit: introducing workflow node status updater + refactor
Repository: airavata
Updated Branches:
refs/heads/master f2b5df444 -> 49eea6f15
introducing workflow node status updater + refactor
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/67b44a15
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/67b44a15
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/67b44a15
Branch: refs/heads/master
Commit: 67b44a15c5957baa18ad3cb883a9e31f3fc0b146
Parents: aed31d3
Author: Saminda Wijeratne <sa...@gmail.com>
Authored: Tue Apr 22 19:07:28 2014 -0700
Committer: Saminda Wijeratne <sa...@gmail.com>
Committed: Tue Apr 22 19:07:28 2014 -0700
----------------------------------------------------------------------
.../main/resources/airavata-server.properties | 2 +-
.../gfac/provider/impl/AbstractProvider.java | 3 +-
.../AiravataExperimentStatusUpdator.java | 2 +-
.../job/monitor/AiravataJobStatusUpdator.java | 41 +------
.../job/monitor/AiravataTaskStatusUpdator.java | 26 ++--
.../AiravataWorkflowNodeStatusUpdator.java | 71 +++++++----
.../airavata/job/monitor/AMQPMonitorTest.java | 2 +-
.../QstatMonitorTestWithMyProxyAuth.java | 2 +-
.../server/OrchestratorServerHandler.java | 123 ++++++++++---------
.../jpa/utils/ThriftDataModelConversion.java | 6 +-
.../org/apache/airavata/server/ServerMain.java | 14 +++
.../job/monitor/ExperimentIdentity.java | 36 ++++++
.../airavata/job/monitor/JobIdentity.java | 39 ++++++
.../apache/airavata/job/monitor/MonitorID.java | 14 ++-
.../airavata/job/monitor/TaskIdentity.java | 38 ++++++
.../job/monitor/WorkflowNodeIdentity.java | 37 ++++++
.../job/monitor/impl/LocalJobMonitor.java | 3 +-
.../job/monitor/impl/push/amqp/AMQPMonitor.java | 3 +-
.../monitor/impl/push/amqp/BasicConsumer.java | 2 +-
.../state/AbstractStateChangeRequest.java | 10 --
.../state/ExperimentStatusChangeRequest.java | 16 ++-
.../monitor/state/JobStatusChangeRequest.java | 30 ++++-
.../monitor/state/TaskStatusChangeRequest.java | 16 ++-
.../state/WorkflowNodeStatusChangeRequest.java | 63 ++++++++++
.../apache/airavata/job/AMQPMonitorTest.java | 2 +-
.../job/QstatMonitorTestWithMyProxyAuth.java | 2 +-
26 files changed, 433 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 13f78d5..4a65a71 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -262,7 +262,7 @@ monitors=org.apache.airavata.job.monitor.impl.pull.qstat.QstatMonitor,org.apache
amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
connection.name=xsede
-activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator
+activity.listeners=org.apache.airavata.job.monitor.AiravataJobStatusUpdator,org.apache.airavata.job.monitor.AiravataTaskStatusUpdator,org.apache.airavata.job.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.job.monitor.AiravataExperimentStatusUpdator
###---------------------------Orchestrator module Configurations---------------------------###
job.submitter=org.apache.airavata.orchestrator.core.impl.EmbeddedGFACJobSubmitter
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
index 3ba02b9..5966233 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/provider/impl/AbstractProvider.java
@@ -27,6 +27,7 @@ import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
+import org.apache.airavata.job.monitor.JobIdentity;
import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.command.TaskCancelRequest;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
@@ -68,7 +69,7 @@ public abstract class AbstractProvider implements GFacProvider{
JobState jobState = jd.getJobStatus().getJobState();
if (jobState!=JobState.CANCELED || jobState!=JobState.CANCELING || jobState!=JobState.COMPLETE || jobState!=JobState.FAILED){
MonitorID monitorId = new MonitorID(null, jd.getJobID(), request.getTaskId(), request.getExperimentId(), null, null);
- monitorPublisher.publish(new JobStatusChangeRequest(monitorId, JobState.CANCELING));
+ monitorPublisher.publish(new JobStatusChangeRequest(monitorId, new JobIdentity(monitorId.getExperimentID(), monitorId.getWorkflowNodeID(), monitorId.getTaskID(), monitorId.getJobID()), JobState.CANCELING));
log.debug("Canceling job "+jd.getJobID());
cancelJob(jd.getJobID(), jobExecutionContext);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
index 5455f1b..f172e6c 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataExperimentStatusUpdator.java
@@ -50,7 +50,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
ExperimentState state = experimentStatus.getState();
if (state != null) {
try {
- String experimentID = experimentStatus.getMonitorID().getExperimentID();
+ String experimentID = experimentStatus.getIdentity().getExperimentID();
updateExperimentStatus(experimentID, state);
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
index 74fdf43..0a56543 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataJobStatusUpdator.java
@@ -70,48 +70,17 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
JobState state = jobStatus.getState();
if (state != null) {
try {
- String taskID = jobStatus.getMonitorID().getTaskID();
- String jobID = jobStatus.getMonitorID().getJobID();
+ String taskID = jobStatus.getIdentity().getTaskId();
+ String jobID = jobStatus.getIdentity().getJobId();
updateJobStatus(taskID, jobID, state);
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
}
+ logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString());
switch (state) {
- case COMPLETE:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is DONE");
+ case COMPLETE: case UNKNOWN: case CANCELED:case FAILED:case SUSPENDED:
jobsToMonitor.remove(jobStatus.getMonitorID());
break;
- case UNKNOWN:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is UNKNOWN");
- jobsToMonitor.remove(jobStatus.getMonitorID());
- //todo implement this logic
- break;
- case QUEUED:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is QUEUED");
- break;
- case SUBMITTED:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUBMITTED");
- break;
- case ACTIVE:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is ACTIVE");
- break;
- case CANCELED:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CANCELED");
- jobsToMonitor.remove(jobStatus.getMonitorID());
- break;
- case FAILED:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is FAILED");
- jobsToMonitor.remove(jobStatus.getMonitorID());
- break;
- case HELD:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is HELD");
- break;
- case SUSPENDED:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is SUSPENDED");
- jobsToMonitor.remove(jobStatus.getMonitorID());
- break;
- case CANCELING:
- logger.info("Job ID:" + jobStatus.getMonitorID().getJobID() + " is CENCELING");
default:
break;
}
@@ -144,7 +113,7 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener{
break;
}
logger.debug("Publishing Task Status "+state.toString());
- monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getMonitorID(),state));
+ monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getIdentity(),state));
}
public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
index 40b095a..86ae26a 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataTaskStatusUpdator.java
@@ -23,11 +23,11 @@ package org.apache.airavata.job.monitor;
import java.util.Calendar;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
-import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest;
import org.apache.airavata.job.monitor.state.TaskStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.job.monitor.state.WorkflowNodeStatusChangeRequest;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
import org.apache.airavata.registry.cpi.DataType;
import org.apache.airavata.registry.cpi.Registry;
import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{
TaskState state = taskStatus.getState();
if (state != null) {
try {
- String taskID = taskStatus.getMonitorID().getTaskID();
+ String taskID = taskStatus.getIdentity().getTaskId();
updateTaskStatus(taskID, state);
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
@@ -64,28 +64,28 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener{
}
@Subscribe
- public void setupExperimentStatus(TaskStatusChangeRequest taskStatus){
- ExperimentState state=ExperimentState.UNKNOWN;
+ public void setupWorkflowNodeStatus(TaskStatusChangeRequest taskStatus){
+ WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
switch(taskStatus.getState()){
case CANCELED:
- state=ExperimentState.CANCELED; break;
+ state=WorkflowNodeState.CANCELED; break;
case COMPLETED:
- state=ExperimentState.COMPLETED; break;
+ state=WorkflowNodeState.COMPLETED; break;
case CONFIGURING_WORKSPACE:
- state=ExperimentState.LAUNCHED; break;
+ state=WorkflowNodeState.INVOKED; break;
case FAILED:
- state=ExperimentState.FAILED; break;
+ state=WorkflowNodeState.FAILED; break;
case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING:
- state=ExperimentState.EXECUTING; break;
+ state=WorkflowNodeState.EXECUTING; break;
case STARTED:
- state=ExperimentState.LAUNCHED; break;
+ state=WorkflowNodeState.INVOKED; break;
case CANCELING:
- state=ExperimentState.CANCELING; break;
+ state=WorkflowNodeState.CANCELING; break;
default:
break;
}
logger.debug("Publishing Experiment Status "+state.toString());
- monitorPublisher.publish(new ExperimentStatusChangeRequest(taskStatus.getMonitorID(),state));
+ monitorPublisher.publish(new WorkflowNodeStatusChangeRequest(taskStatus.getIdentity(),state));
}
public void updateTaskStatus(String taskId, TaskState state) throws Exception {
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
index f6dc360..cd07a1d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/job/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -22,9 +22,13 @@ package org.apache.airavata.job.monitor;
import java.util.Calendar;
+import org.apache.airavata.job.monitor.event.MonitorPublisher;
import org.apache.airavata.job.monitor.state.ExperimentStatusChangeRequest;
-import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.job.monitor.state.WorkflowNodeStatusChangeRequest;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
import org.apache.airavata.registry.cpi.DataType;
import org.apache.airavata.registry.cpi.Registry;
import org.slf4j.Logger;
@@ -37,7 +41,7 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
private Registry airavataRegistry;
-// private MonitorPublisher monitorPublisher;
+ private MonitorPublisher monitorPublisher;
public Registry getAiravataRegistry() {
return airavataRegistry;
@@ -48,29 +52,52 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
}
@Subscribe
- public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) {
-// ExperimentState state = experimentStatus.getState();
-// if (state != null) {
-// try {
-// String experimentID = experimentStatus.getMonitorID().getExperimentID();
-// updateWorkflowNodeStatus(experimentID, state);
-// } catch (Exception e) {
-// logger.error("Error persisting data" + e.getLocalizedMessage(), e);
-// }
-// }
+ public void updateRegistry(WorkflowNodeStatusChangeRequest workflowNodeStatus) {
+ WorkflowNodeState state = workflowNodeStatus.getState();
+ if (state != null) {
+ try {
+ String workflowNodeID = workflowNodeStatus.getIdentity().getWorkflowNodeID();
+ updateWorkflowNodeStatus(workflowNodeID, state);
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+ }
+ }
+ }
+
+ @Subscribe
+ public void setupExperimentStatus(WorkflowNodeStatusChangeRequest nodeStatus){
+ ExperimentState state=ExperimentState.UNKNOWN;
+ switch(nodeStatus.getState()){
+ case CANCELED:
+ state=ExperimentState.CANCELED; break;
+ case COMPLETED:
+ state=ExperimentState.COMPLETED; break;
+ case INVOKED:
+ state=ExperimentState.LAUNCHED; break;
+ case FAILED:
+ state=ExperimentState.FAILED; break;
+ case EXECUTING:
+ state=ExperimentState.EXECUTING; break;
+ case CANCELING:
+ state=ExperimentState.CANCELING; break;
+ default:
+ break;
+ }
+ logger.debug("Publishing Experiment Status "+state.toString());
+ monitorPublisher.publish(new ExperimentStatusChangeRequest(nodeStatus.getIdentity(),state));
}
- public void updateWorkflowNodeStatus(String experimentId, ExperimentState state) throws Exception {
- Experiment details = (Experiment)airavataRegistry.get(DataType.EXPERIMENT, experimentId);
+ public void updateWorkflowNodeStatus(String workflowNodeId, WorkflowNodeState state) throws Exception {
+ WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(DataType.WORKFLOW_NODE_DETAIL, workflowNodeId);
if(details == null) {
- details = new Experiment();
- details.setExperimentID(experimentId);
+ details = new WorkflowNodeDetails();
+ details.setNodeInstanceId(workflowNodeId);
}
- org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
- status.setExperimentState(state);
+ WorkflowNodeStatus status = new WorkflowNodeStatus();
+ status.setWorkflowNodeState(state);
status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- details.setExperimentStatus(status);
- airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.EXPERIMENT, details, experimentId);
+ details.setWorkflowNodeStatus(status);
+ airavataRegistry.update(org.apache.airavata.registry.cpi.DataType.WORKFLOW_NODE_DETAIL, details, workflowNodeId);
}
@Override
@@ -78,8 +105,8 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
for (Object configuration : configurations) {
if (configuration instanceof Registry){
this.airavataRegistry=(Registry)configuration;
-// } else if (configuration instanceof MonitorPublisher){
-// this.monitorPublisher=(MonitorPublisher) configuration;
+ } else if (configuration instanceof MonitorPublisher){
+ this.monitorPublisher=(MonitorPublisher) configuration;
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
index 65ab8d0..cb16540 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
@@ -134,7 +134,7 @@ public class AMQPMonitorTest {
String jobID = pbsCluster.submitBatchJob(jobDescriptor);
System.out.println(jobID);
try {
- monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, "ogce"));
+ monitorManager.addAJobToMonitor(new MonitorID(hostDescription, jobID,null,null, null, "ogce"));
} catch (AiravataMonitorException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
index 33ffa09..5d7314a 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/QstatMonitorTestWithMyProxyAuth.java
@@ -131,7 +131,7 @@ public class QstatMonitorTestWithMyProxyAuth {
for (int i = 0; i < 1; i++) {
String jobID = pbsCluster.submitBatchJob(jobDescriptor);
System.out.println("Job submitted successfully, Job ID: " + jobID);
- MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce");
+ MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null,null, "ogce");
monitorID.setAuthenticationInfo(authenticationInfo);
try {
monitorManager.addAJobToMonitor(monitorID);
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index bb9865d..26f447e 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -49,6 +49,8 @@ import org.apache.airavata.orchestrator.cpi.impl.SimpleOrchestratorImpl;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.DataType;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants;
+import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -101,29 +103,25 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
String proxyPath = ServerSettings.getSetting("proxy.file.path");
String connectionName = ServerSettings.getSetting("connection.name");
- if (monitors == null) {
- log.error("Error loading primaryMonitor and there has to be a primary monitor");
- } else {
- for (String monitorClass : monitorList) {
- Class<? extends Monitor> aClass = Class.forName(monitorClass).asSubclass(Monitor.class);
- Monitor monitor = aClass.newInstance();
- if (monitor instanceof PullMonitor) {
- if (monitor instanceof QstatMonitor) {
- monitorManager.addQstatMonitor((QstatMonitor) monitor);
- }
- } else if (monitor instanceof PushMonitor) {
- if (monitor instanceof AMQPMonitor) {
- ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list);
- monitorManager.addAMQPMonitor((AMQPMonitor) monitor);
- }
- } else if(monitor instanceof LocalJobMonitor){
- monitorManager.addLocalMonitor((LocalJobMonitor)monitor);
- } else {
- log.error("Wrong class is given to primary Monitor");
+ for (String monitorClass : monitorList) {
+ Class<? extends Monitor> aClass = Class.forName(monitorClass).asSubclass(Monitor.class);
+ Monitor monitor = aClass.newInstance();
+ if (monitor instanceof PullMonitor) {
+ if (monitor instanceof QstatMonitor) {
+ monitorManager.addQstatMonitor((QstatMonitor) monitor);
}
+ } else if (monitor instanceof PushMonitor) {
+ if (monitor instanceof AMQPMonitor) {
+ ((AMQPMonitor) monitor).initialize(proxyPath, connectionName, list);
+ monitorManager.addAMQPMonitor((AMQPMonitor) monitor);
+ }
+ } else if(monitor instanceof LocalJobMonitor){
+ monitorManager.addLocalMonitor((LocalJobMonitor)monitor);
+ } else {
+ log.error("Wrong class is given to primary Monitor");
}
-
}
+
monitorManager.registerListener(orchestrator);
// Now Monitor Manager is properly configured, now we have to start the monitoring system.
// This will initialize all the required threads and required queues
@@ -163,46 +161,51 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
if (tasks.size() > 1) {
log.info("There are multiple tasks for this experiment, So Orchestrator will launch multiple Jobs");
}
- for (TaskDetails taskID : tasks) {
- //iterate through all the generated tasks and performs the job submisssion+monitoring
- String jobID = null;
- Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentId);
- if (experiment == null) {
- log.error("Error retrieving the Experiment by the given experimentID: " + experimentId);
- return false;
- }
- String userName = experiment.getUserName();
-
- HostDescription hostDescription = OrchestratorUtils.getHostDescription(orchestrator, taskID);
-
- // creating monitorID to register with monitoring queue
- // this is a special case because amqp has to be in place before submitting the job
- if ((hostDescription instanceof GsisshHostType) &&
- Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) {
- monitorID = new MonitorID(hostDescription, null, taskID.getTaskID(), experimentId, userName);
- monitorManager.addAJobToMonitor(monitorID);
- jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
- if("none".equals(jobID)) {
- log.error("Job submission Failed, so we remove the job from monitoring");
-
- }else{
- log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
- }
- } else {
- // Launching job for each task
- // if the monitoring is pull mode then we add the monitorID for each task after submitting
- // the job with the jobID, otherwise we don't need the jobID
- jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
- log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
- monitorID = new MonitorID(hostDescription, jobID, taskID.getTaskID(), experimentId, userName, authenticationInfo);
- if("none".equals(jobID)) {
- log.error("Job submission Failed, so we remove the job from monitoring");
-
- }else{
- monitorManager.addAJobToMonitor(monitorID);
- }
- }
- }
+ List<String> ids = registry.getIds(DataType.WORKFLOW_NODE_DETAIL,WorkflowNodeConstants.EXPERIMENT_ID,experimentId);
+ for (String workflowNodeId : ids) {
+ List<Object> taskDetailList = registry.get(DataType.TASK_DETAIL,TaskDetailConstants.NODE_ID,workflowNodeId);
+ for (Object o : taskDetailList) {
+ TaskDetails taskID=(TaskDetails)o;
+ //iterate through all the generated tasks and performs the job submisssion+monitoring
+ String jobID = null;
+ Experiment experiment = (Experiment) registry.get(DataType.EXPERIMENT, experimentId);
+ if (experiment == null) {
+ log.error("Error retrieving the Experiment by the given experimentID: " + experimentId);
+ return false;
+ }
+ String userName = experiment.getUserName();
+
+ HostDescription hostDescription = OrchestratorUtils.getHostDescription(orchestrator, taskID);
+
+ // creating monitorID to register with monitoring queue
+ // this is a special case because amqp has to be in place before submitting the job
+ if ((hostDescription instanceof GsisshHostType) &&
+ Constants.PUSH.equals(((GsisshHostType) hostDescription).getMonitorMode())) {
+ monitorID = new MonitorID(hostDescription, null, taskID.getTaskID(), workflowNodeId, experimentId, userName);
+ monitorManager.addAJobToMonitor(monitorID);
+ jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
+ if("none".equals(jobID)) {
+ log.error("Job submission Failed, so we remove the job from monitoring");
+
+ }else{
+ log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
+ }
+ } else {
+ // Launching job for each task
+ // if the monitoring is pull mode then we add the monitorID for each task after submitting
+ // the job with the jobID, otherwise we don't need the jobID
+ jobID = orchestrator.launchExperiment(experimentId, taskID.getTaskID());
+ log.info("Job Launched to the resource by GFAC and jobID returned : " + jobID);
+ monitorID = new MonitorID(hostDescription, jobID, taskID.getTaskID(), workflowNodeId, experimentId, userName, authenticationInfo);
+ if("none".equals(jobID)) {
+ log.error("Job submission Failed, so we remove the job from monitoring");
+
+ }else{
+ monitorManager.addAJobToMonitor(monitorID);
+ }
+ }
+ }
+ }
} catch (Exception e) {
throw new TException(e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java
----------------------------------------------------------------------
diff --git a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java
index 5ed0bce..409f8a6 100644
--- a/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java
+++ b/modules/registry/airavata-jpa-registry/src/main/java/org/apache/airavata/persistance/registry/jpa/utils/ThriftDataModelConversion.java
@@ -39,8 +39,10 @@ public class ThriftDataModelConversion {
if (pr != null) {
project.setProjectID(pr.getName());
project.setName(pr.getName());
- project.setCreationTime(pr.getCreationTime().getTime());
- project.setDescription(pr.getDescription());
+ if (pr.getCreationTime()!=null) {
+ project.setCreationTime(pr.getCreationTime().getTime());
+ }
+ project.setDescription(pr.getDescription());
project.setOwner(pr.getWorker().getUser());
List<ProjectUserResource> projectUserList = pr.getProjectUserList();
List<String> sharedUsers = new ArrayList<String>();
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
----------------------------------------------------------------------
diff --git a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
index 69964ef..80e0c99 100644
--- a/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
+++ b/modules/server/src/main/java/org/apache/airavata/server/ServerMain.java
@@ -49,6 +49,7 @@ public class ServerMain {
private static int serverPID=-1;
private static final String serverStartedFileNamePrefix = "server-start";
private static boolean systemShutDown=false;
+ private static boolean shutdownHookCalledBefore=false;
static{
servers = new ArrayList<IServer>();
}
@@ -89,6 +90,19 @@ public class ServerMain {
});
}
+// private static void addSecondaryShutdownHook(){
+// Runtime.getRuntime().addShutdownHook(new Thread(){
+// @Override
+// public void run() {
+// System.out.print("Graceful shutdown attempt is still active. Do you want to exit instead? (y/n)");
+// String command=System.console().readLine().trim().toLowerCase();
+// if (command.equals("yes") || command.equals("y")){
+// System.exit(1);
+// }
+// }
+// });
+// }
+
public static void main(String args[]) throws ParseException, IOException {
AiravataUtils.setExecutionAsServer();
CommandLineParameters commandLineParameters = StringUtil.getCommandLineParser(args);
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java
new file mode 100644
index 0000000..652ad1d
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/ExperimentIdentity.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+public class ExperimentIdentity {
+ private String experimentID;
+ public ExperimentIdentity(String experimentId) {
+ setExperimentID(experimentId);
+ }
+ public String getExperimentID() {
+ return experimentID;
+ }
+
+ public void setExperimentID(String experimentID) {
+ this.experimentID = experimentID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java
new file mode 100644
index 0000000..5753d9d
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/JobIdentity.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+public class JobIdentity extends TaskIdentity {
+ private String jobId;
+
+ public JobIdentity(String experimentId, String workflowNodeId, String taskId, String jobId) {
+ super(experimentId,workflowNodeId,taskId);
+ setJobId(jobId);
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void setJobId(String jobId) {
+ this.jobId = jobId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
index bd6bfcb..241e3b0 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/MonitorID.java
@@ -52,7 +52,7 @@ public class MonitorID {
private String experimentID;
-// private String workflowNodeID;
+ private String workflowNodeID;
private String taskID;
@@ -62,7 +62,7 @@ public class MonitorID {
private JobState state;
- public MonitorID(HostDescription host, String jobID,String taskID, String experimentID, String userName) {
+ public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName) {
this.host = host;
this.jobStartedTime = new Timestamp((new Date()).getTime());
this.userName = userName;
@@ -71,7 +71,7 @@ public class MonitorID {
this.experimentID = experimentID;
}
- public MonitorID(HostDescription host, String jobID,String taskID,String experimentID, String userName,AuthenticationInfo authenticationInfo) {
+ public MonitorID(HostDescription host, String jobID,String taskID, String workflowNodeID, String experimentID, String userName,AuthenticationInfo authenticationInfo) {
this.host = host;
this.jobStartedTime = new Timestamp((new Date()).getTime());
this.authenticationInfo = authenticationInfo;
@@ -207,6 +207,14 @@ public class MonitorID {
}
}
+ public String getWorkflowNodeID() {
+ return workflowNodeID;
+ }
+
+ public void setWorkflowNodeID(String workflowNodeID) {
+ this.workflowNodeID = workflowNodeID;
+ }
+
// public String getWorkflowNodeID() {
// return workflowNodeID;
// }
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java
new file mode 100644
index 0000000..f7bc785
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/TaskIdentity.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+public class TaskIdentity extends WorkflowNodeIdentity {
+ private String taskId;
+
+ public TaskIdentity(String experimentId, String workflowNodeId, String taskId) {
+ super(experimentId,workflowNodeId);
+ setTaskId(taskId);
+ }
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java
new file mode 100644
index 0000000..a8fe09f
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/WorkflowNodeIdentity.java
@@ -0,0 +1,37 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.job.monitor;
+
+public class WorkflowNodeIdentity extends ExperimentIdentity {
+ private String workflowNodeID;
+ public WorkflowNodeIdentity(String experimentId, String workflowNodeId) {
+ super(experimentId);
+ setWorkflowNodeID(workflowNodeId);
+ }
+ public String getWorkflowNodeID() {
+ return workflowNodeID;
+ }
+
+ public void setWorkflowNodeID(String workflowNodeID) {
+ this.workflowNodeID = workflowNodeID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
index de7cf90..ec4cb40 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/LocalJobMonitor.java
@@ -21,6 +21,7 @@
package org.apache.airavata.job.monitor.impl;
import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.job.monitor.JobIdentity;
import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.core.AiravataAbstractMonitor;
import org.apache.airavata.job.monitor.state.JobStatusChangeRequest;
@@ -41,7 +42,7 @@ public class LocalJobMonitor extends AiravataAbstractMonitor {
do {
try {
MonitorID take = jobQueue.take();
- getPublisher().publish(new JobStatusChangeRequest(take, JobState.COMPLETE));
+ getPublisher().publish(new JobStatusChangeRequest(take, new JobIdentity(take.getExperimentID(), take.getWorkflowNodeID(), take.getTaskID(), take.getJobID()), JobState.COMPLETE));
} catch (Exception e) {
e.printStackTrace();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
index 88a5198..dc6d193 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/AMQPMonitor.java
@@ -30,6 +30,7 @@ import java.util.concurrent.BlockingQueue;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.job.monitor.JobIdentity;
import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.job.monitor.core.PushMonitor;
import org.apache.airavata.job.monitor.event.MonitorPublisher;
@@ -201,7 +202,7 @@ public class AMQPMonitor extends PushMonitor {
}
}
next.setStatus(monitorID.getStatus());
- publisher.publish(new JobStatusChangeRequest(next,next.getStatus()));
+ publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()),next.getStatus()));
return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
index 53bcc8b..5a2d40d 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/impl/push/amqp/BasicConsumer.java
@@ -76,7 +76,7 @@ public class BasicConsumer implements Consumer {
logger.debug("************************************************************");
try {
String jobID = envelope.getRoutingKey().split("\\.")[0];
- MonitorID monitorID = new MonitorID(null, jobID, null, null, null);
+ MonitorID monitorID = new MonitorID(null, jobID, null, null, null, null);
monitorID.setStatus(parser.parseMessage(message));
publisher.publish(monitorID);
} catch (AiravataMonitorException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
index 909f10e..bacd8df 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/AbstractStateChangeRequest.java
@@ -21,17 +21,7 @@
package org.apache.airavata.job.monitor.state;
-import org.apache.airavata.job.monitor.MonitorID;
public abstract class AbstractStateChangeRequest implements PublisherMessage{
- private MonitorID monitorID;
-
- public MonitorID getMonitorID() {
- return monitorID;
- }
-
- public void setMonitorID(MonitorID monitorID) {
- this.monitorID = monitorID;
- }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
index d664161..9bee5ca 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/ExperimentStatusChangeRequest.java
@@ -20,7 +20,7 @@
*/
package org.apache.airavata.job.monitor.state;
-import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.ExperimentIdentity;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
/**
@@ -32,15 +32,15 @@ import org.apache.airavata.model.workspace.experiment.ExperimentState;
*/
public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest{
private ExperimentState state;
-
+ private ExperimentIdentity identity;
// this constructor can be used in Qstat monitor to handle errors
public ExperimentStatusChangeRequest() {
}
- public ExperimentStatusChangeRequest(MonitorID monitorID, ExperimentState state) {
- setMonitorID(monitorID);
+ public ExperimentStatusChangeRequest(ExperimentIdentity experimentIdentity, ExperimentState state) {
this.state = state;
+ setIdentity(experimentIdentity);
}
public ExperimentState getState() {
@@ -51,5 +51,13 @@ public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest{
this.state = state;
}
+ public ExperimentIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(ExperimentIdentity identity) {
+ this.identity = identity;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
index 9669b75..0db9da6 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/JobStatusChangeRequest.java
@@ -20,11 +20,10 @@
*/
package org.apache.airavata.job.monitor.state;
+import org.apache.airavata.job.monitor.JobIdentity;
import org.apache.airavata.job.monitor.MonitorID;
import org.apache.airavata.model.workspace.experiment.JobState;
-import java.util.Properties;
-
/**
* This is the primary job state object used in
* through out the monitor module. This use airavata-data-model JobState enum
@@ -34,15 +33,18 @@ import java.util.Properties;
*/
public class JobStatusChangeRequest extends AbstractStateChangeRequest{
private JobState state;
+ private JobIdentity identity;
-
+ private MonitorID monitorID;
+
// this constructor can be used in Qstat monitor to handle errors
public JobStatusChangeRequest() {
}
- public JobStatusChangeRequest(MonitorID monitorID, JobState state) {
- setMonitorID(monitorID);
- this.state = state;
+ public JobStatusChangeRequest(MonitorID monitorID, JobIdentity jobId, JobState state) {
+ setIdentity(jobId);
+ setMonitorID(monitorID);
+ this.state = state;
}
public JobState getState() {
@@ -53,4 +55,20 @@ public class JobStatusChangeRequest extends AbstractStateChangeRequest{
this.state = state;
}
+ public JobIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(JobIdentity identity) {
+ this.identity = identity;
+ }
+
+ public MonitorID getMonitorID() {
+ return monitorID;
+ }
+
+ public void setMonitorID(MonitorID monitorID) {
+ this.monitorID = monitorID;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
index f35310b..e8e58db 100644
--- a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/TaskStatusChangeRequest.java
@@ -20,7 +20,7 @@
*/
package org.apache.airavata.job.monitor.state;
-import org.apache.airavata.job.monitor.MonitorID;
+import org.apache.airavata.job.monitor.TaskIdentity;
import org.apache.airavata.model.workspace.experiment.TaskState;
/**
@@ -32,14 +32,14 @@ import org.apache.airavata.model.workspace.experiment.TaskState;
*/
public class TaskStatusChangeRequest extends AbstractStateChangeRequest{
private TaskState state;
-
+ private TaskIdentity identity;
// this constructor can be used in Qstat monitor to handle errors
public TaskStatusChangeRequest() {
}
- public TaskStatusChangeRequest(MonitorID monitorID, TaskState state) {
- setMonitorID(monitorID);
+ public TaskStatusChangeRequest(TaskIdentity taskIdentity, TaskState state) {
this.state = state;
+ setIdentity(taskIdentity);
}
public TaskState getState() {
@@ -50,4 +50,12 @@ public class TaskStatusChangeRequest extends AbstractStateChangeRequest{
this.state = state;
}
+ public TaskIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(TaskIdentity identity) {
+ this.identity = identity;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java
new file mode 100644
index 0000000..7e58e35
--- /dev/null
+++ b/tools/job-monitor/src/main/java/org/apache/airavata/job/monitor/state/WorkflowNodeStatusChangeRequest.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.job.monitor.state;
+
+import org.apache.airavata.job.monitor.WorkflowNodeIdentity;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+
+/**
+ * This is the primary workflow node state object used
+ * throughout the monitor module. It uses the airavata-data-model WorkflowNodeState enum.
+ * Ideally, after processing each event or monitoring message from a remote system,
+ * each monitoring implementation has to return this object with a state and
+ * the workflow node identity.
+ */
+public class WorkflowNodeStatusChangeRequest extends AbstractStateChangeRequest{
+ private WorkflowNodeState state;
+ private WorkflowNodeIdentity identity;
+
+ // this constructor can be used in Qstat monitor to handle errors
+ public WorkflowNodeStatusChangeRequest() {
+ }
+
+ public WorkflowNodeStatusChangeRequest(WorkflowNodeIdentity identity, WorkflowNodeState state) {
+ this.state = state;
+ setIdentity(identity);
+ }
+
+ public WorkflowNodeState getState() {
+ return state;
+ }
+
+ public void setState(WorkflowNodeState state) {
+ this.state = state;
+ }
+
+ public WorkflowNodeIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(WorkflowNodeIdentity identity) {
+ this.identity = identity;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 980c2fa..c0e579e 100644
--- a/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -150,7 +150,7 @@ public class AMQPMonitorTest {
String jobID = pbsCluster.submitBatchJob(jobDescriptor);
System.out.println(jobID);
try {
- pushQueue.add(new MonitorID(hostDescription, jobID,null,null, "ogce"));
+ pushQueue.add(new MonitorID(hostDescription, jobID,null,null,null, "ogce"));
} catch (Exception e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/67b44a15/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
index 735d1d2..d85f465 100644
--- a/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
+++ b/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
@@ -144,7 +144,7 @@ public class QstatMonitorTestWithMyProxyAuth {
for (int i = 0; i < 1; i++) {
String jobID = pbsCluster.submitBatchJob(jobDescriptor);
System.out.println("Job submitted successfully, Job ID: " + jobID);
- MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null, "ogce");
+ MonitorID monitorID = new MonitorID(hostDescription, jobID,null,null,null, "ogce");
monitorID.setAuthenticationInfo(authenticationInfo);
try {
org.apache.airavata.job.monitor.util.CommonUtils.addMonitortoQueue(pullQueue, monitorID);
[2/2] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by sa...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/49eea6f1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/49eea6f1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/49eea6f1
Branch: refs/heads/master
Commit: 49eea6f151b28f55933dc370cbc3f31c987eb030
Parents: 67b44a1 f2b5df4
Author: Saminda Wijeratne <sa...@gmail.com>
Authored: Tue Apr 22 19:08:02 2014 -0700
Committer: Saminda Wijeratne <sa...@gmail.com>
Committed: Tue Apr 22 19:08:02 2014 -0700
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 6 +-
modules/airavata-client/pom.xml | 12 +-
.../context/security/SSHSecurityContext.java | 120 ---------
.../gfac/handler/AdvancedSCPInputHandler.java | 139 ----------
.../gfac/handler/AdvancedSCPOutputHandler.java | 115 --------
.../gfac/handler/SCPDirectorySetupHandler.java | 100 -------
.../airavata/gfac/handler/SCPInputHandler.java | 138 ----------
.../airavata/gfac/handler/SCPOutputHandler.java | 175 -------------
.../gfac/provider/impl/SSHProvider.java | 261 -------------------
.../services/impl/BigRed2TestWithSSHAuth.java | 253 ------------------
.../impl/SSHProviderTestWithSSHAuth.java | 172 ------------
.../airavata/job/monitor/AMQPMonitorTest.java | 18 +-
modules/gfac/gfac-ssh/pom.xml | 132 ++++++++++
.../context/security/SSHSecurityContext.java | 120 +++++++++
.../gfac/handler/AdvancedSCPInputHandler.java | 139 ++++++++++
.../gfac/handler/AdvancedSCPOutputHandler.java | 115 ++++++++
.../gfac/handler/SCPDirectorySetupHandler.java | 100 +++++++
.../airavata/gfac/handler/SCPInputHandler.java | 138 ++++++++++
.../airavata/gfac/handler/SCPOutputHandler.java | 175 +++++++++++++
.../gfac/provider/impl/SSHProvider.java | 261 +++++++++++++++++++
.../src/main/resources/errors.properties | 197 ++++++++++++++
.../src/main/resources/service.properties | 58 +++++
.../services/impl/BigRed2TestWithSSHAuth.java | 253 ++++++++++++++++++
.../impl/SSHProviderTestWithSSHAuth.java | 172 ++++++++++++
.../src/test/resources/PBSTemplate.xslt | 73 ++++++
.../gfac-ssh/src/test/resources/gfac-config.xml | 33 +++
.../src/test/resources/logging.properties | 42 +++
modules/gfac/pom.xml | 1 +
.../airavata/gsi/ssh/config/ConfigReader.java | 6 +
.../apache/airavata/job/AMQPMonitorTest.java | 22 +-
.../job/QstatMonitorTestWithMyProxyAuth.java | 10 +-
31 files changed, 2045 insertions(+), 1511 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/49eea6f1/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/monitor/AMQPMonitorTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/49eea6f1/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --cc tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index c0e579e,83586b3..f2b9ccd
--- a/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/tools/job-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@@ -150,9 -150,9 +150,9 @@@ public class AMQPMonitorTest
String jobID = pbsCluster.submitBatchJob(jobDescriptor);
System.out.println(jobID);
try {
- pushQueue.add(new MonitorID(hostDescription, jobID,null,null, "ogce"));
+ pushQueue.add(new MonitorID(hostDescription, jobID,null,null,null, "ogce"));
} catch (Exception e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
}
try {
pushThread.join();
http://git-wip-us.apache.org/repos/asf/airavata/blob/49eea6f1/tools/job-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------