You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/11/11 21:53:11 UTC

airavata git commit: Fixed issues with streaming datamodel

Repository: airavata
Updated Branches:
  refs/heads/develop a9520e0cf -> 7b1888fdc


Fixed issues with streaming datamodel


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/7b1888fd
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/7b1888fd
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/7b1888fd

Branch: refs/heads/develop
Commit: 7b1888fdcf6746d70972e3981d79b53607bf9219
Parents: a9520e0
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Wed Nov 11 15:53:07 2015 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Wed Nov 11 15:53:07 2015 -0500

----------------------------------------------------------------------
 .../airavata/gfac/impl/GFacEngineImpl.java      | 34 ++++++++++++++++----
 1 file changed, 28 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/7b1888fd/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index 95a60fe..431aa1a 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -280,12 +280,34 @@ public class GFacEngineImpl implements GFacEngine {
             List<OutputDataObjectType> processOutputs = processContext.getProcessModel().getProcessOutputs();
             if (processOutputs != null && !processOutputs.isEmpty()){
                 for (OutputDataObjectType output : processOutputs){
-                    if (output.isOutputStreaming()){
-                        status = new ProcessStatus(ProcessState.EXECUTING);
-                        status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
-                        processContext.setProcessStatus(status);
-                        GFacUtils.saveAndPublishProcessStatus(processContext);
-                        executeDataStreaming(taskContext, processContext.isRecovery());
+                    try {
+                        if (output.isOutputStreaming()){
+                            TaskModel streamingTaskModel = new TaskModel();
+                            streamingTaskModel.setTaskType(TaskTypes.OUTPUT_FETCHING);
+                            streamingTaskModel.setTaskStatus(new TaskStatus(TaskState.CREATED));
+                            streamingTaskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+                            streamingTaskModel.setParentProcessId(processContext.getProcessId());
+                            TaskContext streamingTaskContext = getTaskContext(processContext);
+                            streamingTaskContext.setTaskStatus(new TaskStatus(TaskState.CREATED));
+                            DataStagingTaskModel submodel = new DataStagingTaskModel();
+                            submodel.setType(DataStageType.OUPUT);
+                            submodel.setProcessOutput(output);
+                            URI source = new URI(processContext.getDataMovementProtocol().name(),
+                                    processContext.getComputeResourcePreference().getLoginUserName(),
+                                    processContext.getComputeResourceDescription().getHostName(),
+                                    22,
+                                    processContext.getWorkingDir() + output.getValue(), null, null);
+                            submodel.setSource(source.getPath());
+                            submodel.setDestination("dummy://temp/file/location");
+                            streamingTaskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+                            String streamTaskId = (String) processContext.getExperimentCatalog()
+                                    .add(ExpCatChildDataType.TASK, streamingTaskModel, processContext.getProcessId());
+                            streamingTaskModel.setTaskId(streamTaskId);
+                            streamingTaskContext.setTaskModel(streamingTaskModel);
+                            executeDataStreaming(taskContext, processContext.isRecovery());
+                        }
+                    } catch (URISyntaxException | TException | RegistryException e) {
+                        log.error("Error while streaming output " + output.getValue());
                     }
                 }
             }