You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2016/03/11 07:57:22 UTC
[06/16] airavata git commit: Added Archive Type as another data
staging task type
Added Archive Type as another data staging task type
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/75fb3b3c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/75fb3b3c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/75fb3b3c
Branch: refs/heads/master
Commit: 75fb3b3cdd597654832df16cb5f75a6f35db1a9e
Parents: 5f29e9f
Author: Shameera Rathnayaka <sh...@gmail.com>
Authored: Fri Mar 4 21:30:40 2016 -0500
Committer: Shameera Rathnayaka <sh...@gmail.com>
Committed: Fri Mar 4 21:30:40 2016 -0500
----------------------------------------------------------------------
.../resources/lib/airavata/task_model_types.cpp | 8 ++-
.../resources/lib/airavata/task_model_types.h | 3 +-
.../resources/lib/Airavata/Model/Task/Types.php | 2 +
.../lib/apache/airavata/model/task/ttypes.py | 3 +
.../airavata/model/task/DataStageType.java | 5 +-
.../model/util/ExperimentModelUtil.java | 1 +
.../gfac/core/context/ProcessContext.java | 4 ++
.../airavata/gfac/impl/GFacEngineImpl.java | 36 +++++++-----
.../apache/airavata/gfac/impl/GFacWorker.java | 2 +-
.../airavata/gfac/impl/task/ArchiveTask.java | 53 +++++++++++++++++
.../cpi/impl/SimpleOrchestratorImpl.java | 60 +++++++++++---------
.../experiment-catalog-models/task_model.thrift | 3 +-
12 files changed, 132 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/task_model_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/task_model_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/task_model_types.cpp
index dba3511..f208387 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/task_model_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/task_model_types.cpp
@@ -50,13 +50,15 @@ const std::map<int, const char*> _TaskTypes_VALUES_TO_NAMES(::apache::thrift::TE
int _kDataStageTypeValues[] = {
DataStageType::INPUT,
- DataStageType::OUPUT
+ DataStageType::OUPUT,
+ DataStageType::ARCHIVE_OUTPUT
};
const char* _kDataStageTypeNames[] = {
"INPUT",
- "OUPUT"
+ "OUPUT",
+ "ARCHIVE_OUTPUT"
};
-const std::map<int, const char*> _DataStageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(2, _kDataStageTypeValues, _kDataStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _DataStageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(3, _kDataStageTypeValues, _kDataStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
TaskModel::~TaskModel() throw() {
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/task_model_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/task_model_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/task_model_types.h
index 913ada9..8f18441 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/task_model_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/task_model_types.h
@@ -58,7 +58,8 @@ extern const std::map<int, const char*> _TaskTypes_VALUES_TO_NAMES;
struct DataStageType {
enum type {
INPUT = 0,
- OUPUT = 1
+ OUPUT = 1,
+ ARCHIVE_OUTPUT = 2
};
};
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Task/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Task/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Task/Types.php
index fcf5994..2997652 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Task/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Task/Types.php
@@ -48,9 +48,11 @@ final class TaskTypes {
final class DataStageType {
const INPUT = 0;
const OUPUT = 1;
+ const ARCHIVE_OUTPUT = 2;
static public $__names = array(
0 => 'INPUT',
1 => 'OUPUT',
+ 2 => 'ARCHIVE_OUTPUT',
);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/task/ttypes.py
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/task/ttypes.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/task/ttypes.py
index 3ad81d5..ff6d729 100644
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/task/ttypes.py
+++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/src/main/resources/lib/apache/airavata/model/task/ttypes.py
@@ -63,15 +63,18 @@ class DataStageType:
"""
INPUT = 0
OUPUT = 1
+ ARCHIVE_OUTPUT = 2
_VALUES_TO_NAMES = {
0: "INPUT",
1: "OUPUT",
+ 2: "ARCHIVE_OUTPUT",
}
_NAMES_TO_VALUES = {
"INPUT": 0,
"OUPUT": 1,
+ "ARCHIVE_OUTPUT": 2,
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/task/DataStageType.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/task/DataStageType.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/task/DataStageType.java
index f6cbe06..fa22c62 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/task/DataStageType.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/task/DataStageType.java
@@ -37,7 +37,8 @@ import org.apache.thrift.TEnum;
*/
public enum DataStageType implements org.apache.thrift.TEnum {
INPUT(0),
- OUPUT(1);
+ OUPUT(1),
+ ARCHIVE_OUTPUT(2);
private final int value;
@@ -62,6 +63,8 @@ public enum DataStageType implements org.apache.thrift.TEnum {
return INPUT;
case 1:
return OUPUT;
+ case 2:
+ return ARCHIVE_OUTPUT;
default:
return null;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java b/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java
index 502c8ce..8546e5f 100644
--- a/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java
+++ b/airavata-api/airavata-model-utils/src/main/java/org/apache/airavata/model/util/ExperimentModelUtil.java
@@ -81,6 +81,7 @@ public class ExperimentModelUtil {
processModel.setExperimentId(experiment.getExperimentId());
processModel.setApplicationInterfaceId(experiment.getExecutionId());
processModel.setEnableEmailNotification(experiment.isEnableEmailNotification());
+ processModel.setArchive(experiment.isArchive());
List<String> emailAddresses = experiment.getEmailAddresses();
if (emailAddresses != null && !emailAddresses.isEmpty()){
processModel.setEmailAddresses(emailAddresses);
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
index 2880551..2b7868c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/ProcessContext.java
@@ -510,5 +510,9 @@ public class ProcessContext {
public void setRecoveryWithCancel(boolean recoveryWithCancel) {
this.recoveryWithCancel = recoveryWithCancel;
}
+
+ public boolean isArchive() {
+ return getProcessModel().isArchive();
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
index caddbb2..5e7774f 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacEngineImpl.java
@@ -36,6 +36,7 @@ import org.apache.airavata.gfac.core.monitor.JobMonitor;
import org.apache.airavata.gfac.core.task.JobSubmissionTask;
import org.apache.airavata.gfac.core.task.Task;
import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.gfac.impl.task.DataStageTask;
import org.apache.airavata.gfac.impl.task.DataStreamingTask;
import org.apache.airavata.gfac.impl.task.EnvironmentSetupTask;
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
@@ -279,20 +280,27 @@ public class GFacEngineImpl implements GFacEngine {
return;
}
DataStagingTaskModel subTaskModel = (DataStagingTaskModel) taskContext.getSubTaskModel();
- if (subTaskModel.getType() == DataStageType.INPUT) {
- status = new ProcessStatus(ProcessState.INPUT_DATA_STAGING);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(status);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- taskContext.setProcessInput(subTaskModel.getProcessInput());
- inputDataStaging(taskContext, processContext.isRecovery());
- } else {
- status = new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
- status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
- processContext.setProcessStatus(status);
- GFacUtils.saveAndPublishProcessStatus(processContext);
- taskContext.setProcessOutput(subTaskModel.getProcessOutput());
- outputDataStaging(taskContext, processContext.isRecovery());
+ DataStageType type = subTaskModel.getType();
+ switch (type) {
+ case INPUT:
+ status = new ProcessStatus(ProcessState.INPUT_DATA_STAGING);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ processContext.setProcessStatus(status);
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ taskContext.setProcessInput(subTaskModel.getProcessInput());
+ inputDataStaging(taskContext, processContext.isRecovery());
+ break;
+ case OUPUT:
+ status = new ProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
+ status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ processContext.setProcessStatus(status);
+ GFacUtils.saveAndPublishProcessStatus(processContext);
+ taskContext.setProcessOutput(subTaskModel.getProcessOutput());
+ outputDataStaging(taskContext, processContext.isRecovery());
+ break;
+ case ARCHIVE_OUTPUT:
+ // TODO - implement output archive logic
+
}
// checkpoint
if (processContext.isInterrupted()) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
index fd6dad3..6dc4c1d 100644
--- a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/GFacWorker.java
@@ -164,7 +164,7 @@ public class GFacWorker implements Runnable {
}
private void completeProcess() throws GFacException {
- ProcessStatus status = new ProcessStatus(ProcessState.COMPLETED);
+ ProcessStatus status = new ProcessStatus(ProcessState.COMPLETED);
status.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
processContext.setProcessStatus(status);
GFacUtils.saveAndPublishProcessStatus(processContext);
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
new file mode 100644
index 0000000..d51c389
--- /dev/null
+++ b/modules/gfac/gfac-impl/src/main/java/org/apache/airavata/gfac/impl/task/ArchiveTask.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.gfac.impl.task;
+
+import org.apache.airavata.gfac.core.context.TaskContext;
+import org.apache.airavata.gfac.core.task.Task;
+import org.apache.airavata.gfac.core.task.TaskException;
+import org.apache.airavata.model.status.TaskState;
+import org.apache.airavata.model.status.TaskStatus;
+import org.apache.airavata.model.task.TaskTypes;
+
+import java.util.Map;
+
+public class ArchiveTask implements Task {
+ @Override
+ public void init(Map<String, String> propertyMap) throws TaskException {
+
+ }
+
+ @Override
+ public TaskStatus execute(TaskContext taskContext) {
+ // implement archive logic with jscp
+ return new TaskStatus(TaskState.COMPLETED);
+ }
+
+ @Override
+ public TaskStatus recover(TaskContext taskContext) {
+ return new TaskStatus(TaskState.COMPLETED);
+ }
+
+ @Override
+ public TaskTypes getType() {
+ return TaskTypes.DATA_STAGING;
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index c875180..20471df 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -398,38 +398,14 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
switch (type) {
case STDOUT :
processOutput.setValue(appName + ".stdout");
- try {
- TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
- String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
- processModel.getProcessId());
- outputDataStagingTask.setTaskId(taskId);
- dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
- } catch (TException e) {
- throw new RegistryException("Error while serializing data staging sub task model", e);
- }
+ createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
break;
case STDERR:
processOutput.setValue(appName + ".stderr");
- try {
- TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
- String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
- processModel.getProcessId());
- outputDataStagingTask.setTaskId(taskId);
- dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
- } catch (TException e) {
- throw new RegistryException("Error while serializing data staging sub task model", e);
- }
+ createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
break;
case URI:
- try {
- TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
- String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
- processModel.getProcessId());
- outputDataStagingTask.setTaskId(taskId);
- dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
- } catch (TException e) {
- throw new RegistryException("Error while serializing data staging sub task model", e);
- }
+ createOutputDataSatagingTasks(processModel, gatewayId, dataStagingTaskIds, processOutput);
break;
default:
// nothing to do
@@ -440,6 +416,18 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
return dataStagingTaskIds;
}
+ private void createOutputDataSatagingTasks(ProcessModel processModel, String gatewayId, List<String> dataStagingTaskIds, OutputDataObjectType processOutput) throws RegistryException {
+ try {
+ TaskModel outputDataStagingTask = getOutputDataStagingTask(processModel, processOutput, gatewayId);
+ String taskId = (String) orchestratorContext.getRegistry().getExperimentCatalog().add(ExpCatChildDataType.TASK, outputDataStagingTask,
+ processModel.getProcessId());
+ outputDataStagingTask.setTaskId(taskId);
+ dataStagingTaskIds.add(outputDataStagingTask.getTaskId());
+ } catch (TException e) {
+ throw new RegistryException("Error while serializing data staging sub task model", e);
+ }
+ }
+
private List<String> createAndSaveSubmissionTasks(String gatewayId, JobSubmissionInterface jobSubmissionInterface, ProcessModel processModel, int wallTime)
throws TException, RegistryException, OrchestratorException {
@@ -584,4 +572,22 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
}
+ private TaskModel getArchiveOutTask(ProcessModel processModel, OutputDataObjectType processOutput, String gatewayId) throws RegistryException, TException {
+
+ // create new task model for this task
+ TaskModel taskModel = new TaskModel();
+ taskModel.setParentProcessId(processModel.getProcessId());
+ taskModel.setCreationTime(AiravataUtils.getCurrentTimestamp().getTime());
+ taskModel.setLastUpdateTime(taskModel.getCreationTime());
+ TaskStatus taskStatus = new TaskStatus(TaskState.CREATED);
+ taskStatus.setTimeOfStateChange(AiravataUtils.getCurrentTimestamp().getTime());
+ taskModel.setTaskStatus(taskStatus);
+ taskModel.setTaskType(TaskTypes.DATA_STAGING);
+ DataStagingTaskModel submodel = new DataStagingTaskModel();
+ submodel.setType(DataStageType.ARCHIVE_OUTPUT);
+ taskModel.setSubTaskModel(ThriftUtils.serializeThriftObject(submodel));
+ return taskModel;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/75fb3b3c/thrift-interface-descriptions/data-models/experiment-catalog-models/task_model.thrift
----------------------------------------------------------------------
diff --git a/thrift-interface-descriptions/data-models/experiment-catalog-models/task_model.thrift b/thrift-interface-descriptions/data-models/experiment-catalog-models/task_model.thrift
index 53a7fb5..faaf93b 100644
--- a/thrift-interface-descriptions/data-models/experiment-catalog-models/task_model.thrift
+++ b/thrift-interface-descriptions/data-models/experiment-catalog-models/task_model.thrift
@@ -75,7 +75,8 @@ struct TaskModel {
enum DataStageType {
INPUT,
- OUPUT
+ OUPUT,
+ ARCHIVE_OUTPUT
}
struct DataStagingTaskModel {