You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/11 21:53:11 UTC
airavata git commit: Fixed issues with streaming datamodel
Repository: airavata
Updated Branches:
refs/heads/develop a9520e0cf -> 7b1888fdc
Fixed issues with streaming datamodel
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7b1888fd
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7b1888fd
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7b1888fd
Branch: refs/heads/develop
Commit: 7b1888fdcf6746d70972e3981d79b53607bf9219
Parents: a9520e0
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 11 15:53:07 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 11 15:53:07 2015 -0500
----------------------------------------------------------------------
.../airavata/gfac/impl/GFacEngineImpl.java | 34 ++++++++++++++++----
1 file changed, 28 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/7b1888fd/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 95a60fe..431aa1a 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -280,12 +280,34 @@ public class GFacEngineImpl implements GFacEngine {
List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
if (processOutputs != null && !processOutputs.isEmpty()){
for (OutputDataObjectType output : processOutputs){
- if (output.isOutputStreaming()){
- status = new ProcessStatus(ProcessState.EXECUTING);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(status);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- executeDataStreaming(taskContext, processContext.isRecovery());
+ try {
+ if (output.isOutputStreaming()){
+ TaskModel streamingTaskModel = new TaskModel();
+ streamingTaskModel.setTaskType(TaskTypes.OUTPUT_FETCHING);
+ streamingTaskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+ streamingTaskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ streamingTaskModel.setParentProcessId(processContext.getProcessId());
+ TaskContext streamingTaskContext = getTaskContext(processContext);
+ streamingTaskContext.setTaskStatus(new TaskStatus(TaskState.CREATED));
+ DataStagingTaskModel submodel = new DataStagingTaskModel();
+ submodel.setType(DataStageType.OUPUT);
+ submodel.setProcessOutput(output);
+ URI source = new URI(processContext.getDataMovementProtocol().name(),
+ processContext.getComputeResourcePreference().getLoginUserName(),
+ processContext.getComputeResourceDescription().getHostName(),
+ 22,
+ processContext.getWorkingDir() + output.getValue(), null, null);
+ submodel.setSource(source.getPath());
+ submodel.setDestination("dummy://temp/file/location");
+ streamingTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+ String streamTaskId = (String) processContext.getExperimentCatalog()
+ .add(ExpCatChildDataType.TASK, streamingTaskModel, processContext.getProcessId());
+ streamingTaskModel.setTaskId(streamTaskId);
+ streamingTaskContext.setTaskModel(streamingTaskModel);
+ executeDataStreaming(taskContext, processContext.isRecovery());
+ }
+ } catch (URISyntaxException | TException | RegistryException e) {
+ log.error("Error while streaming output " + output.getValue());
}
}
}