You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/11 22:41:31 UTC

airavata git commit: moved stream outptu logic after jobsubmission

Repository: airavata
Updated Branches:
  refs/heads/develop 7b1888fdc -> 69ecb6801


moved stream outptu logic after jobsubmission


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/69ecb680
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/69ecb680
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/69ecb680

Branch: refs/heads/develop
Commit: 69ecb680123cea1f86c4b562ef7aff9c978e14e4
Parents: 7b1888f
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 11 16:41:23 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 11 16:41:23 2015 -0500

----------------------------------------------------------------------
 .../airavata/gfac/impl/GFacEngineImpl.java      | 74 +++++++++++---------
 1 file changed, 40 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/69ecb680/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 431aa1a..9669d99 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -258,6 +258,46 @@ public class GFacEngineImpl implements GFacEngine {
                         GFacUtils.handleProcessInterrupt(processContext);
                         return;
                     }
+
+                    JobStatus jobStatus = processContext.getJobModel().getJobStatus();
+                    if (jobStatus != null && (jobStatus.getJobState() == JobState.SUBMITTED
+                            || jobStatus.getJobState() == JobState.QUEUED || jobStatus.getJobState() == JobState.ACTIVE)) {
+
+                        List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
+                        if (processOutputs != null && !processOutputs.isEmpty()){
+                            for (OutputDataObjectType output : processOutputs){
+                                try {
+                                    if (output.isOutputStreaming()){
+                                        TaskModel streamingTaskModel = new TaskModel();
+                                        streamingTaskModel.setTaskType(TaskTypes.OUTPUT_FETCHING);
+                                        streamingTaskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+                                        streamingTaskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+                                        streamingTaskModel.setParentProcessId(processContext.getProcessId());
+                                        TaskContext streamingTaskContext = getTaskContext(processContext);
+                                        streamingTaskContext.setTaskStatus(new TaskStatus(TaskState.CREATED));
+                                        DataStagingTaskModel submodel = new DataStagingTaskModel();
+                                        submodel.setType(DataStageType.OUPUT);
+                                        submodel.setProcessOutput(output);
+                                        URI source = new URI(processContext.getDataMovementProtocol().name(),
+                                                processContext.getComputeResourcePreference().getLoginUserName(),
+                                                processContext.getComputeResourceDescription().getHostName(),
+                                                22,
+                                                processContext.getWorkingDir() + output.getValue(), null, null);
+                                        submodel.setSource(source.getPath());
+                                        submodel.setDestination("dummy://temp/file/location");
+                                        streamingTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+                                        String streamTaskId = (String) processContext.getExperimentCatalog()
+                                                .add(ExpCatChildDataType.TASK, streamingTaskModel, processContext.getProcessId());
+                                        streamingTaskModel.setTaskId(streamTaskId);
+                                        streamingTaskContext.setTaskModel(streamingTaskModel);
+                                        executeDataStreaming(taskContext, processContext.isRecovery());
+                                    }
+                                } catch (URISyntaxException | TException | RegistryException e) {
+                                    log.error("Error while streaming output " + output.getValue());
+                                }
+                            }
+                        }
+                    }
                     break;
 
                 case MONITORING:
@@ -277,40 +317,6 @@ public class GFacEngineImpl implements GFacEngine {
 
             }
 
-            List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
-            if (processOutputs != null && !processOutputs.isEmpty()){
-                for (OutputDataObjectType output : processOutputs){
-                    try {
-                        if (output.isOutputStreaming()){
-                            TaskModel streamingTaskModel = new TaskModel();
-                            streamingTaskModel.setTaskType(TaskTypes.OUTPUT_FETCHING);
-                            streamingTaskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
-                            streamingTaskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
-                            streamingTaskModel.setParentProcessId(processContext.getProcessId());
-                            TaskContext streamingTaskContext = getTaskContext(processContext);
-                            streamingTaskContext.setTaskStatus(new TaskStatus(TaskState.CREATED));
-                            DataStagingTaskModel submodel = new DataStagingTaskModel();
-                            submodel.setType(DataStageType.OUPUT);
-                            submodel.setProcessOutput(output);
-                            URI source = new URI(processContext.getDataMovementProtocol().name(),
-                                    processContext.getComputeResourcePreference().getLoginUserName(),
-                                    processContext.getComputeResourceDescription().getHostName(),
-                                    22,
-                                    processContext.getWorkingDir() + output.getValue(), null, null);
-                            submodel.setSource(source.getPath());
-                            submodel.setDestination("dummy://temp/file/location");
-                            streamingTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
-                            String streamTaskId = (String) processContext.getExperimentCatalog()
-                                    .add(ExpCatChildDataType.TASK, streamingTaskModel, processContext.getProcessId());
-                            streamingTaskModel.setTaskId(streamTaskId);
-                            streamingTaskContext.setTaskModel(streamingTaskModel);
-                            executeDataStreaming(taskContext, processContext.isRecovery());
-                        }
-                    } catch (URISyntaxException | TException | RegistryException e) {
-                        log.error("Error while streaming output " + output.getValue());
-                    }
-                }
-            }
 
             if (processContext.isPauseTaskExecution()) {
                 return;   // If any task put processContext to wait, the same task must continue processContext execution.