You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/11 22:41:31 UTC
airavata git commit: moved stream outptu logic after jobsubmission
Repository: airavata
Updated Branches:
refs/heads/develop 7b1888fdc -> 69ecb6801
moved stream outptu logic after jobsubmission
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/69ecb680
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/69ecb680
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/69ecb680
Branch: refs/heads/develop
Commit: 69ecb680123cea1f86c4b562ef7aff9c978e14e4
Parents: 7b1888f
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 11 16:41:23 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 11 16:41:23 2015 -0500
----------------------------------------------------------------------
.../airavata/gfac/impl/GFacEngineImpl.java | 74 +++++++++++---------
1 file changed, 40 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/69ecb680/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 431aa1a..9669d99 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -258,6 +258,46 @@ public class GFacEngineImpl implements GFacEngine {
GFacUtils.handleProcessInterrupt(processContext);
return;
}
+
+ JobStatus jobStatus = processContext.getJobModel().getJobStatus();
+ if (jobStatus != null && (jobStatus.getJobState() == JobState.SUBMITTED
+ || jobStatus.getJobState() == JobState.QUEUED || jobStatus.getJobState() == JobState.ACTIVE)) {
+
+ List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
+ if (processOutputs != null && !processOutputs.isEmpty()){
+ for (OutputDataObjectType output : processOutputs){
+ try {
+ if (output.isOutputStreaming()){
+ TaskModel streamingTaskModel = new TaskModel();
+ streamingTaskModel.setTaskType(TaskTypes.OUTPUT_FETCHING);
+ streamingTaskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+ streamingTaskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ streamingTaskModel.setParentProcessId(processContext.getProcessId());
+ TaskContext streamingTaskContext = getTaskContext(processContext);
+ streamingTaskContext.setTaskStatus(new TaskStatus(TaskState.CREATED));
+ DataStagingTaskModel submodel = new DataStagingTaskModel();
+ submodel.setType(DataStageType.OUPUT);
+ submodel.setProcessOutput(output);
+ URI source = new URI(processContext.getDataMovementProtocol().name(),
+ processContext.getComputeResourcePreference().getLoginUserName(),
+ processContext.getComputeResourceDescription().getHostName(),
+ 22,
+ processContext.getWorkingDir() + output.getValue(), null, null);
+ submodel.setSource(source.getPath());
+ submodel.setDestination("dummy://temp/file/location");
+ streamingTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+ String streamTaskId = (String) processContext.getExperimentCatalog()
+ .add(ExpCatChildDataType.TASK, streamingTaskModel, processContext.getProcessId());
+ streamingTaskModel.setTaskId(streamTaskId);
+ streamingTaskContext.setTaskModel(streamingTaskModel);
+ executeDataStreaming(taskContext, processContext.isRecovery());
+ }
+ } catch (URISyntaxException | TException | RegistryException e) {
+ log.error("Error while streaming output " + output.getValue());
+ }
+ }
+ }
+ }
break;
case MONITORING:
@@ -277,40 +317,6 @@ public class GFacEngineImpl implements GFacEngine {
}
- List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
- if (processOutputs != null && !processOutputs.isEmpty()){
- for (OutputDataObjectType output : processOutputs){
- try {
- if (output.isOutputStreaming()){
- TaskModel streamingTaskModel = new TaskModel();
- streamingTaskModel.setTaskType(TaskTypes.OUTPUT_FETCHING);
- streamingTaskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
- streamingTaskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
- streamingTaskModel.setParentProcessId(processContext.getProcessId());
- TaskContext streamingTaskContext = getTaskContext(processContext);
- streamingTaskContext.setTaskStatus(new TaskStatus(TaskState.CREATED));
- DataStagingTaskModel submodel = new DataStagingTaskModel();
- submodel.setType(DataStageType.OUPUT);
- submodel.setProcessOutput(output);
- URI source = new URI(processContext.getDataMovementProtocol().name(),
- processContext.getComputeResourcePreference().getLoginUserName(),
- processContext.getComputeResourceDescription().getHostName(),
- 22,
- processContext.getWorkingDir() + output.getValue(), null, null);
- submodel.setSource(source.getPath());
- submodel.setDestination("dummy://temp/file/location");
- streamingTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
- String streamTaskId = (String) processContext.getExperimentCatalog()
- .add(ExpCatChildDataType.TASK, streamingTaskModel, processContext.getProcessId());
- streamingTaskModel.setTaskId(streamTaskId);
- streamingTaskContext.setTaskModel(streamingTaskModel);
- executeDataStreaming(taskContext, processContext.isRecovery());
- }
- } catch (URISyntaxException | TException | RegistryException e) {
- log.error("Error while streaming output " + output.getValue());
- }
- }
- }
if (processContext.isPauseTaskExecution()) {
return; // If any task put processContext to wait, the same task must continue processContext execution.