You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sa...@apache.org on 2014/07/19 16:08:29 UTC
[1/3] git commit: removing unwanted expeirment status publishing in
the gfac
Repository: airavata
Updated Branches:
refs/heads/workflow-support 4752d1a3f -> 92838e4a8
removing unwanted expeirment status publishing in the gfac
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/601d5add
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/601d5add
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/601d5add
Branch: refs/heads/workflow-support
Commit: 601d5add58e51046ffffa974f0dcb3b686c7db4e
Parents: 4752d1a
Author: Saminda Wijeratne <sa...@gmail.com>
Authored: Sat Jul 12 16:20:46 2014 -0400
Committer: Saminda Wijeratne <sa...@gmail.com>
Committed: Sat Jul 12 16:20:46 2014 -0400
----------------------------------------------------------------------
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 50 ++++++------
.../apache/airavata/gfac/core/cpi/GFacImpl.java | 25 +++---
.../AiravataExperimentStatusUpdator.java | 45 ++++++++++-
.../core/monitor/AiravataJobStatusUpdator.java | 44 ++---------
.../core/monitor/AiravataTaskStatusUpdator.java | 75 ++++++++++--------
.../AiravataWorkflowNodeStatusUpdator.java | 80 +++++++++-----------
.../state/ExperimentStatusChangeRequest.java | 63 ---------------
.../state/ExperimentStatusChangedEvent.java | 63 +++++++++++++++
.../monitor/state/JobStatusChangedEvent.java | 80 ++++++++++++++++++++
.../monitor/state/TaskStatusChangedEvent.java | 61 +++++++++++++++
.../state/WorkflowNodeStatusChangeRequest.java | 63 ---------------
.../state/WorkflowNodeStatusChangedEvent.java | 63 +++++++++++++++
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 9 ++-
13 files changed, 439 insertions(+), 282 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 24ab0c3..62c44ab 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -56,10 +56,11 @@ import org.apache.airavata.gfac.core.monitor.ExperimentIdentity;
import org.apache.airavata.gfac.core.monitor.JobIdentity;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.TaskIdentity;
-import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent;
import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
@@ -540,15 +541,15 @@ public class BetterGfacImpl implements GFac {
try {
// we make the experiment as failed due to exception scenario
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
- monitorPublisher.publish(new
- ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
- ExperimentState.FAILED));
+// monitorPublisher.publish(new
+// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+// ExperimentState.FAILED));
// Updating the task status if there's any task associated
- monitorPublisher.publish(new TaskStatusChangeRequest(
- new TaskIdentity(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED
- ));
+// monitorPublisher.publish(new TaskStatusChangedEvent(
+// new TaskIdentity(jobExecutionContext.getExperimentID(),
+// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+// jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED
+// ));
monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
new JobIdentity(jobExecutionContext.getExperimentID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
@@ -585,15 +586,15 @@ public class BetterGfacImpl implements GFac {
try {
// we make the experiment as failed due to exception scenario
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
- monitorPublisher.publish(new
- ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
- ExperimentState.FAILED));
+// monitorPublisher.publish(new
+// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+// ExperimentState.FAILED));
// Updating the task status if there's any task associated
- monitorPublisher.publish(new TaskStatusChangeRequest(
- new TaskIdentity(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED
- ));
+// monitorPublisher.publish(new TaskStatusChangeRequest(
+// new TaskIdentity(jobExecutionContext.getExperimentID(),
+// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+// jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED
+// ));
monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
new JobIdentity(jobExecutionContext.getExperimentID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
@@ -771,9 +772,9 @@ public class BetterGfacImpl implements GFac {
// At this point all the execution is finished so we update the task and experiment statuses.
// Handler authors does not have to worry about updating experiment or task statuses.
- monitorPublisher.publish(new
- ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
- ExperimentState.COMPLETED));
+// monitorPublisher.publish(new
+// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+// ExperimentState.COMPLETED));
// Updating the task status if there's any task associated
monitorPublisher.publish(new TaskStatusChangeRequest(
new TaskIdentity(jobExecutionContext.getExperimentID(),
@@ -900,11 +901,12 @@ public class BetterGfacImpl implements GFac {
// At this point all the execution is finished so we update the task and experiment statuses.
// Handler authors does not have to worry about updating experiment or task statuses.
- monitorPublisher.publish(new
- ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
- ExperimentState.COMPLETED));
+// monitorPublisher.publish(new
+// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+// ExperimentState.COMPLETED));
// Updating the task status if there's any task associated
- monitorPublisher.publish(new TaskStatusChangeRequest(
+
+ monitorPublisher.publish(new TaskStatusChangedEvent(
new TaskIdentity(jobExecutionContext.getExperimentID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index 34800e9..2065cee 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -44,9 +44,10 @@ import org.apache.airavata.gfac.core.context.ApplicationContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
import org.apache.airavata.gfac.core.monitor.*;
-import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
@@ -322,15 +323,15 @@ public class GFacImpl implements GFac {
} catch (Exception e) {
try {
// we make the experiment as failed due to exception scenario
- monitorPublisher.publish(new
- ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
- ExperimentState.FAILED));
+// monitorPublisher.publish(new
+// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+// ExperimentState.FAILED));
// Updating the task status if there's any task associated
- monitorPublisher.publish(new TaskStatusChangeRequest(
- new TaskIdentity(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED
- ));
+// monitorPublisher.publish(new TaskStatusChangedEvent(
+// new TaskIdentity(jobExecutionContext.getExperimentID(),
+// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+// jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED
+// ));
monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
new JobIdentity(jobExecutionContext.getExperimentID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
@@ -449,9 +450,9 @@ public class GFacImpl implements GFac {
monitorPublisher.publish(GfacExperimentState.COMPLETED);
// At this point all the execution is finished so we update the task and experiment statuses.
// Handler authors does not have to worry about updating experiment or task statuses.
- monitorPublisher.publish(new
- ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
- ExperimentState.COMPLETED));
+// monitorPublisher.publish(new
+// ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+// ExperimentState.COMPLETED));
// Updating the task status if there's any task associated
monitorPublisher.publish(new TaskStatusChangeRequest(
new TaskIdentity(jobExecutionContext.getExperimentID(),
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java
index f5d82ff..664f237 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java
@@ -21,7 +21,10 @@
package org.apache.airavata.gfac.core.monitor;
import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest;
+
+import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent;
+import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangedEvent;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.registry.cpi.RegistryModelType;
@@ -35,6 +38,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
private Registry airavataRegistry;
+ private MonitorPublisher monitorPublisher;
public Registry getAiravataRegistry() {
return airavataRegistry;
@@ -45,7 +49,7 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
}
@Subscribe
- public void updateRegistry(ExperimentStatusChangeRequest experimentStatus) {
+ public void updateRegistry(ExperimentStatusChangedEvent experimentStatus) {
ExperimentState state = experimentStatus.getState();
if (state != null) {
try {
@@ -57,6 +61,41 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
}
}
+
+ @Subscribe
+ public void setupExperimentStatus(WorkflowNodeStatusChangedEvent nodeStatus) {
+ ExperimentState state = ExperimentState.UNKNOWN;
+ switch (nodeStatus.getState()) {
+ case CANCELED:
+ state = ExperimentState.CANCELED;
+ break;
+ case COMPLETED:
+ state = ExperimentState.COMPLETED;
+ break;
+ case INVOKED:
+ state = ExperimentState.LAUNCHED;
+ break;
+ case FAILED:
+ state = ExperimentState.FAILED;
+ break;
+ case EXECUTING:
+ state = ExperimentState.EXECUTING;
+ break;
+ case CANCELING:
+ state = ExperimentState.CANCELING;
+ break;
+ default:
+ break;
+ }
+ try {
+ updateExperimentStatus(nodeStatus.getIdentity().getExperimentID(), state);
+ logger.debug("Publishing experiment status for "+nodeStatus.getIdentity().getExperimentID()+":"+state.toString());
+ monitorPublisher.publish(new ExperimentStatusChangedEvent(nodeStatus.getIdentity(), state));
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+ }
+ }
+
public void updateExperimentStatus(String experimentId, ExperimentState state) throws Exception {
logger.info("Updating the experiment status of experiment: " + experimentId + " to " + state.toString());
Experiment details = (Experiment)airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId);
@@ -76,6 +115,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
for (Object configuration : configurations) {
if (configuration instanceof Registry){
this.airavataRegistry=(Registry)configuration;
+ } else if (configuration instanceof MonitorPublisher){
+ this.monitorPublisher=(MonitorPublisher) configuration;
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
index 29ee82f..6ad55d2 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
@@ -20,21 +20,20 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
+
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.JobStatusChangedEvent;
import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.registry.cpi.CompositeIdentifier;
-import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Calendar;
-import java.util.concurrent.BlockingQueue;
+import com.google.common.eventbus.Subscribe;
public class AiravataJobStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataJobStatusUpdator.class);
@@ -64,43 +63,16 @@ public class AiravataJobStatusUpdator implements AbstractActivityListener {
String taskID = jobStatus.getIdentity().getTaskId();
String jobID = jobStatus.getIdentity().getJobId();
updateJobStatus(taskID, jobID, state);
+ logger.debug("Publishing job status for "+jobStatus.getIdentity().getJobId()+":"+state.toString());
+ monitorPublisher.publish(new JobStatusChangedEvent(jobStatus.getMonitorID(),jobStatus.getIdentity(),state));
} catch (Exception e) {
logger.error("Error persisting data" + e.getLocalizedMessage(), e);
}
- logger.info("Job ID:" + jobStatus.getIdentity().getJobId() + " is "+state.toString());
}
}
- @Subscribe
- public void setupTaskStatus(JobStatusChangeRequest jobStatus){
- TaskState state=TaskState.UNKNOWN;
- switch(jobStatus.getState()){
- case ACTIVE:
- state=TaskState.EXECUTING; break;
- case CANCELED:
- state=TaskState.CANCELED; break;
- case COMPLETE:
- state=TaskState.COMPLETED; break;
- case FAILED:
- state=TaskState.FAILED; break;
- case HELD: case SUSPENDED: case QUEUED:
- state=TaskState.WAITING; break;
- case SETUP:
- state=TaskState.PRE_PROCESSING; break;
- case SUBMITTED:
- state=TaskState.STARTED; break;
- case UN_SUBMITTED:
- state=TaskState.CANCELED; break;
- case CANCELING:
- state=TaskState.CANCELING; break;
- default:
- break;
- }
- logger.debug("Publishing Task Status "+state.toString());
- monitorPublisher.publish(new TaskStatusChangeRequest(jobStatus.getIdentity(),state));
- }
-
public void updateJobStatus(String taskId, String jobID, JobState state) throws Exception {
+ logger.debug("Updating job status for "+jobID+":"+state.toString());
CompositeIdentifier ids = new CompositeIdentifier(taskId, jobID);
JobDetails details = (JobDetails)airavataRegistry.get(RegistryModelType.JOB_DETAIL, ids);
if(details == null) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index a6ab613..26d49c0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -20,19 +20,20 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import com.google.common.eventbus.Subscribe;
+import java.util.Calendar;
+
+import org.apache.airavata.gfac.core.monitor.state.JobStatusChangedEvent;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.TaskState;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
-import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Calendar;
+import com.google.common.eventbus.Subscribe;
public class AiravataTaskStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataTaskStatusUpdator.class);
@@ -48,46 +49,54 @@ public class AiravataTaskStatusUpdator implements AbstractActivityListener {
public void setAiravataRegistry(Registry airavataRegistry) {
this.airavataRegistry = airavataRegistry;
}
-
+
@Subscribe
- public void updateRegistry(TaskStatusChangeRequest taskStatus) {
- TaskState state = taskStatus.getState();
- if (state != null) {
- try {
- String taskID = taskStatus.getIdentity().getTaskId();
- updateTaskStatus(taskID, state);
- } catch (Exception e) {
- logger.error("Error persisting data" + e.getLocalizedMessage(), e);
- }
- }
+ public void setupTaskStatus(TaskStatusChangeRequest taskStatus){
+ try {
+ updateTaskStatus(taskStatus.getIdentity().getTaskId(), taskStatus.getState());
+ logger.debug("Publishing task status for "+taskStatus.getIdentity().getTaskId()+":"+taskStatus.getState().toString());
+ monitorPublisher.publish(new TaskStatusChangedEvent(taskStatus.getIdentity(),taskStatus.getState()));
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+ }
}
-
+
@Subscribe
- public void setupWorkflowNodeStatus(TaskStatusChangeRequest taskStatus){
- WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
- switch(taskStatus.getState()){
+ public void setupTaskStatus(JobStatusChangedEvent jobStatus){
+ TaskState state=TaskState.UNKNOWN;
+ switch(jobStatus.getState()){
+ case ACTIVE:
+ state=TaskState.EXECUTING; break;
case CANCELED:
- state=WorkflowNodeState.CANCELED; break;
- case COMPLETED:
- state=WorkflowNodeState.COMPLETED; break;
- case CONFIGURING_WORKSPACE:
- state=WorkflowNodeState.INVOKED; break;
+ state=TaskState.CANCELED; break;
+ case COMPLETE:
+ state=TaskState.COMPLETED; break;
case FAILED:
- state=WorkflowNodeState.FAILED; break;
- case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING:
- state=WorkflowNodeState.EXECUTING; break;
- case STARTED:
- state=WorkflowNodeState.INVOKED; break;
+ state=TaskState.FAILED; break;
+ case HELD: case SUSPENDED: case QUEUED:
+ state=TaskState.WAITING; break;
+ case SETUP:
+ state=TaskState.PRE_PROCESSING; break;
+ case SUBMITTED:
+ state=TaskState.STARTED; break;
+ case UN_SUBMITTED:
+ state=TaskState.CANCELED; break;
case CANCELING:
- state=WorkflowNodeState.CANCELING; break;
+ state=TaskState.CANCELING; break;
default:
break;
}
- logger.debug("Publishing Experiment Status "+state.toString());
- monitorPublisher.publish(new WorkflowNodeStatusChangeRequest(taskStatus.getIdentity(),state));
+ try {
+ updateTaskStatus(jobStatus.getIdentity().getTaskId(), state);
+ logger.debug("Publishing task status for "+jobStatus.getIdentity().getTaskId()+":"+state.toString());
+ monitorPublisher.publish(new TaskStatusChangedEvent(jobStatus.getIdentity(),state));
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+ }
}
public void updateTaskStatus(String taskId, TaskState state) throws Exception {
+ logger.debug("Updating task status for "+taskId+":"+state.toString());
TaskDetails details = (TaskDetails)airavataRegistry.get(RegistryModelType.TASK_DETAIL, taskId);
if(details == null) {
details = new TaskDetails();
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index 8e92e87..5f6629c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -20,20 +20,20 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import com.google.common.eventbus.Subscribe;
-import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangeRequest;
+import java.util.Calendar;
+
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
+import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangedEvent;
import org.apache.airavata.gfac.core.notification.MonitorPublisher;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
-import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Calendar;
+import com.google.common.eventbus.Subscribe;
public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListener {
private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
@@ -51,49 +51,37 @@ public class AiravataWorkflowNodeStatusUpdator implements AbstractActivityListen
}
@Subscribe
- public void updateRegistry(WorkflowNodeStatusChangeRequest workflowNodeStatus) {
- WorkflowNodeState state = workflowNodeStatus.getState();
- if (state != null) {
- try {
- String workflowNodeID = workflowNodeStatus.getIdentity().getWorkflowNodeID();
- updateWorkflowNodeStatus(workflowNodeID, state);
- } catch (Exception e) {
- logger.error("Error persisting data" + e.getLocalizedMessage(), e);
- }
- }
- }
-
- @Subscribe
- public void setupExperimentStatus(WorkflowNodeStatusChangeRequest nodeStatus) {
- ExperimentState state = ExperimentState.UNKNOWN;
- switch (nodeStatus.getState()) {
- case CANCELED:
- state = ExperimentState.CANCELED;
- break;
- case COMPLETED:
- state = ExperimentState.COMPLETED;
- break;
- case INVOKED:
- state = ExperimentState.LAUNCHED;
- break;
- case FAILED:
- state = ExperimentState.FAILED;
- break;
- case EXECUTING:
- state = ExperimentState.EXECUTING;
- break;
- case CANCELING:
- state = ExperimentState.CANCELING;
- break;
- default:
- break;
- }
- logger.debug("Publishing Experiment Status " + state.toString());
- monitorPublisher.publish(new ExperimentStatusChangeRequest(nodeStatus.getIdentity(), state));
+ public void setupWorkflowNodeStatus(TaskStatusChangedEvent taskStatus){
+ WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
+ switch(taskStatus.getState()){
+ case CANCELED:
+ state=WorkflowNodeState.CANCELED; break;
+ case COMPLETED:
+ state=WorkflowNodeState.COMPLETED; break;
+ case CONFIGURING_WORKSPACE:
+ state=WorkflowNodeState.INVOKED; break;
+ case FAILED:
+ state=WorkflowNodeState.FAILED; break;
+ case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING:
+ state=WorkflowNodeState.EXECUTING; break;
+ case STARTED:
+ state=WorkflowNodeState.INVOKED; break;
+ case CANCELING:
+ state=WorkflowNodeState.CANCELING; break;
+ default:
+ break;
+ }
+ try {
+ updateWorkflowNodeStatus(taskStatus.getIdentity().getWorkflowNodeID(), state);
+ logger.debug("Publishing workflow node status for "+taskStatus.getIdentity().getWorkflowNodeID()+":"+state.toString());
+ monitorPublisher.publish(new WorkflowNodeStatusChangedEvent(taskStatus.getIdentity(),state));
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+ }
}
-
public void updateWorkflowNodeStatus(String workflowNodeId, WorkflowNodeState state) throws Exception {
+ logger.debug("Updating workflow node status for "+workflowNodeId+":"+state.toString());
WorkflowNodeDetails details = (WorkflowNodeDetails)airavataRegistry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId);
if(details == null) {
details = new WorkflowNodeDetails();
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangeRequest.java
deleted file mode 100644
index a8bc6b4..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangeRequest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.monitor.state;
-
-import org.apache.airavata.gfac.core.monitor.ExperimentIdentity;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-
-/**
- * This is the primary job state object used in
- * through out the monitor module. This use airavata-data-model JobState enum
- * Ideally after processing each event or monitoring message from remote system
- * Each monitoring implementation has to return this object with a state and
- * the monitoring ID
- */
-public class ExperimentStatusChangeRequest extends AbstractStateChangeRequest {
- private ExperimentState state;
- private ExperimentIdentity identity;
-
- // this constructor can be used in Qstat monitor to handle errors
- public ExperimentStatusChangeRequest() {
- }
-
- public ExperimentStatusChangeRequest(ExperimentIdentity experimentIdentity, ExperimentState state) {
- this.state = state;
- setIdentity(experimentIdentity);
- }
-
- public ExperimentState getState() {
- return state;
- }
-
- public void setState(ExperimentState state) {
- this.state = state;
- }
-
- public ExperimentIdentity getIdentity() {
- return identity;
- }
-
- public void setIdentity(ExperimentIdentity identity) {
- this.identity = identity;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangedEvent.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangedEvent.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangedEvent.java
new file mode 100644
index 0000000..a95d46c
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/ExperimentStatusChangedEvent.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.monitor.state;
+
+import org.apache.airavata.gfac.core.monitor.ExperimentIdentity;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class ExperimentStatusChangedEvent extends AbstractStateChangeRequest {
+ private ExperimentState state;
+ private ExperimentIdentity identity;
+
+ // this constructor can be used in Qstat monitor to handle errors
+ public ExperimentStatusChangedEvent() {
+ }
+
+ public ExperimentStatusChangedEvent(ExperimentIdentity experimentIdentity, ExperimentState state) {
+ this.state = state;
+ setIdentity(experimentIdentity);
+ }
+
+ public ExperimentState getState() {
+ return state;
+ }
+
+ public void setState(ExperimentState state) {
+ this.state = state;
+ }
+
+ public ExperimentIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(ExperimentIdentity identity) {
+ this.identity = identity;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/JobStatusChangedEvent.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/JobStatusChangedEvent.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/JobStatusChangedEvent.java
new file mode 100644
index 0000000..d995f03
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/JobStatusChangedEvent.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.monitor.state;
+
+import org.apache.airavata.gfac.core.monitor.JobIdentity;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.model.workspace.experiment.JobState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class JobStatusChangedEvent extends AbstractStateChangeRequest {
+ private JobState state;
+ private JobIdentity identity;
+
+ private MonitorID monitorID;
+
+ // this constructor can be used in Qstat monitor to handle errors
+ public JobStatusChangedEvent() {
+ }
+
+ public JobStatusChangedEvent(MonitorID monitorID) {
+ setIdentity(new JobIdentity(monitorID.getExperimentID(),monitorID.getWorkflowNodeID(),
+ monitorID.getTaskID(),monitorID.getJobID()));
+ setMonitorID(monitorID);
+ this.state = monitorID.getStatus();
+ }
+ public JobStatusChangedEvent(MonitorID monitorID, JobIdentity jobId, JobState state) {
+ setIdentity(jobId);
+ setMonitorID(monitorID);
+ this.state = state;
+ }
+
+ public JobState getState() {
+ return state;
+ }
+
+ public void setState(JobState state) {
+ this.state = state;
+ }
+
+ public JobIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(JobIdentity identity) {
+ this.identity = identity;
+ }
+
+ public MonitorID getMonitorID() {
+ return monitorID;
+ }
+
+ public void setMonitorID(MonitorID monitorID) {
+ this.monitorID = monitorID;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskStatusChangedEvent.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskStatusChangedEvent.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskStatusChangedEvent.java
new file mode 100644
index 0000000..ec217bc
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/TaskStatusChangedEvent.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.monitor.state;
+
+import org.apache.airavata.gfac.core.monitor.TaskIdentity;
+import org.apache.airavata.model.workspace.experiment.TaskState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class TaskStatusChangedEvent extends AbstractStateChangeRequest {
+ private TaskState state;
+ private TaskIdentity identity;
+ // this constructor can be used in Qstat monitor to handle errors
+ public TaskStatusChangedEvent() {
+ }
+
+ public TaskStatusChangedEvent(TaskIdentity taskIdentity, TaskState state) {
+ this.state = state;
+ setIdentity(taskIdentity);
+ }
+
+ public TaskState getState() {
+ return state;
+ }
+
+ public void setState(TaskState state) {
+ this.state = state;
+ }
+
+ public TaskIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(TaskIdentity identity) {
+ this.identity = identity;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangeRequest.java
deleted file mode 100644
index 9e52dd4..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangeRequest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.monitor.state;
-
-import org.apache.airavata.gfac.core.monitor.WorkflowNodeIdentity;
-import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
-
-/**
- * This is the primary job state object used in
- * through out the monitor module. This use airavata-data-model JobState enum
- * Ideally after processing each event or monitoring message from remote system
- * Each monitoring implementation has to return this object with a state and
- * the monitoring ID
- */
-public class WorkflowNodeStatusChangeRequest extends AbstractStateChangeRequest {
- private WorkflowNodeState state;
- private WorkflowNodeIdentity identity;
-
- // this constructor can be used in Qstat monitor to handle errors
- public WorkflowNodeStatusChangeRequest() {
- }
-
- public WorkflowNodeStatusChangeRequest(WorkflowNodeIdentity identity, WorkflowNodeState state) {
- this.state = state;
- setIdentity(identity);
- }
-
- public WorkflowNodeState getState() {
- return state;
- }
-
- public void setState(WorkflowNodeState state) {
- this.state = state;
- }
-
- public WorkflowNodeIdentity getIdentity() {
- return identity;
- }
-
- public void setIdentity(WorkflowNodeIdentity identity) {
- this.identity = identity;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangedEvent.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangedEvent.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangedEvent.java
new file mode 100644
index 0000000..6671add
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/WorkflowNodeStatusChangedEvent.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.monitor.state;
+
+import org.apache.airavata.gfac.core.monitor.WorkflowNodeIdentity;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+
+/**
+ * This is the primary job state object used in
+ * through out the monitor module. This use airavata-data-model JobState enum
+ * Ideally after processing each event or monitoring message from remote system
+ * Each monitoring implementation has to return this object with a state and
+ * the monitoring ID
+ */
+public class WorkflowNodeStatusChangedEvent extends AbstractStateChangeRequest {
+ private WorkflowNodeState state;
+ private WorkflowNodeIdentity identity;
+
+ // this constructor can be used in Qstat monitor to handle errors
+ public WorkflowNodeStatusChangedEvent() {
+ }
+
+ public WorkflowNodeStatusChangedEvent(WorkflowNodeIdentity identity, WorkflowNodeState state) {
+ this.state = state;
+ setIdentity(identity);
+ }
+
+ public WorkflowNodeState getState() {
+ return state;
+ }
+
+ public void setState(WorkflowNodeState state) {
+ this.state = state;
+ }
+
+ public WorkflowNodeIdentity getIdentity() {
+ return identity;
+ }
+
+ public void setIdentity(WorkflowNodeIdentity identity) {
+ this.identity = identity;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/601d5add/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index a1f38fc..fa6714d 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -29,9 +29,10 @@ import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.monitor.ExperimentIdentity;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.TaskIdentity;
-import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.monitor.HostMonitorData;
import org.apache.airavata.gfac.monitor.UserMonitorData;
@@ -178,8 +179,10 @@ public class HPCPullMonitor extends PullMonitor {
} catch (GFacException e) {
publisher.publish(new TaskStatusChangeRequest(new TaskIdentity(iMonitorID.getExperimentID(), iMonitorID.getWorkflowNodeID(),
iMonitorID.getTaskID()), TaskState.FAILED));
- publisher.publish(new ExperimentStatusChangeRequest(new ExperimentIdentity(iMonitorID.getExperimentID()),
- ExperimentState.FAILED));
+ //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
+ //should be done understanding whole workflow of job submission and data transfer
+// publisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(iMonitorID.getExperimentID()),
+// ExperimentState.FAILED));
logger.info(e.getLocalizedMessage(), e);
}
} else if (iMonitorID.getFailedCount() > 2 && iMonitorID.getStatus().equals(JobState.UNKNOWN)) {
[3/3] git commit: moving about status listener classes out of hte gfac
Posted by sa...@apache.org.
moving about status listener classes out of hte gfac
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/92838e4a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/92838e4a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/92838e4a
Branch: refs/heads/workflow-support
Commit: 92838e4a8d9a678e795c9deedaba76aeac652dc3
Parents: 601d5ad
Author: Saminda Wijeratne <sa...@gmail.com>
Authored: Sat Jul 19 09:23:19 2014 -0400
Committer: Saminda Wijeratne <sa...@gmail.com>
Committed: Sat Jul 19 09:23:19 2014 -0400
----------------------------------------------------------------------
.../server/handler/AiravataServerHandler.java | 98 ++++---
.../util/AiravataExperimentStatusUpdator.java | 117 ++++++++
.../api/server/util/DataModelUtils.java | 56 ++++
.../tools/RegisterSampleApplications.java | 207 +++++++++++---
.../airavata/model/util/ExecutionType.java | 28 ++
.../model/util/ExperimentModelUtil.java | 16 +-
.../experimentModel.thrift | 3 +-
modules/commons/utils/pom.xml | 7 +-
.../common/utils/AbstractActivityListener.java | 27 ++
.../airavata/common/utils/MonitorPublisher.java | 47 ++++
.../main/resources/airavata-server.properties | 2 +-
modules/distribution/server/pom.xml | 5 +
.../server/src/main/assembly/bin-assembly.xml | 1 +
.../airavata/gfac/server/GfacServerHandler.java | 20 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 34 ++-
.../apache/airavata/gfac/core/cpi/GFacImpl.java | 33 ++-
.../gfac/core/handler/AbstractHandler.java | 2 +-
.../handler/AbstractRecoverableHandler.java | 2 +-
.../core/monitor/AbstractActivityListener.java | 54 ++--
.../AiravataExperimentStatusUpdator.java | 232 ++++++++--------
.../core/monitor/AiravataJobStatusUpdator.java | 3 +-
.../core/monitor/AiravataTaskStatusUpdator.java | 3 +-
.../AiravataWorkflowNodeStatusUpdator.java | 3 +-
.../core/monitor/GfacInternalStatusUpdator.java | 17 +-
.../core/notification/MonitorPublisher.java | 94 +++----
.../gfac/core/provider/AbstractProvider.java | 3 +-
.../provider/AbstractRecoverableProvider.java | 2 +-
.../gfac/services/impl/LocalProviderTest.java | 17 +-
.../monitor/core/AiravataAbstractMonitor.java | 2 +-
.../handlers/GridPushMonitorHandler.java | 13 +-
.../monitor/impl/pull/qstat/HPCPullMonitor.java | 22 +-
.../monitor/impl/push/amqp/AMQPMonitor.java | 21 +-
.../monitor/impl/push/amqp/BasicConsumer.java | 11 +-
.../apache/airavata/job/AMQPMonitorTest.java | 21 +-
.../job/QstatMonitorTestWithMyProxyAuth.java | 21 +-
.../airavata/workflow/engine/WorkflowUtils.java | 56 ++++
.../engine/interpretor/WorkflowInterpreter.java | 267 ++++++++++---------
.../engine/util/ProxyMonitorPublisher.java | 62 +++++
.../model/component/ws/WSComponent.java | 2 +-
.../ui/dialogs/registry/RegistryWindow.java | 4 +-
.../WorkflowInterpreterLaunchWindow.java | 127 +++++----
41 files changed, 1190 insertions(+), 572 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index 58d8acf..3c610f0 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -33,7 +33,6 @@ import java.util.Random;
import org.airavata.appcatalog.cpi.AppCatalog;
import org.airavata.appcatalog.cpi.AppCatalogException;
import org.airavata.appcatalog.cpi.ApplicationDeployment;
-import org.airavata.appcatalog.cpi.ApplicationInterface;
import org.airavata.appcatalog.cpi.ComputeResource;
import org.airavata.appcatalog.cpi.GwyResourceProfile;
import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
@@ -46,6 +45,7 @@ import org.apache.aiaravata.application.catalog.data.resources.SshJobSubmissionR
import org.apache.aiaravata.application.catalog.data.util.AppCatalogThriftConversion;
import org.apache.airavata.api.Airavata;
import org.apache.airavata.api.airavataAPIConstants;
+import org.apache.airavata.api.server.util.DataModelUtils;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentDescription;
@@ -53,7 +53,16 @@ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationModule;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementInterface;
+import org.apache.airavata.model.appcatalog.computeresource.DataMovementProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.GridFTPDataMovement;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionProtocol;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALDataMovement;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SCPDataMovement;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
import org.apache.airavata.model.appcatalog.gatewayprofile.GatewayResourceProfile;
import org.apache.airavata.model.error.AiravataClientException;
@@ -63,6 +72,7 @@ import org.apache.airavata.model.error.ExperimentNotFoundException;
import org.apache.airavata.model.error.InvalidRequestException;
import org.apache.airavata.model.error.LaunchValidationException;
import org.apache.airavata.model.error.ProjectNotFoundException;
+import org.apache.airavata.model.util.ExecutionType;
import org.apache.airavata.model.workspace.Project;
import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
import org.apache.airavata.model.workspace.experiment.DataObjectType;
@@ -88,8 +98,6 @@ import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.utils.Constants;
import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants;
import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
-import org.apache.airavata.workflow.catalog.WorkflowCatalogException;
-import org.apache.airavata.workflow.catalog.WorkflowCatalogFactory;
import org.apache.airavata.workflow.engine.WorkflowEngine;
import org.apache.airavata.workflow.engine.WorkflowEngineException;
import org.apache.airavata.workflow.engine.WorkflowEngineFactory;
@@ -1027,55 +1035,41 @@ public class AiravataServerHandler implements Airavata.Iface, Watcher {
final String expID = airavataExperimentId;
final String token = airavataCredStoreToken;
synchronized (this) {
- try {
- ApplicationInterface applicationInterface = AppCatalogFactory.getAppCatalog().getApplicationInterface();
- List<String> allApplicationInterfaceIds = applicationInterface.getAllApplicationInterfaceIds();
- Experiment experiment = getExperiment(expID);
- String applicationId = experiment.getApplicationId();
- Thread thread = null;
- if (allApplicationInterfaceIds.contains(applicationId)){
- //its an single application execution experiment
- final OrchestratorService.Client orchestratorClient = getOrchestratorClient();
- if (orchestratorClient.validateExperiment(expID)) {
- thread = new Thread() {
- public void run() {
- try {
- launchSingleAppExperiment(expID, token, orchestratorClient);
- } catch (TException e) {
- e.printStackTrace();
- }
- }
- };
- } else {
- throw new InvalidRequestException("Experiment Validation Failed, please check the configuration");
- }
-
- } else {
- List<String> allWorkflows = WorkflowCatalogFactory.getWorkflowCatalog().getAllWorkflows();
- if (allWorkflows.contains(applicationId)){
- //its a workflow execution experiment
- thread = new Thread() {
- public void run() {
- try {
- launchWorkflowExperiment(expID, token);
- } catch (TException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- };
- } else {
- throw new InvalidRequestException("Experiment '"+expID+"' launch failed. Unable to locate an application or a workflow with the id "+applicationId);
- }
- }
- thread.start();
- } catch (AppCatalogException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
- } catch (WorkflowCatalogException e1) {
- // TODO Auto-generated catch block
- e1.printStackTrace();
+ Experiment experiment = getExperiment(expID);
+ ExecutionType executionType = DataModelUtils.getExecutionType(experiment);
+ Thread thread = null;
+ if (executionType==ExecutionType.SINGLE_APP){
+ //its an single application execution experiment
+ final OrchestratorService.Client orchestratorClient = getOrchestratorClient();
+ if (orchestratorClient.validateExperiment(expID)) {
+ thread = new Thread() {
+ public void run() {
+ try {
+ launchSingleAppExperiment(expID, token, orchestratorClient);
+ } catch (TException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ } else {
+ throw new InvalidRequestException("Experiment Validation Failed, please check the configuration");
+ }
+
+ } else if (executionType == ExecutionType.WORKFLOW){
+ //its a workflow execution experiment
+ thread = new Thread() {
+ public void run() {
+ try {
+ launchWorkflowExperiment(expID, token);
+ } catch (TException e) {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ }
+ };
+ } else {
+ throw new InvalidRequestException("Experiment '"+expID+"' launch failed. Unable to figureout execution type for application "+experiment.getApplicationId());
}
-
+ thread.start();
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/AiravataExperimentStatusUpdator.java
new file mode 100644
index 0000000..f75d95c
--- /dev/null
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/AiravataExperimentStatusUpdator.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.api.server.util;
+
+import java.util.Calendar;
+
+import org.apache.airavata.common.utils.AbstractActivityListener;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent;
+import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangedEvent;
+import org.apache.airavata.model.util.ExecutionType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.Subscribe;
+
+public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
+ private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
+
+ private Registry airavataRegistry;
+ private MonitorPublisher monitorPublisher;
+
+ public Registry getAiravataRegistry() {
+ return airavataRegistry;
+ }
+
+ public void setAiravataRegistry(Registry airavataRegistry) {
+ this.airavataRegistry = airavataRegistry;
+ }
+
+ @Subscribe
+ public void setupExperimentStatus(WorkflowNodeStatusChangedEvent nodeStatus) {
+ try {
+ boolean updateExperimentStatus=true;
+ ExperimentState state = ExperimentState.UNKNOWN;
+ switch (nodeStatus.getState()) {
+ case CANCELED:
+ state = ExperimentState.CANCELED; updateExperimentStatus = true;
+ break;
+ case COMPLETED:
+ state = ExperimentState.COMPLETED; updateExperimentStatus = false;
+ break;
+ case INVOKED:
+ state = ExperimentState.LAUNCHED; updateExperimentStatus = false;
+ break;
+ case FAILED:
+ state = ExperimentState.FAILED; updateExperimentStatus = true;
+ break;
+ case EXECUTING:
+ state = ExperimentState.EXECUTING; updateExperimentStatus = true;
+ break;
+ case CANCELING:
+ state = ExperimentState.CANCELING; updateExperimentStatus = true;
+ break;
+ default:
+ break;
+ }
+ if (!updateExperimentStatus){
+ ExecutionType executionType = DataModelUtils.getExecutionType((Experiment) airavataRegistry.get(RegistryModelType.EXPERIMENT, nodeStatus.getIdentity().getExperimentID()));
+ updateExperimentStatus=(executionType==ExecutionType.SINGLE_APP);
+ }
+ updateExperimentStatus(nodeStatus.getIdentity().getExperimentID(), state);
+ logger.debug("Publishing experiment status for "+nodeStatus.getIdentity().getExperimentID()+":"+state.toString());
+ monitorPublisher.publish(new ExperimentStatusChangedEvent(nodeStatus.getIdentity(), state));
+ } catch (Exception e) {
+ logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+ }
+ }
+
+ public void updateExperimentStatus(String experimentId, ExperimentState state) throws Exception {
+ logger.info("Updating the experiment status of experiment: " + experimentId + " to " + state.toString());
+ Experiment details = (Experiment)airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId);
+ if(details == null) {
+ details = new Experiment();
+ details.setExperimentID(experimentId);
+ }
+ org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+ status.setExperimentState(state);
+ status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+ details.setExperimentStatus(status);
+ airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
+
+ }
+
+ public void setup(Object... configurations) {
+ for (Object configuration : configurations) {
+ if (configuration instanceof Registry){
+ this.airavataRegistry=(Registry)configuration;
+ } else if (configuration instanceof MonitorPublisher){
+ this.monitorPublisher=(MonitorPublisher) configuration;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/DataModelUtils.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/DataModelUtils.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/DataModelUtils.java
new file mode 100644
index 0000000..9a0536c
--- /dev/null
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/DataModelUtils.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.api.server.util;
+
+import java.util.List;
+
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.airavata.appcatalog.cpi.ApplicationInterface;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.model.util.ExecutionType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.workflow.catalog.WorkflowCatalogException;
+import org.apache.airavata.workflow.catalog.WorkflowCatalogFactory;
+
+public class DataModelUtils {
+
+ public static ExecutionType getExecutionType(Experiment experiment){
+ try {
+ ApplicationInterface applicationInterface = AppCatalogFactory.getAppCatalog().getApplicationInterface();
+ List<String> allApplicationInterfaceIds = applicationInterface.getAllApplicationInterfaceIds();
+ String applicationId = experiment.getApplicationId();
+ if (allApplicationInterfaceIds.contains(applicationId)){
+ return ExecutionType.SINGLE_APP;
+ } else {
+ List<String> allWorkflows = WorkflowCatalogFactory.getWorkflowCatalog().getAllWorkflows();
+ if (allWorkflows.contains(applicationId)){
+ return ExecutionType.WORKFLOW;
+ }
+ }
+ } catch (AppCatalogException e) {
+ e.printStackTrace();
+ } catch (WorkflowCatalogException e) {
+ e.printStackTrace();
+ }
+ return ExecutionType.UNKNOWN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/tools/RegisterSampleApplications.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/tools/RegisterSampleApplications.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/tools/RegisterSampleApplications.java
index e991b06..1374f9f 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/tools/RegisterSampleApplications.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/tools/RegisterSampleApplications.java
@@ -47,10 +47,10 @@ import org.apache.thrift.TException;
public class RegisterSampleApplications {
- public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
-// public static final String THRIFT_SERVER_HOST = "localhost";
- public static final int THRIFT_SERVER_PORT = 9930;
-// public static final int THRIFT_SERVER_PORT = 8930;
+// public static final String THRIFT_SERVER_HOST = "gw111.iu.xsede.org";
+ public static final String THRIFT_SERVER_HOST = "localhost";
+// public static final int THRIFT_SERVER_PORT = 9930;
+ public static final int THRIFT_SERVER_PORT = 8930;
private final static Logger logger = LoggerFactory.getLogger(RegisterSampleApplications.class);
private static final String DEFAULT_GATEWAY = "default";
private static Airavata.Client airavataClient;
@@ -110,22 +110,22 @@ public class RegisterSampleApplications {
System.out.println("API version is " + airavataClient.getAPIVersion());
//Register all compute hosts
- registerXSEDEHosts();
+// registerXSEDEHosts();
//Register Gateway Resource Preferences
- registerGatewayResourceProfile();
+// registerGatewayResourceProfile();
//Register all application modules
registerAppModules();
//Register all application deployments
- registerAppDeployments();
+// registerAppDeployments();
//Register all application interfaces
registerAppInterfaces();
//write output into propertiesFile
- writeIdPropertyFile();
+// writeIdPropertyFile();
} catch (Exception e) {
logger.error("Error while connecting with server", e.getMessage());
@@ -237,12 +237,13 @@ public class RegisterSampleApplications {
public static void registerAppInterfaces() {
System.out.println("\n #### Registering Application Interfaces #### \n");
-
+ registerGromaxWorkflowInterfaces();
+
//Registering Echo
- registerEchoInterface();
+// registerEchoInterface();
//Registering Amber
- registerAmberInterface();
+// registerAmberInterface();
//Registering AutoDock
// registerAutoDockInterface();
@@ -263,10 +264,152 @@ public class RegisterSampleApplications {
// registerTrinityInterface();
//Registering WRF
- registerWRFInterface();
+// registerWRFInterface();
+
+ }
+
+ public static void registerGromaxWorkflowInterfaces() {
+ try {
+ System.out.println("#### Registering Echo Interface #### \n");
+
+ List<String> appModules = new ArrayList<String>();
+ appModules.add(echoModuleId);
+
+
+ List<InputDataObjectType> applicationInputs = new ArrayList<InputDataObjectType>();
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("s_struct", "",
+ DataType.URI, null, false, "Starting Structure File", null));
+
+
+ List<OutputDataObjectType> applicationOutputs = new ArrayList<OutputDataObjectType>();
+ applicationOutputs.add(RegisterSampleApplicationsUtils.createAppOutput("ffcomplient_struct",
+ "", DataType.URI));
+ applicationOutputs.add(RegisterSampleApplicationsUtils.createAppOutput("topology",
+ "", DataType.URI));
+
+ echoInterfaceId = airavataClient.registerApplicationInterface(
+ RegisterSampleApplicationsUtils.createApplicationInterfaceDescription("pb2gmx", "pb2gmx",
+ appModules, applicationInputs, applicationOutputs));
+
+
+ applicationInputs.clear();
+ applicationInputs = new ArrayList<InputDataObjectType>();
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("ffcomplient_struct", "",
+ DataType.URI, null, false, "FFComplient Structure File", null));
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("topology", "",
+ DataType.URI, null, false, "Topology File", null));
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("control_param_emv", "",
+ DataType.URI, null, false, "Controlled parameters array of EM Vacuum", null));
+
+
+ applicationOutputs.clear();
+ applicationOutputs = new ArrayList<OutputDataObjectType>();
+ applicationOutputs.add(RegisterSampleApplicationsUtils.createAppOutput("energy_min_struct",
+ "", DataType.URI));
+
+ echoInterfaceId = airavataClient.registerApplicationInterface(
+ RegisterSampleApplicationsUtils.createApplicationInterfaceDescription("simulation1", "simulation1",
+ appModules, applicationInputs, applicationOutputs));
+ applicationInputs.clear();
+ applicationInputs = new ArrayList<InputDataObjectType>();
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("energy_min_struct", "",
+ DataType.URI, null, false, "Energy Minimized Structure File", null));
+
+ applicationOutputs.clear();
+ applicationOutputs = new ArrayList<OutputDataObjectType>();
+ applicationOutputs.add(RegisterSampleApplicationsUtils.createAppOutput("struct_with_pbc",
+ "", DataType.URI));
+
+ echoInterfaceId = airavataClient.registerApplicationInterface(
+ RegisterSampleApplicationsUtils.createApplicationInterfaceDescription("editconf", "Edit configuration",
+ appModules, applicationInputs, applicationOutputs));
+
+ applicationInputs.clear();
+ applicationInputs = new ArrayList<InputDataObjectType>();
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("struct_with_pbc", "",
+ DataType.URI, null, false, "Structure with PBC File", null));
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("solvent_struct", "",
+ DataType.URI, null, false, "Solvent Structure File", null));
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("topology", "",
+ DataType.URI, null, false, "Topology File", null));
+
+ applicationOutputs.clear();
+ applicationOutputs = new ArrayList<OutputDataObjectType>();
+ applicationOutputs.add(RegisterSampleApplicationsUtils.createAppOutput("struct_with_water",
+ "", DataType.URI));
+ applicationOutputs.add(RegisterSampleApplicationsUtils.createAppOutput("topology_with_water",
+ "", DataType.URI));
+
+
+ echoInterfaceId = airavataClient.registerApplicationInterface(
+ RegisterSampleApplicationsUtils.createApplicationInterfaceDescription("genbox", "genbox",
+ appModules, applicationInputs, applicationOutputs));
+
+ applicationInputs.clear();
+ applicationInputs = new ArrayList<InputDataObjectType>();
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("struct_with_water", "",
+ DataType.URI, null, false, "Structure with water File", null));
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("topology_with_water", "",
+ DataType.URI, null, false, "Topology including water File", null));
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("control_param_emv", "",
+ DataType.URI, null, false, "Controlled parameters array of EM Vacuum", null));
+
+ applicationOutputs.clear();
+ applicationOutputs = new ArrayList<OutputDataObjectType>();
+ applicationOutputs.add(RegisterSampleApplicationsUtils.createAppOutput("struct_topoogy",
+ "", DataType.URI));
+
+
+ echoInterfaceId = airavataClient.registerApplicationInterface(
+ RegisterSampleApplicationsUtils.createApplicationInterfaceDescription("grompp", "grompp",
+ appModules, applicationInputs, applicationOutputs));
+
+ applicationInputs.clear();
+ applicationInputs = new ArrayList<InputDataObjectType>();
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("struct_topoogy", "",
+ DataType.URI, null, false, "Structure and Topology File", null));
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("topology_with_water", "",
+ DataType.URI, null, false, "Topology including water File", null));
+
+ applicationOutputs.clear();
+ applicationOutputs = new ArrayList<OutputDataObjectType>();
+ applicationOutputs.add(RegisterSampleApplicationsUtils.createAppOutput("sys_topoogy",
+ "", DataType.URI));
+
+ applicationOutputs.add(RegisterSampleApplicationsUtils.createAppOutput("sys_config",
+ "", DataType.URI));
+
+ echoInterfaceId = airavataClient.registerApplicationInterface(
+ RegisterSampleApplicationsUtils.createApplicationInterfaceDescription("genion", "grompp",
+ appModules, applicationInputs, applicationOutputs));
+
+ applicationInputs.clear();
+ applicationInputs = new ArrayList<InputDataObjectType>();
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("sys_topoogy", "",
+ DataType.URI, null, false, "Structure and Topology File", null));
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("sys_config", "",
+ DataType.URI, null, false, "Topology including water File", null));
+ applicationInputs.add(RegisterSampleApplicationsUtils.createAppInput("control_param_ems", "",
+ DataType.URI, null, false, "Controlled parameters array of EM Solvent", null));
+
+ applicationOutputs.clear();
+ applicationOutputs = new ArrayList<OutputDataObjectType>();
+ applicationOutputs.add(RegisterSampleApplicationsUtils.createAppOutput("energymin_sys",
+ "", DataType.URI));
+
+ echoInterfaceId = airavataClient.registerApplicationInterface(
+ RegisterSampleApplicationsUtils.createApplicationInterfaceDescription("simulation2", "simulation2",
+ appModules, applicationInputs, applicationOutputs));
+
+ System.out.println("Echo Application Interface Id " + echoInterfaceId);
+
+ } catch (TException e) {
+ e.printStackTrace();
+ }
}
+
public static void registerEchoInterface() {
try {
System.out.println("#### Registering Echo Interface #### \n");
@@ -787,26 +930,26 @@ public class RegisterSampleApplications {
}
public static void writeIdPropertyFile() {
-
- try {
- Properties properties = new Properties();
- properties.setProperty("stampedeResourceId", stampedeResourceId);
- properties.setProperty("trestlesResourceId", trestlesResourceId);
- properties.setProperty("bigredResourceId", bigredResourceId);
-
- properties.setProperty("echoInterfaceId", echoInterfaceId);
- properties.setProperty("amberInterfaceId", amberInterfaceId);
- properties.setProperty("wrfInterfaceId", wrfInterfaceId);
-
- File file = new File("airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/conf/app-catalog-identifiers.ini");
- FileOutputStream fileOut = new FileOutputStream(file);
- properties.store(fileOut, "Apache Airavata Gateway to Airavata Deployment Identifiers");
- fileOut.close();
- } catch (FileNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
+//
+// try {
+// Properties properties = new Properties();
+// properties.setProperty("stampedeResourceId", stampedeResourceId);
+// properties.setProperty("trestlesResourceId", trestlesResourceId);
+// properties.setProperty("bigredResourceId", bigredResourceId);
+//
+// properties.setProperty("echoInterfaceId", echoInterfaceId);
+// properties.setProperty("amberInterfaceId", amberInterfaceId);
+// properties.setProperty("wrfInterfaceId", wrfInterfaceId);
+//
+// File file = new File("airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/conf/app-catalog-identifiers.ini");
+// FileOutputStream fileOut = new FileOutputStream(file);
+// properties.store(fileOut, "Apache Airavata Gateway to Airavata Deployment Identifiers");
+// fileOut.close();
+// } catch (FileNotFoundException e) {
+// e.printStackTrace();
+// } catch (IOException e) {
+// e.printStackTrace();
+// }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExecutionType.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExecutionType.java b/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExecutionType.java
new file mode 100644
index 0000000..87fcec4
--- /dev/null
+++ b/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExecutionType.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.model.util;
+
+public enum ExecutionType {
+ UNKNOWN,
+ SINGLE_APP,
+ WORKFLOW
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java b/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java
index 1b53d38..7b08bcd 100644
--- a/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java
+++ b/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java
@@ -22,14 +22,24 @@
package org.apache.airavata.model.util;
-import org.apache.airavata.model.workspace.experiment.*;
-
import java.util.Calendar;
import java.util.List;
+import org.apache.airavata.model.workspace.experiment.AdvancedInputDataHandling;
+import org.apache.airavata.model.workspace.experiment.AdvancedOutputDataHandling;
+import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+import org.apache.airavata.model.workspace.experiment.DataObjectType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.QualityOfServiceParams;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.UserConfigurationData;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
+import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
-public class ExperimentModelUtil {
+public class ExperimentModelUtil {
+
public static WorkflowNodeStatus createWorkflowNodeStatus(WorkflowNodeState state){
WorkflowNodeStatus status = new WorkflowNodeStatus();
status.setWorkflowNodeState(state);
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/airavata-api/thrift-interface-descriptions/experimentModel.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/experimentModel.thrift b/airavata-api/thrift-interface-descriptions/experimentModel.thrift
index 5338d2b..7943ecd 100644
--- a/airavata-api/thrift-interface-descriptions/experimentModel.thrift
+++ b/airavata-api/thrift-interface-descriptions/experimentModel.thrift
@@ -319,7 +319,8 @@ struct TaskDetails {
enum ExecutionUnit {
INPUT,
APPLICATION,
- OUTPUT
+ OUTPUT,
+ OTHER
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/commons/utils/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/utils/pom.xml b/modules/commons/utils/pom.xml
index aca8fea..5d92f1a 100644
--- a/modules/commons/utils/pom.xml
+++ b/modules/commons/utils/pom.xml
@@ -65,7 +65,12 @@
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>
-
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>12.0</version>
+ </dependency>
+
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AbstractActivityListener.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AbstractActivityListener.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AbstractActivityListener.java
new file mode 100644
index 0000000..51922a0
--- /dev/null
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AbstractActivityListener.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.common.utils;
+
+
+public interface AbstractActivityListener {
+ public void setup(Object... configurations);
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java
new file mode 100644
index 0000000..7f64e86
--- /dev/null
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/MonitorPublisher.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.common.utils;
+
+import com.google.common.eventbus.EventBus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MonitorPublisher{
+ private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
+ private EventBus eventBus;
+
+ public MonitorPublisher(EventBus eventBus) {
+ this.eventBus = eventBus;
+ }
+
+ public void registerListener(Object listener) {
+ eventBus.register(listener);
+ }
+
+ public void unregisterListener(Object listener) {
+ eventBus.unregister(listener);
+ }
+
+ public void publish(Object o) {
+ eventBus.post(o);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 6a8367d..166741f 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -276,7 +276,7 @@ monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apach
amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
connection.name=xsede
-activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator
+activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.api.server.util.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator,org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher
###---------------------------Orchestrator module Configurations---------------------------###
#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index e7ccde2..5328ef9 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -238,6 +238,11 @@
<version>2.4</version>
</dependency>
<dependency>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ <version>1.6</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.airavata</groupId>
<artifactId>airavata-standalone-server</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index 39ebbbb..75f941d 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -251,6 +251,7 @@
<include>org.apache.xmlbeans:xmlbeans</include>
<include>org.apache.thrift:libthrift:jar:0.9.1</include>
<include>commons-cli:commons-cli</include>
+ <include>commons-codec:commons-codec</include>
<include>com.rabbitmq:amqp-client</include>
<include>com.fasterxml.jackson.core:jackson-databind</include>
<include>com.fasterxml.jackson.core:jackson-core</include>
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index ea6fd0e..6f3247c 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -20,17 +20,18 @@
*/
package org.apache.airavata.gfac.server;
-import com.google.common.eventbus.EventBus;
+import java.io.File;
+import java.io.IOException;
+
import org.apache.airavata.common.exception.AiravataConfigurationException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFac;
-import org.apache.airavata.gfac.core.cpi.GFacImpl;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
@@ -40,16 +41,17 @@ import org.apache.airavata.registry.api.Gateway;
import org.apache.airavata.registry.api.exception.RegException;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.zookeeper.*;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.util.Random;
+import com.google.common.eventbus.EventBus;
public class GfacServerHandler implements GfacService.Iface, Watcher{
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 62c44ab..30e5c0a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -34,7 +34,9 @@ import org.airavata.appcatalog.cpi.AppCatalog;
import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AbstractActivityListener;
import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -51,17 +53,13 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.GFacRecoverableHandler;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.monitor.AbstractActivityListener;
-import org.apache.airavata.gfac.core.monitor.ExperimentIdentity;
import org.apache.airavata.gfac.core.monitor.JobIdentity;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.TaskIdentity;
-import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent;
import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
@@ -75,18 +73,38 @@ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentD
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
+import org.apache.airavata.model.appcatalog.computeresource.JobManagerCommand;
+import org.apache.airavata.model.appcatalog.computeresource.JobSubmissionInterface;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.ResourceJobManager;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
-import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
+import org.apache.airavata.model.workspace.experiment.DataObjectType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.airavata.schemas.gfac.*;
+import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.DataType;
+import org.apache.airavata.schemas.gfac.GsisshHostType;
+import org.apache.airavata.schemas.gfac.HostDescriptionType;
+import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.airavata.schemas.gfac.InputParameterType;
+import org.apache.airavata.schemas.gfac.JobTypeType;
+import org.apache.airavata.schemas.gfac.OutputParameterType;
+import org.apache.airavata.schemas.gfac.ParameterType;
+import org.apache.airavata.schemas.gfac.ProjectAccountType;
+import org.apache.airavata.schemas.gfac.QueueType;
+import org.apache.airavata.schemas.gfac.SSHHostType;
+import org.apache.airavata.schemas.gfac.ServiceDescriptionType;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
-import org.ogf.schemas.jsdl.hpcpa.HPCProfileApplicationDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index 2065cee..83fb43a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -28,10 +28,13 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
-import com.google.common.eventbus.EventBus;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPathExpressionException;
import org.apache.airavata.client.api.AiravataAPI;
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AbstractActivityListener;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.commons.gfac.type.HostDescription;
@@ -43,34 +46,36 @@ import org.apache.airavata.gfac.Scheduler;
import org.apache.airavata.gfac.core.context.ApplicationContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.monitor.*;
-import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.monitor.JobIdentity;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.monitor.TaskIdentity;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
-import org.apache.airavata.gfac.core.handler.GFacHandler;
import org.apache.airavata.gfac.core.provider.GFacProvider;
import org.apache.airavata.gfac.core.scheduler.HostScheduler;
-import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
-import org.apache.airavata.gfac.core.handler.GFacHandlerException;
-import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.model.workspace.experiment.DataObjectType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.TaskDetails;
+import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.registry.api.AiravataRegistry2;
-import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryModelType;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.xpath.XPathExpressionException;
+import com.google.common.eventbus.EventBus;
/**
* This is the GFac CPI class for external usage, this simply have a single method to submit a job to
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index 81d2072..b0383c0 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -20,9 +20,9 @@
*/
package org.apache.airavata.gfac.core.handler;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.GFacImpl;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
index 431c202..423b5d9 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
@@ -20,9 +20,9 @@
*/
package org.apache.airavata.gfac.core.handler;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.GFacImpl;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.core.states.GfacPluginState;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AbstractActivityListener.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AbstractActivityListener.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AbstractActivityListener.java
index d642154..6eb1067 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AbstractActivityListener.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AbstractActivityListener.java
@@ -1,27 +1,27 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.airavata.gfac.core.monitor;
-
-
-public interface AbstractActivityListener {
- public void setup(Object... configurations);
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+// */
+//
+//package org.apache.airavata.gfac.core.monitor;
+//
+//
+//public interface AbstractActivityListener {
+// public void setup(Object... configurations);
+//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java
index 664f237..7feac06 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataExperimentStatusUpdator.java
@@ -1,123 +1,109 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.monitor;
-
-import com.google.common.eventbus.Subscribe;
-
-import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent;
-import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangedEvent;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.airavata.registry.cpi.Registry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Calendar;
-
-public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
- private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
-
- private Registry airavataRegistry;
- private MonitorPublisher monitorPublisher;
-
- public Registry getAiravataRegistry() {
- return airavataRegistry;
- }
-
- public void setAiravataRegistry(Registry airavataRegistry) {
- this.airavataRegistry = airavataRegistry;
- }
-
- @Subscribe
- public void updateRegistry(ExperimentStatusChangedEvent experimentStatus) {
- ExperimentState state = experimentStatus.getState();
- if (state != null) {
- try {
- String experimentID = experimentStatus.getIdentity().getExperimentID();
- updateExperimentStatus(experimentID, state);
- } catch (Exception e) {
- logger.error("Error persisting data" + e.getLocalizedMessage(), e);
- }
- }
- }
-
-
- @Subscribe
- public void setupExperimentStatus(WorkflowNodeStatusChangedEvent nodeStatus) {
- ExperimentState state = ExperimentState.UNKNOWN;
- switch (nodeStatus.getState()) {
- case CANCELED:
- state = ExperimentState.CANCELED;
- break;
- case COMPLETED:
- state = ExperimentState.COMPLETED;
- break;
- case INVOKED:
- state = ExperimentState.LAUNCHED;
- break;
- case FAILED:
- state = ExperimentState.FAILED;
- break;
- case EXECUTING:
- state = ExperimentState.EXECUTING;
- break;
- case CANCELING:
- state = ExperimentState.CANCELING;
- break;
- default:
- break;
- }
- try {
- updateExperimentStatus(nodeStatus.getIdentity().getExperimentID(), state);
- logger.debug("Publishing experiment status for "+nodeStatus.getIdentity().getExperimentID()+":"+state.toString());
- monitorPublisher.publish(new ExperimentStatusChangedEvent(nodeStatus.getIdentity(), state));
- } catch (Exception e) {
- logger.error("Error persisting data" + e.getLocalizedMessage(), e);
- }
- }
-
- public void updateExperimentStatus(String experimentId, ExperimentState state) throws Exception {
- logger.info("Updating the experiment status of experiment: " + experimentId + " to " + state.toString());
- Experiment details = (Experiment)airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId);
- if(details == null) {
- details = new Experiment();
- details.setExperimentID(experimentId);
- }
- org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
- status.setExperimentState(state);
- status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
- details.setExperimentStatus(status);
- airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
-
- }
-
- public void setup(Object... configurations) {
- for (Object configuration : configurations) {
- if (configuration instanceof Registry){
- this.airavataRegistry=(Registry)configuration;
- } else if (configuration instanceof MonitorPublisher){
- this.monitorPublisher=(MonitorPublisher) configuration;
- }
- }
- }
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.gfac.core.monitor;
+//
+//import com.google.common.eventbus.Subscribe;
+//
+//import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent;
+//import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangedEvent;
+//import org.apache.airavata.gfac.core.notification.MonitorPublisher;
+//import org.apache.airavata.model.workspace.experiment.Experiment;
+//import org.apache.airavata.model.workspace.experiment.ExperimentState;
+//import org.apache.airavata.registry.cpi.RegistryModelType;
+//import org.apache.airavata.registry.cpi.Registry;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//import java.util.Calendar;
+//
+//public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
+// private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
+//
+// private Registry airavataRegistry;
+// private MonitorPublisher monitorPublisher;
+//
+// public Registry getAiravataRegistry() {
+// return airavataRegistry;
+// }
+//
+// public void setAiravataRegistry(Registry airavataRegistry) {
+// this.airavataRegistry = airavataRegistry;
+// }
+//
+// @Subscribe
+// public void setupExperimentStatus(WorkflowNodeStatusChangedEvent nodeStatus) {
+// ExperimentState state = ExperimentState.UNKNOWN;
+// switch (nodeStatus.getState()) {
+// case CANCELED:
+// state = ExperimentState.CANCELED;
+// break;
+// case COMPLETED:
+// state = ExperimentState.COMPLETED;
+// break;
+// case INVOKED:
+// state = ExperimentState.LAUNCHED;
+// break;
+// case FAILED:
+// state = ExperimentState.FAILED;
+// break;
+// case EXECUTING:
+// state = ExperimentState.EXECUTING;
+// break;
+// case CANCELING:
+// state = ExperimentState.CANCELING;
+// break;
+// default:
+// break;
+// }
+// try {
+// updateExperimentStatus(nodeStatus.getIdentity().getExperimentID(), state);
+// logger.debug("Publishing experiment status for "+nodeStatus.getIdentity().getExperimentID()+":"+state.toString());
+// monitorPublisher.publish(new ExperimentStatusChangedEvent(nodeStatus.getIdentity(), state));
+// } catch (Exception e) {
+// logger.error("Error persisting data" + e.getLocalizedMessage(), e);
+// }
+// }
+//
+// public void updateExperimentStatus(String experimentId, ExperimentState state) throws Exception {
+// logger.info("Updating the experiment status of experiment: " + experimentId + " to " + state.toString());
+// Experiment details = (Experiment)airavataRegistry.get(RegistryModelType.EXPERIMENT, experimentId);
+// if(details == null) {
+// details = new Experiment();
+// details.setExperimentID(experimentId);
+// }
+// org.apache.airavata.model.workspace.experiment.ExperimentStatus status = new org.apache.airavata.model.workspace.experiment.ExperimentStatus();
+// status.setExperimentState(state);
+// status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
+// details.setExperimentStatus(status);
+// airavataRegistry.update(RegistryModelType.EXPERIMENT_STATUS, status, experimentId);
+//
+// }
+//
+// public void setup(Object... configurations) {
+// for (Object configuration : configurations) {
+// if (configuration instanceof Registry){
+// this.airavataRegistry=(Registry)configuration;
+// } else if (configuration instanceof MonitorPublisher){
+// this.monitorPublisher=(MonitorPublisher) configuration;
+// }
+// }
+// }
+//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
index 6ad55d2..84541fc 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataJobStatusUpdator.java
@@ -22,9 +22,10 @@ package org.apache.airavata.gfac.core.monitor;
import java.util.Calendar;
+import org.apache.airavata.common.utils.AbstractActivityListener;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangedEvent;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.registry.cpi.CompositeIdentifier;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
index 26d49c0..5d8cc33 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataTaskStatusUpdator.java
@@ -22,10 +22,11 @@ package org.apache.airavata.gfac.core.monitor;
import java.util.Calendar;
+import org.apache.airavata.common.utils.AbstractActivityListener;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangedEvent;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.registry.cpi.Registry;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
index 5f6629c..cf88793 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/AiravataWorkflowNodeStatusUpdator.java
@@ -22,9 +22,10 @@ package org.apache.airavata.gfac.core.monitor;
import java.util.Calendar;
+import org.apache.airavata.common.utils.AbstractActivityListener;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
import org.apache.airavata.gfac.core.monitor.state.WorkflowNodeStatusChangedEvent;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeState;
import org.apache.airavata.model.workspace.experiment.WorkflowNodeStatus;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 97bb49d..870ba26 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -20,22 +20,29 @@
*/
package org.apache.airavata.gfac.core.monitor;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.io.IOException;
+
import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AbstractActivityListener;
import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.common.utils.Constants;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
-import org.apache.zookeeper.*;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
+import com.google.common.eventbus.Subscribe;
public class GfacInternalStatusUpdator implements AbstractActivityListener, Watcher {
- private final static Logger logger = LoggerFactory.getLogger(AiravataWorkflowNodeStatusUpdator.class);
+ private final static Logger logger = LoggerFactory.getLogger(GfacInternalStatusUpdator.class);
private ZooKeeper zk;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/notification/MonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/notification/MonitorPublisher.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/notification/MonitorPublisher.java
index e44974b..98fdc19 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/notification/MonitorPublisher.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/notification/MonitorPublisher.java
@@ -1,47 +1,47 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.notification;
-
-import com.google.common.eventbus.EventBus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MonitorPublisher{
- private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
- private EventBus eventBus;
-
- public MonitorPublisher(EventBus eventBus) {
- this.eventBus = eventBus;
- }
-
- public void registerListener(Object listener) {
- eventBus.register(listener);
- }
-
- public void unregisterListener(Object listener) {
- eventBus.unregister(listener);
- }
-
- public void publish(Object o) {
- eventBus.post(o);
- }
-
-}
+///*
+// *
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing,
+// * software distributed under the License is distributed on an
+// * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// * KIND, either express or implied. See the License for the
+// * specific language governing permissions and limitations
+// * under the License.
+// *
+//*/
+//package org.apache.airavata.gfac.core.notification;
+//
+//import com.google.common.eventbus.EventBus;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//
+//public class MonitorPublisher{
+// private final static Logger logger = LoggerFactory.getLogger(MonitorPublisher.class);
+// private EventBus eventBus;
+//
+// public MonitorPublisher(EventBus eventBus) {
+// this.eventBus = eventBus;
+// }
+//
+// public void registerListener(Object listener) {
+// eventBus.register(listener);
+// }
+//
+// public void unregisterListener(Object listener) {
+// eventBus.unregister(listener);
+// }
+//
+// public void publish(Object o) {
+// eventBus.post(o);
+// }
+//
+//}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
index 6e3f59e..5dbb9d3 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
@@ -21,12 +21,11 @@
package org.apache.airavata.gfac.core.provider;
-import com.google.common.eventbus.EventBus;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFacImpl;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobStatus;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractRecoverableProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractRecoverableProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractRecoverableProvider.java
index 4d4a58e..daed08e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractRecoverableProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractRecoverableProvider.java
@@ -20,11 +20,11 @@
*/
package org.apache.airavata.gfac.core.provider;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFacImpl;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.model.workspace.experiment.JobDetails;
import org.apache.airavata.model.workspace.experiment.JobStatus;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
[2/3] moving about status listener classes out of hte gfac
Posted by sa...@apache.org.
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
index 818e4be..9ba7e5d 100644
--- a/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
+++ b/modules/gfac/gfac-local/src/test/java/org/apache/airavata/core/gfac/services/impl/LocalProviderTest.java
@@ -20,15 +20,21 @@
*/
package org.apache.airavata.core.gfac.services.impl;
-import com.google.common.eventbus.EventBus;
+import java.io.File;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
-import org.apache.airavata.commons.gfac.type.*;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.commons.gfac.type.ActualParameter;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.commons.gfac.type.ServiceDescription;
import org.apache.airavata.gfac.GFacConfiguration;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.ApplicationContext;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.local.handler.LocalDirectorySetupHandler;
import org.apache.airavata.gfac.local.provider.impl.LocalProvider;
@@ -45,10 +51,7 @@ import org.apache.commons.lang.SystemUtils;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-import java.io.File;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.eventbus.EventBus;
public class LocalProviderTest {
private JobExecutionContext jobExecutionContext;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
index 26f25b8..c754d3c 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/core/AiravataAbstractMonitor.java
@@ -20,7 +20,7 @@
*/
package org.apache.airavata.gfac.monitor.core;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
index 66f8467..c69b516 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
@@ -20,23 +20,20 @@
*/
package org.apache.airavata.gfac.monitor.handlers;
-import com.google.common.eventbus.EventBus;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
+
import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.GFacImpl;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
-
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.monitor.HPCMonitorID;
-import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
-import org.apache.airavata.gfac.monitor.util.CommonUtils;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
index fa6714d..4208c85 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/pull/qstat/HPCPullMonitor.java
@@ -20,20 +20,25 @@
*/
package org.apache.airavata.gfac.monitor.impl.pull.qstat;
-import com.google.common.eventbus.EventBus;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.cpi.GFac;
-import org.apache.airavata.gfac.core.monitor.ExperimentIdentity;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.TaskIdentity;
-import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangedEvent;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
-import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.monitor.HostMonitorData;
import org.apache.airavata.gfac.monitor.UserMonitorData;
import org.apache.airavata.gfac.monitor.core.PullMonitor;
@@ -41,19 +46,14 @@ import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.util.CommonUtils;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.authentication.AuthenticationInfo;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.model.workspace.experiment.TaskState;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.apache.airavata.schemas.gfac.SSHHostType;
-import org.apache.openjpa.lib.log.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Timestamp;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
+import com.google.common.eventbus.EventBus;
/**
* This monitor is based on qstat command which can be run
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
index fa17c4e..010d3bc 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/AMQPMonitor.java
@@ -20,16 +20,20 @@
*/
package org.apache.airavata.gfac.monitor.impl.push.amqp;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.core.monitor.JobIdentity;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.monitor.core.PushMonitor;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
@@ -38,9 +42,10 @@ import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
/**
* This is the implementation for AMQP based finishQueue, this uses
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
index b8ec08a..b992f85 100644
--- a/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
+++ b/modules/gfac/gfac-monitor/src/main/java/org/apache/airavata/gfac/monitor/impl/push/amqp/BasicConsumer.java
@@ -20,17 +20,18 @@
*/
package org.apache.airavata.gfac.monitor.impl.push.amqp;
-import com.rabbitmq.client.AMQP;
-import com.rabbitmq.client.Consumer;
-import com.rabbitmq.client.Envelope;
-import com.rabbitmq.client.ShutdownSignalException;
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.monitor.core.MessageParser;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.Envelope;
+import com.rabbitmq.client.ShutdownSignalException;
+
public class BasicConsumer implements Consumer {
private final static Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
index 9c3d08b..b17f9b8 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/AMQPMonitorTest.java
@@ -20,12 +20,18 @@
*/
package org.apache.airavata.job;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
+import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.ServerInfo;
@@ -33,18 +39,13 @@ import org.apache.airavata.gsi.ssh.api.authentication.GSIAuthenticationInfo;
import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
-import org.apache.airavata.gfac.monitor.impl.push.amqp.AMQPMonitor;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
public class AMQPMonitorTest {
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
index 3a762ec..6dd4d68 100644
--- a/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
+++ b/modules/gfac/gfac-monitor/src/test/java/org/apache/airavata/job/QstatMonitorTestWithMyProxyAuth.java
@@ -20,13 +20,19 @@
*/
package org.apache.airavata.job;
-import com.google.common.eventbus.EventBus;
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
-import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.monitor.HPCMonitorID;
+import org.apache.airavata.gfac.monitor.UserMonitorData;
+import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
import org.apache.airavata.gsi.ssh.api.Cluster;
import org.apache.airavata.gsi.ssh.api.SSHApiException;
import org.apache.airavata.gsi.ssh.api.ServerInfo;
@@ -35,17 +41,12 @@ import org.apache.airavata.gsi.ssh.api.job.JobDescriptor;
import org.apache.airavata.gsi.ssh.impl.PBSCluster;
import org.apache.airavata.gsi.ssh.impl.authentication.MyProxyAuthenticationInfo;
import org.apache.airavata.gsi.ssh.util.CommonUtils;
-import org.apache.airavata.gfac.monitor.UserMonitorData;
-import org.apache.airavata.gfac.monitor.impl.pull.qstat.HPCPullMonitor;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.junit.Assert;
import org.testng.annotations.Test;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
public class QstatMonitorTestWithMyProxyAuth {
private String myProxyUserName;
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowUtils.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowUtils.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowUtils.java
new file mode 100644
index 0000000..e21a2e2
--- /dev/null
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/WorkflowUtils.java
@@ -0,0 +1,56 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.engine;
+
+import java.util.List;
+
+import org.airavata.appcatalog.cpi.AppCatalogException;
+import org.airavata.appcatalog.cpi.ApplicationInterface;
+import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
+import org.apache.airavata.model.util.ExecutionType;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.workflow.catalog.WorkflowCatalogException;
+import org.apache.airavata.workflow.catalog.WorkflowCatalogFactory;
+
+public class WorkflowUtils {
+
+ public static ExecutionType getExecutionType(Experiment experiment){
+ try {
+ ApplicationInterface applicationInterface = AppCatalogFactory.getAppCatalog().getApplicationInterface();
+ List<String> allApplicationInterfaceIds = applicationInterface.getAllApplicationInterfaceIds();
+ String applicationId = experiment.getApplicationId();
+ if (allApplicationInterfaceIds.contains(applicationId)){
+ return ExecutionType.SINGLE_APP;
+ } else {
+ List<String> allWorkflows = WorkflowCatalogFactory.getWorkflowCatalog().getAllWorkflows();
+ if (allWorkflows.contains(applicationId)){
+ return ExecutionType.WORKFLOW;
+ }
+ }
+ } catch (AppCatalogException e) {
+ e.printStackTrace();
+ } catch (WorkflowCatalogException e) {
+ e.printStackTrace();
+ }
+ return ExecutionType.UNKNOWN;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
index 3fe82f0..98fc560 100644
--- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/interpretor/WorkflowInterpreter.java
@@ -41,10 +41,13 @@ import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
+import org.apache.airavata.common.utils.AbstractActivityListener;
import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.common.utils.XMLUtil;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangedEvent;
import org.apache.airavata.model.util.ExperimentModelUtil;
import org.apache.airavata.model.workspace.experiment.DataObjectType;
+import org.apache.airavata.model.workspace.experiment.ExecutionUnit;
import org.apache.airavata.model.workspace.experiment.Experiment;
import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.model.workspace.experiment.TaskDetails;
@@ -62,6 +65,7 @@ import org.apache.airavata.workflow.engine.invoker.DynamicInvoker;
import org.apache.airavata.workflow.engine.invoker.Invoker;
import org.apache.airavata.workflow.engine.util.AmazonUtil;
import org.apache.airavata.workflow.engine.util.InterpreterUtil;
+import org.apache.airavata.workflow.engine.util.ProxyMonitorPublisher;
import org.apache.airavata.workflow.model.component.Component;
import org.apache.airavata.workflow.model.component.amazon.InstanceComponent;
import org.apache.airavata.workflow.model.component.amazon.TerminateInstanceComponent;
@@ -112,7 +116,9 @@ import org.xmlpull.infoset.XmlElement;
import xsul5.XmlConstants;
-public class WorkflowInterpreter {
+import com.google.common.eventbus.Subscribe;
+
+public class WorkflowInterpreter implements AbstractActivityListener{
private static final Logger log = LoggerFactory.getLogger(WorkflowInterpreter.class);
public static final String WORKFLOW_STARTED = "Workflow Running";
@@ -133,6 +139,9 @@ public class WorkflowInterpreter {
private OrchestratorService.Client orchestratorClient;
+ private Map<String, Node> awaitingTasks;
+ private Map<Node, Map<String,String>> nodeOutputData;
+
public static ThreadLocal<WorkflowInterpreterConfiguration> workflowInterpreterConfigurationThreadLocal =
new ThreadLocal<WorkflowInterpreterConfiguration>();
@@ -151,6 +160,9 @@ public class WorkflowInterpreter {
//TODO set act of provenance
nodeInstanceList=new HashMap<Node, WorkflowNodeDetails>();
setWorkflowInterpreterConfigurationThreadLocal(config);
+ awaitingTasks = new HashMap<String, Node>();
+ nodeOutputData = new HashMap<Node, Map<String,String>>();
+ ProxyMonitorPublisher.registerListener(this);
}
public WorkflowInterpreterInteractor getInteractor(){
@@ -361,7 +373,16 @@ public class WorkflowInterpreter {
}
private WorkflowNodeDetails createWorkflowNodeDetails(Node node) {
- WorkflowNodeDetails workflowNode = ExperimentModelUtil.createWorkflowNode(node.getName(), null);
+ WorkflowNodeDetails workflowNode = ExperimentModelUtil.createWorkflowNode(node.getName(), null);
+ ExecutionUnit executionUnit = ExecutionUnit.APPLICATION;
+ if (node instanceof InputNode){
+ executionUnit = ExecutionUnit.INPUT;
+ } else if (node instanceof OutputNode){
+ executionUnit = ExecutionUnit.OUTPUT;
+ } if (node instanceof WSNode){
+ executionUnit = ExecutionUnit.APPLICATION;
+ }
+ workflowNode.setExecutionUnit(executionUnit);
nodeInstanceList.put(node, workflowNode);
return workflowNode;
}
@@ -401,7 +422,7 @@ public class WorkflowInterpreter {
// next run
// even if the next run runs before the notification arrives
WorkflowNodeDetails workflowNodeDetails = createWorkflowNodeDetails(node);
- getRegistry().update(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeDetails, getExperiment().getExperimentID());
+ workflowNodeDetails.setNodeInstanceId((String)getRegistry().add(ChildDataType.WORKFLOW_NODE_DETAIL, workflowNodeDetails, getExperiment().getExperimentID()));
node.setState(NodeExecutionState.EXECUTING);
updateWorkflowNodeStatus(workflowNodeDetails, WorkflowNodeState.EXECUTING);
// OutputNode node = (OutputNode) outputNode;
@@ -606,11 +627,37 @@ public class WorkflowInterpreter {
}
protected void handleWSComponent(Node node) throws WorkflowException, TException, RegistryException {
- TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(experiment, nodeInstanceList.get(node));
- taskDetails.setTaskID(getRegistry().add(ChildDataType.TASK_DETAIL, taskDetails,nodeInstanceList.get(node).getNodeInstanceId()).toString());
+ TaskDetails taskDetails = createTaskDetails(node);
getOrchestratorClient().launchTask(taskDetails.getTaskID(), getCredentialStoreToken());
}
+
+ private void addToAwaitingTaskList(String taskId, Node node){
+ synchronized (awaitingTasks) {
+ awaitingTasks.put(taskId, node);
+ }
+ }
+
+ private boolean isTaskAwaiting(String taskId){
+ boolean result;
+ synchronized (awaitingTasks) {
+ result = awaitingTasks.containsKey(taskId);
+ }
+ return result;
+ }
+ private Node getAwaitingNodeForTask(String taskId){
+ Node node;
+ synchronized (awaitingTasks) {
+ node = awaitingTasks.get(taskId);
+ }
+ return node;
+ }
+
+ private void removeAwaitingTask(String taskId){
+ synchronized (awaitingTasks) {
+ awaitingTasks.remove(taskId);
+ }
+ }
private void handleDynamicComponent(Node node) throws WorkflowException {
DynamicComponent dynamicComponent = (DynamicComponent) node.getComponent();
String className = dynamicComponent.getClassName();
@@ -930,49 +977,57 @@ public class WorkflowInterpreter {
node.setState(NodeExecutionState.FINISHED);
}
+
private String createInvokerForEachSingleWSNode(Node foreachWSNode, WSComponent wsComponent) throws WorkflowException, RegistryException, TException {
- TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(experiment, nodeInstanceList.get(foreachWSNode));
- taskDetails.setTaskID(getRegistry().add(ChildDataType.TASK_DETAIL, taskDetails,nodeInstanceList.get(foreachWSNode).getNodeInstanceId()).toString());
+ TaskDetails taskDetails = createTaskDetails(foreachWSNode);
getOrchestratorClient().launchTask(taskDetails.getTaskID(), getCredentialStoreToken());
-
-// if (null == wsdlLocation) {
- // WorkflowInterpreter is no longer using gfac in service mode. we only support embedded mode.
-// if (gfacURLString.startsWith("https")) {
-// LeadContextHeader leadCtxHeader = null;
-// try {
-// leadCtxHeader = XBayaUtil.buildLeadContextHeader(this.getWorkflow(), this.getConfig().getConfiguration(), new MonitorConfiguration(this
-// .getConfig().getConfiguration().getBrokerURL(), this.config.getTopic(), true, this.getConfig().getConfiguration()
-// .getMessageBoxURL()), foreachWSNode.getID(), null);
-// } catch (URISyntaxException e) {
-// throw new WorkflowException(e);
-// }
-// invoker = new WorkflowInvokerWrapperForGFacInvoker(portTypeQName, gfacURLString, this.getConfig().getConfiguration().getMessageBoxURL()
-// .toString(), leadCtxHeader, this.config.getNotifier().createServiceNotificationSender(foreachWSNode.getID()));
-// } else {
-
-// if (this.config.isGfacEmbeddedMode()) {
-// invoker = new EmbeddedGFacInvoker(portTypeQName, WSDLUtil.wsdlDefinitions5ToWsdlDefintions3(((WSNode)foreachWSNode).getComponent().getWSDL()), foreachWSNode.getID(),
-// this.config.getMessageBoxURL().toASCIIString(), this.config.getMessageBrokerURL().toASCIIString(), this.config.getNotifier(),
-// this.config.getTopic(), this.config.getAiravataAPI(), portTypeQName.getLocalPart(), this.config.getConfiguration());
-// } else {
-// invoker = new GenericInvoker(portTypeQName, WSDLUtil.wsdlDefinitions5ToWsdlDefintions3(((WSNode)foreachWSNode).getComponent().getWSDL()), foreachWSNode.getID(),
-// this.config.getMessageBoxURL().toASCIIString(), gfacURLString, this.config.getNotifier());
-// }
-// }
-
-// } else {
-// if (wsdlLocation.endsWith("/")) {
-// wsdlLocation = wsdlLocation.substring(0, wsdlLocation.length() - 1);
-// }
-// if (!wsdlLocation.endsWith("?wsdl")) {
-// wsdlLocation += "?wsdl";
-// }
-// invoker = new GenericInvoker(portTypeQName, wsdlLocation, foreachWSNode.getID(), this.getConfig().getConfiguration().getMessageBoxURL().toString(),
-// gfacURLString, this.config.getNotifier());
-// }
return taskDetails.getTaskID();
}
+ private void setupNodeDetailsInput(Node node, WorkflowNodeDetails nodeDetails){
+ List<DataPort> inputPorts = node.getInputPorts();
+ for (DataPort dataPort : inputPorts) {
+ Map<String, String> outputData = nodeOutputData.get(dataPort.getFromNode());
+ String portInputValue = outputData.get(dataPort.getName());
+ DataObjectType elem = new DataObjectType();
+ elem.setKey(dataPort.getName());
+ elem.setValue(portInputValue);
+ nodeDetails.addToNodeInputs(elem);
+ }
+ try {
+ getRegistry().update(RegistryModelType.WORKFLOW_NODE_DETAIL, nodeDetails, nodeDetails.getNodeInstanceId());
+ } catch (RegistryException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private void setupNodeDetailsOutput(Node node){
+ WorkflowNodeDetails nodeDetails = nodeInstanceList.get(node);
+ List<DataPort> outputPorts = node.getOutputPorts();
+ Map<String, String> outputData = nodeOutputData.get(node);
+ for (DataPort dataPort : outputPorts) {
+ String portInputValue = outputData.get(dataPort.getName());
+ DataObjectType elem = new DataObjectType();
+ elem.setKey(dataPort.getName());
+ elem.setValue(portInputValue);
+ nodeDetails.addToNodeOutputs(elem);
+ }
+ try {
+ getRegistry().update(RegistryModelType.WORKFLOW_NODE_DETAIL, nodeDetails, nodeDetails.getNodeInstanceId());
+ } catch (RegistryException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private TaskDetails createTaskDetails(Node node)
+ throws RegistryException {
+ setupNodeDetailsInput(node, nodeInstanceList.get(node));
+ TaskDetails taskDetails = ExperimentModelUtil.cloneTaskFromWorkflowNodeDetails(experiment, nodeInstanceList.get(node));
+ taskDetails.setTaskID(getRegistry().add(ChildDataType.TASK_DETAIL, taskDetails,nodeInstanceList.get(node).getNodeInstanceId()).toString());
+ addToAwaitingTaskList(taskDetails.getTaskID(), node);
+ return taskDetails;
+ }
+
private void runInThread(final LinkedList<String> listOfValues, ForEachNode forEachNode, final Node middleNode, List<Node> endForEachNodes,
Map<Node, Invoker> tempInvoker, AtomicInteger counter, final Integer[] inputNumber) throws WorkflowException, RegistryException, TException {
@@ -1326,83 +1381,55 @@ public class WorkflowInterpreter {
this.credentialStoreToken = credentialStoreToken;
}
- public static class TmpRegistry implements Registry{
- public TmpRegistry() throws RegistryException {
- }
-
- @Override
- public Object add(ChildDataType dataType, Object newObjectToAdd,
- Object dependentIdentifier) throws RegistryException {
- return UUID.randomUUID().toString();
- }
+ @Override
+ public void setup(Object... configurations) {
+ // TODO Auto-generated method stub
- @Override
- public void update(RegistryModelType dataType, Object identifier,
- String fieldName, Object value) throws RegistryException {
- }
-
- @Override
- public Object add(ParentDataType dataType, Object newObjectToAdd)
- throws RegistryException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void update(RegistryModelType dataType,
- Object newObjectToUpdate, Object identifier)
- throws RegistryException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public Object get(RegistryModelType dataType, Object identifier)
- throws RegistryException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public List<Object> get(RegistryModelType dataType, String fieldName,
- Object value) throws RegistryException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public List<Object> search(RegistryModelType dataType,
- Map<String, String> filters) throws RegistryException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Object getValue(RegistryModelType dataType, Object identifier,
- String field) throws RegistryException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public List<String> getIds(RegistryModelType dataType,
- String fieldName, Object value) throws RegistryException {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public void remove(RegistryModelType dataType, Object identifier)
- throws RegistryException {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public boolean isExist(RegistryModelType dataType, Object identifier)
- throws RegistryException {
- // TODO Auto-generated method stub
- return false;
- }
}
+
+ @Subscribe
+ public void taskStatusChanged(TaskStatusChangedEvent taskStatus){
+ String taskId = taskStatus.getIdentity().getTaskId();
+ if (isTaskAwaiting(taskId)){
+ WorkflowNodeState state=WorkflowNodeState.UNKNOWN;
+ switch(taskStatus.getState()){
+ case CANCELED:
+ ; break;
+ case COMPLETED:
+ //task is completed
+ try {
+ TaskDetails task = (TaskDetails)getRegistry().get(RegistryModelType.TASK_DETAIL, taskId);
+ List<DataObjectType> applicationOutputs = task.getApplicationOutputs();
+ Map<String, String> outputData = new HashMap<String, String>();
+ Node node = getAwaitingNodeForTask(taskId);
+ for (DataObjectType outputObj : applicationOutputs) {
+ List<DataPort> outputPorts = node.getOutputPorts();
+ for (DataPort dataPort : outputPorts) {
+ if (dataPort.getName().equals(outputObj.getKey())){
+ outputData.put(outputObj.getKey(), outputObj.getValue());
+ }
+ }
+ }
+ nodeOutputData.put(node, outputData);
+ setupNodeDetailsOutput(node);
+ } catch (RegistryException e) {
+ e.printStackTrace();
+ }
+ break;
+ case CONFIGURING_WORKSPACE:
+ break;
+ case FAILED:
+ break;
+ case EXECUTING: case WAITING: case PRE_PROCESSING: case POST_PROCESSING: case OUTPUT_DATA_STAGING: case INPUT_DATA_STAGING:
+ break;
+ case STARTED:
+ break;
+ case CANCELING:
+ break;
+ default:
+ break;
+ }
+ }
+
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
new file mode 100644
index 0000000..507e1bb
--- /dev/null
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/util/ProxyMonitorPublisher.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.airavata.workflow.engine.util;
+
+import org.apache.airavata.common.utils.AbstractActivityListener;
+import org.apache.airavata.common.utils.MonitorPublisher;
+
+public class ProxyMonitorPublisher implements AbstractActivityListener{
+
+ private static Object[] setupConfigurations;
+
+ @Override
+ public void setup(Object... configurations) {
+ setupConfigurations=configurations;
+ }
+
+ private static MonitorPublisher getPublisher(){
+ if (setupConfigurations!=null) {
+ for (Object configuration : setupConfigurations) {
+ if (configuration instanceof MonitorPublisher){
+ return (MonitorPublisher) configuration;
+ }
+ }
+ }
+ return null;
+ }
+
+ public static void registerListener(Object listener) {
+ if (listener instanceof AbstractActivityListener){
+ ((AbstractActivityListener) listener).setup(setupConfigurations);
+ }
+ getPublisher().registerListener(listener);
+ }
+
+ public static void unregisterListener(Object listener) {
+ getPublisher().unregisterListener(listener);
+ }
+
+ public void publish(Object o) {
+ getPublisher().publish(o);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/component/ws/WSComponent.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/component/ws/WSComponent.java b/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/component/ws/WSComponent.java
index ac86b05..606c88b 100644
--- a/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/component/ws/WSComponent.java
+++ b/modules/workflow-model/workflow-model-core/src/main/java/org/apache/airavata/workflow/model/component/ws/WSComponent.java
@@ -204,7 +204,7 @@ public class WSComponent extends Component {
buf.append("<h1>Application: " + getName() + "</h1>\n");
buf.append("<h2>Description:</h2>\n");
- buf.append(this.description==null?"":this.description);
+ buf.append(application.getApplicationId()+"<br />"+(this.description==null?"":this.description));
if (getInputPorts().size()>0) {
buf.append("<h3>Input parameter(s)</h3>\n");
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ui/dialogs/registry/RegistryWindow.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ui/dialogs/registry/RegistryWindow.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ui/dialogs/registry/RegistryWindow.java
index 61040ac..8bcc2bc 100644
--- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ui/dialogs/registry/RegistryWindow.java
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ui/dialogs/registry/RegistryWindow.java
@@ -141,7 +141,7 @@ public class RegistryWindow {
XBayaLabel serverAddressLabel = new XBayaLabel("Server Address", this.serverTextField);
XBayaLabel serverPortLabel = new XBayaLabel("Server Port", this.portTextField);
XBayaLabel gatewayNameLabel = new XBayaLabel("Gateway Name", this.gatewayNameTextField);
- XBayaLabel gatewayUserLabel = new XBayaLabel("Gateway TUser", this.usernameTextField);
+ XBayaLabel gatewayUserLabel = new XBayaLabel("Gateway User", this.usernameTextField);
serviceTypeModel = new DefaultComboBoxModel(ThriftServiceType.values());
serviceTypeModel.setSelectedItem(getServiceType());
this.serviceTypeCombo = new XBayaComboBox(serviceTypeModel);
@@ -188,7 +188,7 @@ public class RegistryWindow {
buttonPanel.add(cancelButton);
buttonPanel.getSwingComponent().setBorder(BorderFactory.createEtchedBorder());
- this.dialog = new XBayaDialog(this.engine.getGUI(), "Configure Airavata Registry", infoPanel, buttonPanel);
+ this.dialog = new XBayaDialog(this.engine.getGUI(), "Configure Airavata Thrift Service", infoPanel, buttonPanel);
this.dialog.setDefaultButton(okButton);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/92838e4a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ui/experiment/WorkflowInterpreterLaunchWindow.java
----------------------------------------------------------------------
diff --git a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ui/experiment/WorkflowInterpreterLaunchWindow.java b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ui/experiment/WorkflowInterpreterLaunchWindow.java
index 2671d30..fd6cb46 100644
--- a/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ui/experiment/WorkflowInterpreterLaunchWindow.java
+++ b/modules/xbaya-gui/src/main/java/org/apache/airavata/xbaya/ui/experiment/WorkflowInterpreterLaunchWindow.java
@@ -22,7 +22,6 @@
package org.apache.airavata.xbaya.ui.experiment;
import java.awt.event.ActionEvent;
-import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -30,10 +29,6 @@ import java.util.UUID;
//import org.apache.airavata.registry.api.AiravataRegistry2;
-
-
-
-
import javax.swing.AbstractAction;
import javax.swing.BorderFactory;
import javax.swing.JButton;
@@ -43,9 +38,14 @@ import javax.swing.JPanel;
import javax.swing.border.EtchedBorder;
import javax.xml.namespace.QName;
+import org.apache.airavata.api.Airavata.Client;
import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.common.utils.StringUtil;
import org.apache.airavata.common.utils.XMLUtil;
+import org.apache.airavata.model.error.AiravataClientConnectException;
+import org.apache.airavata.model.error.AiravataClientException;
+import org.apache.airavata.model.error.AiravataSystemException;
+import org.apache.airavata.model.error.InvalidRequestException;
+import org.apache.airavata.model.workspace.Project;
import org.apache.airavata.model.workspace.experiment.DataObjectType;
import org.apache.airavata.model.workspace.experiment.DataType;
import org.apache.airavata.model.workspace.experiment.Experiment;
@@ -57,25 +57,21 @@ import org.apache.airavata.workflow.engine.interpretor.WorkflowInterpreterConfig
import org.apache.airavata.workflow.model.exceptions.WorkflowException;
import org.apache.airavata.workflow.model.graph.system.InputNode;
import org.apache.airavata.workflow.model.graph.util.GraphUtil;
-import org.apache.airavata.workflow.model.graph.ws.WSNode;
-import org.apache.airavata.workflow.model.ode.ODEClient;
import org.apache.airavata.workflow.model.wf.Workflow;
import org.apache.airavata.ws.monitor.MonitorConfiguration;
+import org.apache.airavata.xbaya.ThriftClientData;
+import org.apache.airavata.xbaya.ThriftServiceType;
import org.apache.airavata.xbaya.XBayaEngine;
-import org.apache.airavata.xbaya.graph.controller.NodeController;
-import org.apache.airavata.xbaya.jython.script.JythonScript;
import org.apache.airavata.xbaya.ui.dialogs.XBayaDialog;
-import org.apache.airavata.xbaya.ui.graph.ws.WSNodeGUI;
import org.apache.airavata.xbaya.ui.utils.ErrorMessages;
import org.apache.airavata.xbaya.ui.widgets.GridPanel;
import org.apache.airavata.xbaya.ui.widgets.XBayaLabel;
import org.apache.airavata.xbaya.ui.widgets.XBayaTextField;
+import org.apache.airavata.xbaya.util.XBayaUtil;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmlpull.infoset.XmlElement;
-import org.xmlpull.v1.builder.XmlInfosetBuilder;
-
-import xsul.XmlConstants;
public class WorkflowInterpreterLaunchWindow {
@@ -101,7 +97,7 @@ public class WorkflowInterpreterLaunchWindow {
private XBayaTextField instanceNameTextField;
- protected final static XmlInfosetBuilder builder = XmlConstants.BUILDER;
+// protected final static XmlInfosetBuilder builder = XmlConstants.BUILDER;
/**
* Constructs a WorkflowInterpreterLaunchWindow.
@@ -228,7 +224,23 @@ public class WorkflowInterpreterLaunchWindow {
JButton okButton = new JButton("Run");
okButton.addActionListener(new AbstractAction() {
public void actionPerformed(ActionEvent e) {
- execute();
+ try {
+ execute();
+ } catch (AiravataClientConnectException e1) {
+ e1.printStackTrace();
+ } catch (InvalidRequestException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } catch (AiravataClientException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } catch (AiravataSystemException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } catch (TException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ }
}
});
@@ -247,10 +259,24 @@ public class WorkflowInterpreterLaunchWindow {
this.dialog.setDefaultButton(okButton);
}
- private void execute() {
- final List<String> arguments = new ArrayList<String>();
-
- String topic = UUID.randomUUID().toString();//this.topicTextField.getText();
+ private void execute() throws AiravataClientConnectException, InvalidRequestException, AiravataClientException, AiravataSystemException, TException {
+
+ if (!(engine.getGUI().setupThriftClientData(ThriftServiceType.API_SERVICE) && engine.getGUI().setupThriftClientData(ThriftServiceType.WORKFLOW_SERVICE))){
+ hide();
+ return;
+ }
+
+ ThriftClientData thriftClientData = engine.getConfiguration().getThriftClientData(ThriftServiceType.API_SERVICE);
+ Client airavataClient = XBayaUtil.getAiravataClient(thriftClientData);
+
+ org.apache.airavata.api.workflow.Workflow.Client workflowClient = XBayaUtil.getWorkflowClient(engine.getConfiguration().getThriftClientData(ThriftServiceType.WORKFLOW_SERVICE));
+
+ Workflow workflowClone = workflow.clone();
+ workflowClone.setName(workflowClone.getName()+UUID.randomUUID().toString());
+ org.apache.airavata.model.Workflow w = new org.apache.airavata.model.Workflow();
+ w.setName(workflowClone.getName());
+ w.setGraph(XMLUtil.xmlElementToString(workflowClone.toXML()));
+ w.setTemplateId(workflowClient.registerWorkflow(w));
String instanceName = this.instanceNameTextField.getText();
if (instanceName.trim().equals("")){
JOptionPane.showMessageDialog(engine.getGUI().getFrame(),
@@ -264,38 +290,22 @@ public class WorkflowInterpreterLaunchWindow {
if (!instanceNameTextField.getText().equals("")){
this.instanceNameTextField.setText("");
}
- final String instanceNameFinal=instanceName;
- if (topic.length() == 0) {
- this.engine.getGUI().getErrorWindow().error(ErrorMessages.TOPIC_EMPTY_ERROR);
- return;
- }
// Use topic as a base of workflow instance ID so that the monitor can
// find it.
- URI workfowInstanceID = URI.create(StringUtil.convertToJavaIdentifier(topic));
- this.workflow.setGPELInstanceID(workfowInstanceID);
-
- MonitorConfiguration notifConfig = this.engine.getMonitor().getConfiguration();
- engine.getMonitor().resetEventData();
- notifConfig.setTopic(topic);
- arguments.add("-" + JythonScript.TOPIC_VARIABLE);
- arguments.add(topic);
- Collection<WSNode> wsNodes = GraphUtil.getWSNodes(this.engine.getGUI().getWorkflow().getGraph());
- for (WSNode node : wsNodes) {
- ((WSNodeGUI) NodeController.getGUI(node)).setInteractiveMode(false);
- }
-
- // TODO error check for user inputs
+ Project project = new Project();
+ project.setName("project1");
+ project.setOwner(thriftClientData.getUsername());
+ project.setProjectID(airavataClient.createProject(project));
final List<InputNode> inputNodes = GraphUtil.getInputNodes(this.workflow.getGraph());
- builder.newFragment("inputs");
- new ODEClient();
+// builder.newFragment("inputs");
+// new ODEClient();
final Experiment experiment = new Experiment();
- experiment.setApplicationId(workflow.getName());
- experiment.setExperimentID("some-id");
- experiment.setProjectID("prj1");
- experiment.setName("some-exp");
- experiment.setUserName("me");
+ experiment.setApplicationId(w.getTemplateId());
+ experiment.setName(instanceName);
+ experiment.setProjectID(project.getProjectID());
+ experiment.setUserName(thriftClientData.getUsername());
for (int i = 0; i < inputNodes.size(); i++) {
InputNode inputNode = inputNodes.get(i);
XBayaTextField parameterTextField = this.parameterTextFields.get(i);
@@ -308,8 +318,9 @@ public class WorkflowInterpreterLaunchWindow {
elem.setValue(value);
experiment.addToExperimentInputs(elem );
}
-
-
+
+ experiment.setExperimentID(airavataClient.createExperiment(experiment));
+ airavataClient.launchExperiment(experiment.getExperimentID(), "testToken");
// final String workflowInterpreterUrl = this.workflowInterpreterTextField.getText();
// if (null != workflowInterpreterUrl && !"".equals(workflowInterpreterUrl)) {
// try {
@@ -327,20 +338,20 @@ public class WorkflowInterpreterLaunchWindow {
// this.engine.getGUI().getErrorWindow().error(e);
// }
// }
- this.engine.getConfiguration().setTopic(topic);
+// this.engine.getConfiguration().setTopic(topic);
new Thread() {
@Override
public void run() {
- WorkflowInterpreter workflowInterpreter = new WorkflowInterpreter(experiment, null, new WorkflowInterpreterConfiguration(workflow),getOrchestratorClient());
- try {
- workflowInterpreter.scheduleDynamically();
- } catch (WorkflowException e) {
- e.printStackTrace();
- } catch (RegistryException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
+// WorkflowInterpreter workflowInterpreter = new WorkflowInterpreter(experiment, null, new WorkflowInterpreterConfiguration(workflow),getOrchestratorClient());
+// try {
+// workflowInterpreter.scheduleDynamically();
+// } catch (WorkflowException e) {
+// e.printStackTrace();
+// } catch (RegistryException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// }
// try {
// List<WorkflowInput> workflowInputs=new ArrayList<WorkflowInput>();
// for (int i = 0; i < inputNodes.size(); i++) {