You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by di...@apache.org on 2020/05/28 14:26:02 UTC
[airavata] 01/01: Param sweep support with SLURM job arrays
This is an automated email from the ASF dual-hosted git repository.
dimuthuupe pushed a commit to branch param-sweep
in repository https://gitbox.apache.org/repos/asf/airavata.git
commit 66719492db1cc3300541538954f019c8e10c8084
Author: Dimuthu Wannipurage <di...@gmail.com>
AuthorDate: Thu May 28 10:25:38 2020 -0400
Param sweep support with SLURM job arrays
---
.../airavata/model/application/io/ttypes.py | 74 ++++++++
.../airavata/model/experiment/ttypes.py | 26 ++-
.../airavata/model/experiment/ExperimentModel.java | 211 ++++++++++++++++++++-
.../helix/impl/participant/GlobalParticipant.java | 2 +
.../airavata/helix/impl/task/TaskContext.java | 65 +++----
.../task/cancel/RemoteJobCancellationTask.java | 2 +-
.../airavata/helix/impl/task/env/EnvSetupTask.java | 16 +-
.../impl/task/staging/JobVerificationTask.java | 3 +-
.../task/staging/SweepingInputDataStagingTask.java | 159 ++++++++++++++++
.../staging/SweepingOutputDataStagingTask.java | 115 +++++++++++
.../task/submission/DefaultJobSubmissionTask.java | 2 +
.../impl/task/submission/JobSubmissionTask.java | 23 ++-
.../task/submission/config/GroovyMapBuilder.java | 1 +
.../impl/task/submission/config/GroovyMapData.java | 11 ++
.../impl/task/submission/config/JobFactory.java | 46 +++--
.../helix/impl/workflow/PostWorkflowManager.java | 26 ++-
.../helix/impl/workflow/PreWorkflowManager.java | 11 +-
.../server/src/main/resources/LSF_Groovy.template | 4 +-
.../server/src/main/resources/PBS_Groovy.template | 4 +-
...M_Groovy.template => SLURM_Arr_Groovy.template} | 7 +-
.../src/main/resources/SLURM_Groovy.template | 4 +-
.../server/src/main/resources/UGE_Groovy.template | 4 +-
.../src/main/resources/META-INF/persistence.xml | 1 +
.../database_scripts/expcatalog-derby.sql | 10 +
.../database_scripts/expcatalog-mysql.sql | 10 +
.../init/05-parameter-sweep-migration.sql | 11 ++
.../ide-integration/src/main/resources/logback.xml | 4 +-
.../monitor/email/parser/SLURMEmailParser.java | 4 +-
.../core/entities/expcatalog/ExperimentEntity.java | 22 +++
.../expcatalog/ExperimentOutputEntity.java | 13 ++
.../expcatalog/ExperimentOutputValueEntity.java | 83 ++++++++
.../expcatalog/ExperimentOutputValuePK.java | 78 ++++++++
.../ExperimentOutputValueRepository.java | 74 ++++++++
.../expcatalog/ExperimentRepository.java | 17 +-
.../airavata/registry/core/utils/DBConstants.java | 5 +-
.../registry/core/utils/QueryConstants.java | 3 +
.../src/main/resources/META-INF/persistence.xml | 1 +
.../src/main/resources/expcatalog-mysql.sql | 10 +
.../api/service/handler/RegistryServerHandler.java | 13 ++
.../application_io_models.thrift | 5 +
.../experiment_model.thrift | 4 +-
41 files changed, 1086 insertions(+), 98 deletions(-)
diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata/model/application/io/ttypes.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata/model/application/io/ttypes.py
index 17f1760..189cb43 100644
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata/model/application/io/ttypes.py
+++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata/model/application/io/ttypes.py
@@ -510,3 +510,77 @@ class OutputDataObjectType(object):
def __ne__(self, other):
return not (self == other)
+
+
+class OutputDataValueObjectType(object):
+ """Thrift struct with two string fields, name and value.
+ Attributes:
+ - name: UTF8 string, field id 1; validate() raises if it is None
+ - value: UTF8 string, field id 2; not checked by validate()
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'name', 'UTF8', None, ), # 1
+ (2, TType.STRING, 'value', 'UTF8', None, ), # 2
+ )
+
+ def __init__(self, name=None, value=None,):  # both fields default to None
+ self.name = name
+ self.value = value
+
+ def read(self, iprot):  # fill this struct's fields from the protocol reader iprot
+ if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:  # use the protocol's _fast_decode hook when it is available
+ iprot._fast_decode(self, iprot, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:  # generic path: consume fields until the STOP marker
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:  # field id 1 -> name
+ if ftype == TType.STRING:
+ self.name = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString()  # decoded explicitly only on Python 2
+ else:
+ iprot.skip(ftype)  # wire type is not STRING: skip the value
+ elif fid == 2:  # field id 2 -> value
+ if ftype == TType.STRING:
+ self.value = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString()  # decoded explicitly only on Python 2
+ else:
+ iprot.skip(ftype)  # wire type is not STRING: skip the value
+ else:
+ iprot.skip(ftype)  # field ids other than 1 and 2 are skipped
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):  # serialize this struct to the protocol writer oprot
+ if oprot._fast_encode is not None and self.thrift_spec is not None:  # use the protocol's _fast_encode hook when it is available
+ oprot.trans.write(oprot._fast_encode(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('OutputDataValueObjectType')
+ if self.name is not None:  # a field that is None is omitted from the output
+ oprot.writeFieldBegin('name', TType.STRING, 1)
+ oprot.writeString(self.name.encode('utf-8') if sys.version_info[0] == 2 else self.name)  # encoded explicitly only on Python 2
+ oprot.writeFieldEnd()
+ if self.value is not None:  # a field that is None is omitted from the output
+ oprot.writeFieldBegin('value', TType.STRING, 2)
+ oprot.writeString(self.value.encode('utf-8') if sys.version_info[0] == 2 else self.value)  # encoded explicitly only on Python 2
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):  # raises TProtocolException when name is None; value is not checked
+ if self.name is None:
+ raise TProtocolException(message='Required field name is unset!')
+ return
+
+ def __repr__(self):  # "ClassName(key=value, ...)" built from the instance __dict__
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.items()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):  # equal when other is an instance of this class with an equal __dict__
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):  # negation of __eq__
+ return not (self == other)
diff --git a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata/model/experiment/ttypes.py b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata/model/experiment/ttypes.py
index d5cb425..800d0ff 100644
--- a/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata/model/experiment/ttypes.py
+++ b/airavata-api/airavata-client-sdks/airavata-python-sdk/airavata/model/experiment/ttypes.py
@@ -312,6 +312,8 @@ class ExperimentModel(object):
- errors
- processes
- workflow
+ - executionType
+ - sweepCount
"""
thrift_spec = (
@@ -336,9 +338,11 @@ class ExperimentModel(object):
(18, TType.LIST, 'errors', (TType.STRUCT, (airavata.model.commons.ttypes.ErrorModel, airavata.model.commons.ttypes.ErrorModel.thrift_spec), False), None, ), # 18
(19, TType.LIST, 'processes', (TType.STRUCT, (airavata.model.process.ttypes.ProcessModel, airavata.model.process.ttypes.ProcessModel.thrift_spec), False), None, ), # 19
(20, TType.STRUCT, 'workflow', (airavata.model.workflow.ttypes.AiravataWorkflow, airavata.model.workflow.ttypes.AiravataWorkflow.thrift_spec), None, ), # 20
+ (21, TType.STRING, 'executionType', 'UTF8', None, ), # 21
+ (22, TType.I32, 'sweepCount', None, None, ), # 22
)
- def __init__(self, experimentId=thrift_spec[1][4], projectId=None, gatewayId=None, experimentType=thrift_spec[4][4], userName=None, experimentName=None, creationTime=None, description=None, executionId=None, gatewayExecutionId=None, gatewayInstanceId=None, enableEmailNotification=None, emailAddresses=None, userConfigurationData=None, experimentInputs=None, experimentOutputs=None, experimentStatus=None, errors=None, processes=None, workflow=None,):
+ def __init__(self, experimentId=thrift_spec[1][4], projectId=None, gatewayId=None, experimentType=thrift_spec[4][4], userName=None, experimentName=None, creationTime=None, description=None, executionId=None, gatewayExecutionId=None, gatewayInstanceId=None, enableEmailNotification=None, emailAddresses=None, userConfigurationData=None, experimentInputs=None, experimentOutputs=None, experimentStatus=None, errors=None, processes=None, workflow=None, executionType=None, sweepCount=None,):
self.experimentId = experimentId
self.projectId = projectId
self.gatewayId = gatewayId
@@ -359,6 +363,8 @@ class ExperimentModel(object):
self.errors = errors
self.processes = processes
self.workflow = workflow
+ self.executionType = executionType
+ self.sweepCount = sweepCount
def read(self, iprot):
if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -506,6 +512,16 @@ class ExperimentModel(object):
self.workflow.read(iprot)
else:
iprot.skip(ftype)
+ elif fid == 21:
+ if ftype == TType.STRING:
+ self.executionType = iprot.readString().decode('utf-8') if sys.version_info[0] == 2 else iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 22:
+ if ftype == TType.I32:
+ self.sweepCount = iprot.readI32()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -614,6 +630,14 @@ class ExperimentModel(object):
oprot.writeFieldBegin('workflow', TType.STRUCT, 20)
self.workflow.write(oprot)
oprot.writeFieldEnd()
+ if self.executionType is not None:
+ oprot.writeFieldBegin('executionType', TType.STRING, 21)
+ oprot.writeString(self.executionType.encode('utf-8') if sys.version_info[0] == 2 else self.executionType)
+ oprot.writeFieldEnd()
+ if self.sweepCount is not None:
+ oprot.writeFieldBegin('sweepCount', TType.I32, 22)
+ oprot.writeI32(self.sweepCount)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/experiment/ExperimentModel.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/experiment/ExperimentModel.java
index 6d9f40b..8f11459 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/experiment/ExperimentModel.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/experiment/ExperimentModel.java
@@ -64,6 +64,8 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
private static final org.apache.thrift.protocol.TField ERRORS_FIELD_DESC = new org.apache.thrift.protocol.TField("errors", org.apache.thrift.protocol.TType.LIST, (short)18);
private static final org.apache.thrift.protocol.TField PROCESSES_FIELD_DESC = new org.apache.thrift.protocol.TField("processes", org.apache.thrift.protocol.TType.LIST, (short)19);
private static final org.apache.thrift.protocol.TField WORKFLOW_FIELD_DESC = new org.apache.thrift.protocol.TField("workflow", org.apache.thrift.protocol.TType.STRUCT, (short)20);
+ private static final org.apache.thrift.protocol.TField EXECUTION_TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("executionType", org.apache.thrift.protocol.TType.STRING, (short)21);
+ private static final org.apache.thrift.protocol.TField SWEEP_COUNT_FIELD_DESC = new org.apache.thrift.protocol.TField("sweepCount", org.apache.thrift.protocol.TType.I32, (short)22);
private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ExperimentModelStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ExperimentModelTupleSchemeFactory();
@@ -88,6 +90,8 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
private java.util.List<org.apache.airavata.model.commons.ErrorModel> errors; // optional
private java.util.List<org.apache.airavata.model.process.ProcessModel> processes; // optional
private org.apache.airavata.model.workflow.AiravataWorkflow workflow; // optional
+ private java.lang.String executionType; // optional
+ private int sweepCount; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -114,7 +118,9 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
EXPERIMENT_STATUS((short)17, "experimentStatus"),
ERRORS((short)18, "errors"),
PROCESSES((short)19, "processes"),
- WORKFLOW((short)20, "workflow");
+ WORKFLOW((short)20, "workflow"),
+ EXECUTION_TYPE((short)21, "executionType"),
+ SWEEP_COUNT((short)22, "sweepCount");
private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
@@ -169,6 +175,10 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
return PROCESSES;
case 20: // WORKFLOW
return WORKFLOW;
+ case 21: // EXECUTION_TYPE
+ return EXECUTION_TYPE;
+ case 22: // SWEEP_COUNT
+ return SWEEP_COUNT;
default:
return null;
}
@@ -211,8 +221,9 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
// isset id assignments
private static final int __CREATIONTIME_ISSET_ID = 0;
private static final int __ENABLEEMAILNOTIFICATION_ISSET_ID = 1;
+ private static final int __SWEEPCOUNT_ISSET_ID = 2;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.CREATION_TIME,_Fields.DESCRIPTION,_Fields.EXECUTION_ID,_Fields.GATEWAY_EXECUTION_ID,_Fields.GATEWAY_INSTANCE_ID,_Fields.ENABLE_EMAIL_NOTIFICATION,_Fields.EMAIL_ADDRESSES,_Fields.USER_CONFIGURATION_DATA,_Fields.EXPERIMENT_INPUTS,_Fields.EXPERIMENT_OUTPUTS,_Fields.EXPERIMENT_STATUS,_Fields.ERRORS,_Fields.PROCESSES,_Fields.WORKFLOW};
+ private static final _Fields optionals[] = {_Fields.CREATION_TIME,_Fields.DESCRIPTION,_Fields.EXECUTION_ID,_Fields.GATEWAY_EXECUTION_ID,_Fields.GATEWAY_INSTANCE_ID,_Fields.ENABLE_EMAIL_NOTIFICATION,_Fields.EMAIL_ADDRESSES,_Fields.USER_CONFIGURATION_DATA,_Fields.EXPERIMENT_INPUTS,_Fields.EXPERIMENT_OUTPUTS,_Fields.EXPERIMENT_STATUS,_Fields.ERRORS,_Fields.PROCESSES,_Fields.WORKFLOW,_Fields.EXECUTION_TYPE,_Fields.SWEEP_COUNT};
public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -262,6 +273,10 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.airavata.model.process.ProcessModel.class))));
tmpMap.put(_Fields.WORKFLOW, new org.apache.thrift.meta_data.FieldMetaData("workflow", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, org.apache.airavata.model.workflow.AiravataWorkflow.class)));
+ tmpMap.put(_Fields.EXECUTION_TYPE, new org.apache.thrift.meta_data.FieldMetaData("executionType", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.SWEEP_COUNT, new org.apache.thrift.meta_data.FieldMetaData("sweepCount", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ExperimentModel.class, metaDataMap);
}
@@ -372,6 +387,10 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
if (other.isSetWorkflow()) {
this.workflow = new org.apache.airavata.model.workflow.AiravataWorkflow(other.workflow);
}
+ if (other.isSetExecutionType()) {
+ this.executionType = other.executionType;
+ }
+ this.sweepCount = other.sweepCount;
}
public ExperimentModel deepCopy() {
@@ -404,6 +423,9 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
this.errors = null;
this.processes = null;
this.workflow = null;
+ this.executionType = null;
+ setSweepCountIsSet(false);
+ this.sweepCount = 0;
}
public java.lang.String getExperimentId() {
@@ -962,6 +984,51 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
}
}
+ public java.lang.String getExecutionType() { // returns the stored reference; null when the field is unset
+ return this.executionType;
+ }
+
+ public void setExecutionType(java.lang.String executionType) { // stores the reference as-is; storing null makes isSetExecutionType() return false
+ this.executionType = executionType;
+ }
+
+ public void unsetExecutionType() { // marks the field unset by nulling the reference
+ this.executionType = null;
+ }
+
+ /** Returns true if field executionType is set (has been assigned a non-null value) and false otherwise */
+ public boolean isSetExecutionType() {
+ return this.executionType != null; // presence of this String field is tracked by non-nullness alone
+ }
+
+ public void setExecutionTypeIsSet(boolean value) { // only value == false has an effect: it nulls the field; true changes nothing
+ if (!value) {
+ this.executionType = null;
+ }
+ }
+
+ public int getSweepCount() { // returns the raw int regardless of presence; use isSetSweepCount() to tell unset from an explicit value
+ return this.sweepCount;
+ }
+
+ public void setSweepCount(int sweepCount) { // stores the value and records the field as set
+ this.sweepCount = sweepCount;
+ setSweepCountIsSet(true); // presence of this int is kept in the isset bitfield, not in the value itself
+ }
+
+ public void unsetSweepCount() { // clears only the presence bit; the stored int is left untouched
+ __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __SWEEPCOUNT_ISSET_ID);
+ }
+
+ /** Returns true if field sweepCount is set (has been assigned a value) and false otherwise */
+ public boolean isSetSweepCount() {
+ return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __SWEEPCOUNT_ISSET_ID); // presence is the __SWEEPCOUNT_ISSET_ID bit of __isset_bitfield
+ }
+
+ public void setSweepCountIsSet(boolean value) { // sets or clears the presence bit without changing the stored int
+ __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __SWEEPCOUNT_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, java.lang.Object value) {
switch (field) {
case EXPERIMENT_ID:
@@ -1124,6 +1191,22 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
}
break;
+ case EXECUTION_TYPE:
+ if (value == null) {
+ unsetExecutionType();
+ } else {
+ setExecutionType((java.lang.String)value);
+ }
+ break;
+
+ case SWEEP_COUNT:
+ if (value == null) {
+ unsetSweepCount();
+ } else {
+ setSweepCount((java.lang.Integer)value);
+ }
+ break;
+
}
}
@@ -1189,6 +1272,12 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
case WORKFLOW:
return getWorkflow();
+ case EXECUTION_TYPE:
+ return getExecutionType();
+
+ case SWEEP_COUNT:
+ return getSweepCount();
+
}
throw new java.lang.IllegalStateException();
}
@@ -1240,6 +1329,10 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
return isSetProcesses();
case WORKFLOW:
return isSetWorkflow();
+ case EXECUTION_TYPE:
+ return isSetExecutionType();
+ case SWEEP_COUNT:
+ return isSetSweepCount();
}
throw new java.lang.IllegalStateException();
}
@@ -1439,6 +1532,24 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
return false;
}
+ boolean this_present_executionType = true && this.isSetExecutionType();
+ boolean that_present_executionType = true && that.isSetExecutionType();
+ if (this_present_executionType || that_present_executionType) {
+ if (!(this_present_executionType && that_present_executionType))
+ return false;
+ if (!this.executionType.equals(that.executionType))
+ return false;
+ }
+
+ boolean this_present_sweepCount = true && this.isSetSweepCount();
+ boolean that_present_sweepCount = true && that.isSetSweepCount();
+ if (this_present_sweepCount || that_present_sweepCount) {
+ if (!(this_present_sweepCount && that_present_sweepCount))
+ return false;
+ if (this.sweepCount != that.sweepCount)
+ return false;
+ }
+
return true;
}
@@ -1526,6 +1637,14 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
if (isSetWorkflow())
hashCode = hashCode * 8191 + workflow.hashCode();
+ hashCode = hashCode * 8191 + ((isSetExecutionType()) ? 131071 : 524287);
+ if (isSetExecutionType())
+ hashCode = hashCode * 8191 + executionType.hashCode();
+
+ hashCode = hashCode * 8191 + ((isSetSweepCount()) ? 131071 : 524287);
+ if (isSetSweepCount())
+ hashCode = hashCode * 8191 + sweepCount;
+
return hashCode;
}
@@ -1737,6 +1856,26 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.valueOf(isSetExecutionType()).compareTo(other.isSetExecutionType());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetExecutionType()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.executionType, other.executionType);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = java.lang.Boolean.valueOf(isSetSweepCount()).compareTo(other.isSetSweepCount());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetSweepCount()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.sweepCount, other.sweepCount);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -1936,6 +2075,22 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
}
first = false;
}
+ if (isSetExecutionType()) {
+ if (!first) sb.append(", ");
+ sb.append("executionType:");
+ if (this.executionType == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.executionType);
+ }
+ first = false;
+ }
+ if (isSetSweepCount()) {
+ if (!first) sb.append(", ");
+ sb.append("sweepCount:");
+ sb.append(this.sweepCount);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -2238,6 +2393,22 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 21: // EXECUTION_TYPE
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.executionType = iprot.readString();
+ struct.setExecutionTypeIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 22: // SWEEP_COUNT
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.sweepCount = iprot.readI32();
+ struct.setSweepCountIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -2417,6 +2588,18 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
oprot.writeFieldEnd();
}
}
+ if (struct.executionType != null) {
+ if (struct.isSetExecutionType()) {
+ oprot.writeFieldBegin(EXECUTION_TYPE_FIELD_DESC);
+ oprot.writeString(struct.executionType);
+ oprot.writeFieldEnd();
+ }
+ }
+ if (struct.isSetSweepCount()) {
+ oprot.writeFieldBegin(SWEEP_COUNT_FIELD_DESC);
+ oprot.writeI32(struct.sweepCount);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -2483,7 +2666,13 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
if (struct.isSetWorkflow()) {
optionals.set(13);
}
- oprot.writeBitSet(optionals, 14);
+ if (struct.isSetExecutionType()) {
+ optionals.set(14);
+ }
+ if (struct.isSetSweepCount()) {
+ optionals.set(15);
+ }
+ oprot.writeBitSet(optionals, 16);
if (struct.isSetCreationTime()) {
oprot.writeI64(struct.creationTime);
}
@@ -2562,6 +2751,12 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
if (struct.isSetWorkflow()) {
struct.workflow.write(oprot);
}
+ if (struct.isSetExecutionType()) {
+ oprot.writeString(struct.executionType);
+ }
+ if (struct.isSetSweepCount()) {
+ oprot.writeI32(struct.sweepCount);
+ }
}
@Override
@@ -2579,7 +2774,7 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
struct.setUserNameIsSet(true);
struct.experimentName = iprot.readString();
struct.setExperimentNameIsSet(true);
- java.util.BitSet incoming = iprot.readBitSet(14);
+ java.util.BitSet incoming = iprot.readBitSet(16);
if (incoming.get(0)) {
struct.creationTime = iprot.readI64();
struct.setCreationTimeIsSet(true);
@@ -2697,6 +2892,14 @@ public class ExperimentModel implements org.apache.thrift.TBase<ExperimentModel,
struct.workflow.read(iprot);
struct.setWorkflowIsSet(true);
}
+ if (incoming.get(14)) {
+ struct.executionType = iprot.readString();
+ struct.setExecutionTypeIsSet(true);
+ }
+ if (incoming.get(15)) {
+ struct.sweepCount = iprot.readI32();
+ struct.setSweepCountIsSet(true);
+ }
}
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
index ce1372a..349617e 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/participant/GlobalParticipant.java
@@ -35,7 +35,9 @@ public class GlobalParticipant extends HelixParticipant<AbstractTask> {
public final static String[] TASK_CLASS_NAMES = {
"org.apache.airavata.helix.impl.task.env.EnvSetupTask",
"org.apache.airavata.helix.impl.task.staging.InputDataStagingTask",
+ "org.apache.airavata.helix.impl.task.staging.SweepingInputDataStagingTask",
"org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask",
+ "org.apache.airavata.helix.impl.task.staging.SweepingOutputDataStagingTask",
"org.apache.airavata.helix.impl.task.staging.JobVerificationTask",
"org.apache.airavata.helix.impl.task.completing.CompletingTask",
"org.apache.airavata.helix.impl.task.submission.ForkJobSubmissionTask",
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
index 7e32c00..365779f 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/TaskContext.java
@@ -44,6 +44,7 @@ import org.apache.airavata.model.application.io.DataType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.data.movement.DataMovementInterface;
import org.apache.airavata.model.data.movement.DataMovementProtocol;
+import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.process.ProcessModel;
import org.apache.airavata.model.scheduling.ComputationalResourceSchedulingModel;
@@ -91,6 +92,7 @@ public class TaskContext {
private String taskId;
private ProcessModel processModel;
+ private ExperimentModel experimentModel;
private JobModel jobModel;
private Object subTaskModel = null;
@@ -98,8 +100,6 @@ public class TaskContext {
private String scratchLocation;
private String inputDir;
private String outputDir;
- private String stdoutLocation;
- private String stderrLocation;
private GatewayResourceProfile gatewayResourceProfile;
private UserResourceProfile userResourceProfile;
@@ -174,6 +174,14 @@ public class TaskContext {
this.processModel = processModel;
}
+ public ExperimentModel getExperimentModel() { // returns the model stored by setExperimentModel; null if none has been stored
+ return experimentModel;
+ }
+
+ public void setExperimentModel(ExperimentModel experimentModel) { // stored as-is; getExecutionType() and getSweepCount() read from this model
+ this.experimentModel = experimentModel;
+ }
+
public String getWorkingDir() {
if (workingDir == null) {
if (processModel.getProcessResourceSchedule().getStaticWorkingDir() != null){
@@ -292,19 +300,11 @@ public class TaskContext {
}
public String getStdoutLocation() {
- return stdoutLocation;
- }
-
- public void setStdoutLocation(String stdoutLocation) {
- this.stdoutLocation = stdoutLocation;
+ return getApplicationInterfaceDescription().getApplicationName() + ".stdout"; // derived name "<applicationName>.stdout"; no directory is prepended here
}
public String getStderrLocation() {
- return stderrLocation;
- }
-
- public void setStderrLocation(String stderrLocation) {
- this.stderrLocation = stderrLocation;
+ return getApplicationInterfaceDescription().getApplicationName() + ".stderr"; // derived name "<applicationName>.stderr"; no directory is prepended here
}
public void setOutputDir(String outputDir) {
@@ -746,6 +746,14 @@ public class TaskContext {
return subTaskModel;
}
+ public String getExecutionType() { // delegates to the experiment model; may return null when the model's executionType is unset
+ return this.experimentModel.getExecutionType(); // NOTE(review): throws NullPointerException if experimentModel was never set
+ }
+
+ public int getSweepCount() { // delegates to the experiment model's sweepCount
+ return this.experimentModel.getSweepCount(); // NOTE(review): throws NullPointerException if experimentModel was never set
+ }
+
public static class TaskContextBuilder {
private final String processId;
private final String gatewayId;
@@ -788,9 +796,12 @@ public class TaskContext {
throwError("Invalid Registry Client");
}
+ ExperimentModel experimentModel = registryClient.getExperiment(processModel.getExperimentId());
+
TaskContext ctx = new TaskContext(processId, gatewayId, taskId);
ctx.setRegistryClient(registryClient);
ctx.setProcessModel(processModel);
+ ctx.setExperimentModel(experimentModel);
ctx.setProfileClient(profileClient);
ctx.setGroupComputeResourcePreference(registryClient.getGroupComputeResourcePreference(processModel.getComputeResourceId(),
@@ -841,35 +852,9 @@ public class TaskContext {
processModel.getComputeResourceId()));
}
- List<OutputDataObjectType> applicationOutputs = ctx.getApplicationInterfaceDescription().getApplicationOutputs();
- if (applicationOutputs != null && !applicationOutputs.isEmpty()) {
- for (OutputDataObjectType outputDataObjectType : applicationOutputs) {
- if (outputDataObjectType.getType().equals(DataType.STDOUT)) {
- if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) {
- String stdOut = (ctx.getWorkingDir().endsWith(File.separator) ? ctx.getWorkingDir() : ctx.getWorkingDir() + File.separator)
- + ctx.getApplicationInterfaceDescription().getApplicationName() + ".stdout";
- outputDataObjectType.setValue(stdOut);
- ctx.setStdoutLocation(stdOut);
- } else {
- ctx.setStdoutLocation(outputDataObjectType.getValue());
- }
- }
- if (outputDataObjectType.getType().equals(DataType.STDERR)) {
- if (outputDataObjectType.getValue() == null || outputDataObjectType.getValue().equals("")) {
- String stderrLocation = (ctx.getWorkingDir().endsWith(File.separator) ? ctx.getWorkingDir() : ctx.getWorkingDir() + File.separator)
- + ctx.getApplicationInterfaceDescription().getApplicationName() + ".stderr";
- outputDataObjectType.setValue(stderrLocation);
- ctx.setStderrLocation(stderrLocation);
- } else {
- ctx.setStderrLocation(outputDataObjectType.getValue());
- }
- }
- }
- }
-
// TODO move this to some where else as this is not the correct place to do so
- registryClient.updateProcess(processModel, processId);
- processModel.setProcessOutputs(applicationOutputs);
+ //registryClient.updateProcess(processModel, processId);
+ //processModel.setProcessOutputs(applicationOutputs);
return ctx;
}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
index c824bb0..488499b 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/cancel/RemoteJobCancellationTask.java
@@ -54,7 +54,7 @@ public class RemoteJobCancellationTask extends AiravataTask {
JobFactory.getResourceJobManager(
getRegistryServiceClient(),
getTaskContext().getJobSubmissionProtocol(),
- getTaskContext().getPreferredJobSubmissionInterface()));
+ getTaskContext().getPreferredJobSubmissionInterface()), "param_sweep".equals(getTaskContext().getExecutionType()));
AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
getTaskContext().getGatewayId(),
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
index d6b00c5..c890781 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/env/EnvSetupTask.java
@@ -29,6 +29,8 @@ import org.apache.helix.task.TaskResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.file.Paths;
+
@TaskDef(name = "Environment Setup Task")
public class EnvSetupTask extends AiravataTask {
@@ -48,7 +50,19 @@ public class EnvSetupTask extends AiravataTask {
logger.info("Creating directory " + getTaskContext().getWorkingDir() + " on compute resource " +
getTaskContext().getComputeResourceId() + " by user " + getTaskContext().getComputeResourceLoginUserName()
+ " using token " + getTaskContext().getComputeResourceCredentialToken());
- adaptor.createDirectory(getTaskContext().getWorkingDir(), true);
+
+ if ("one_pass".equals(taskContext.getExecutionType())) {
+ adaptor.createDirectory(getTaskContext().getWorkingDir(), true);
+ }
+
+ if ("param_sweep".equals(taskContext.getExecutionType())) {
+ for (int i = 0; i < taskContext.getSweepCount(); i++) {
+ String sweepDir = Paths.get(getTaskContext().getWorkingDir(), i + "").toString();
+ logger.info("Creating sweep directory {}", sweepDir);
+ adaptor.createDirectory(sweepDir, true);
+ }
+ }
+
return onSuccess("Envi setup task successfully completed " + getTaskId());
} catch (Exception e) {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
index 249e550..54392ef 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/JobVerificationTask.java
@@ -43,7 +43,8 @@ public class JobVerificationTask extends AiravataTask {
JobFactory.getResourceJobManager(
getRegistryServiceClient(),
getTaskContext().getJobSubmissionProtocol(),
- getTaskContext().getPreferredJobSubmissionInterface()));
+ getTaskContext().getPreferredJobSubmissionInterface()),
+ "param_sweep".equals(getTaskContext().getExecutionType()));
AgentAdaptor adaptor = taskHelper.getAdaptorSupport().fetchAdaptor(
getTaskContext().getGatewayId(),
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/SweepingInputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/SweepingInputDataStagingTask.java
new file mode 100644
index 0000000..1a3fd97
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/SweepingInputDataStagingTask.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ package org.apache.airavata.helix.impl.task.staging;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.impl.task.TaskOnFailException;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.InputDataObjectType;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.UUID;
+
+@TaskDef(name = "Sweeping Input Data Staging Task")
+public class SweepingInputDataStagingTask extends DataStagingTask {
+    private final static Logger logger = LoggerFactory.getLogger(SweepingInputDataStagingTask.class);
+
+    /**
+     * Stages one process input into every sweep directory (workingDir/0 .. workingDir/sweepCount-1).
+     * A source ending in ".zip" is unzipped on the compute resource and files from its numbered sub
+     * directories are copied into the matching sweep directory; other sources are transferred as is.
+     *
+     * @param taskHelper provides the adaptor support used to obtain the storage and compute adaptors
+     * @param taskContext supplies the working directory and the sweep count
+     * @return the result of onSuccess when staging finished, otherwise the result of onFail
+     */
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
+        try {
+            saveAndPublishProcessStatus(ProcessState.INPUT_DATA_STAGING);
+            // Get and validate data staging task model
+            DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
+            // A missing input is reported as a task failure instead of surfacing as a NullPointerException
+            InputDataObjectType processInput = dataStagingTaskModel.getProcessInput();
+            if (processInput == null) {
+                throw new TaskOnFailException("Process input is not set for input data staging task " + getTaskId(), true, null);
+            }
+            if (processInput.getValue() == null) {
+                String message = "expId: " + getExperimentId() + ", processId: " + getProcessId() + ", taskId: " + getTaskId() +
+                        ":- Couldn't stage file " + processInput.getName() + " , file name shouldn't be null. ";
+                if (processInput.isIsRequired()) {
+                    message += "File name is null, but this input's isRequired bit is set";
+                } else {
+                    message += "File name is null";
+                }
+                throw new TaskOnFailException(message, true, null);
+            }
+
+            String[] sourceUrls;
+            if (processInput.getType() == DataType.URI_COLLECTION) {
+                logger.info("Found a URI collection so splitting by comma for path " + dataStagingTaskModel.getSource());
+                sourceUrls = dataStagingTaskModel.getSource().split(",");
+            } else {
+                sourceUrls = new String[]{dataStagingTaskModel.getSource()};
+            }
+
+            // Fetch and validate storage adaptor
+            StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
+            // Fetch and validate compute resource adaptor
+            AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
+            String workingDir = taskContext.getWorkingDir();
+            int sweepCount = taskContext.getSweepCount();
+            String overrideFileName = processInput.getOverrideFilename();
+            boolean hasOverride = overrideFileName != null && !"".equals(overrideFileName);
+            for (String url : sourceUrls) {
+                String sourcePath = new URI(url).getPath();
+                String sourceFileName = new File(sourcePath).getName();
+                // TODO: Put a flag in input model to detect sweeping zip files instead of looking at the extensions
+                /*
+                 * Zip file should contain sub directories which are named in sequential order upto sweep count - 1
+                 * Example: data.zip
+                 * Unzipped Content: data/
+                 *                       /0/input.txt
+                 *                       /1/input.txt
+                 *                       .
+                 *                       .
+                 *                       /<sweepCount -1>/input.txt
+                 */
+                if (sourceFileName.endsWith(".zip")) {
+                    String tempZipDir = Paths.get(workingDir, UUID.randomUUID().toString()).toString();
+                    logger.info("Copying sweep input zip {} to temp directory {}", sourceFileName, tempZipDir);
+                    adaptor.createDirectory(tempZipDir, true);
+                    transferFileToComputeResource(sourcePath, Paths.get(tempZipDir, sourceFileName).toString(), adaptor, storageResourceAdaptor);
+                    logger.info("Unzipping sweep input zip {} inside temp directory {}", sourceFileName, tempZipDir);
+                    // File names are quoted so spaces or shell metacharacters reach unzip/cp literally
+                    adaptor.executeCommand("unzip " + shellQuote(sourceFileName), tempZipDir);
+                    String tempDataPath = Paths.get(tempZipDir, sourceFileName.substring(0, sourceFileName.length() - ".zip".length())).toString();
+                    for (int i = 0; i < sweepCount; i++) {
+                        String sweepSourceDir = Paths.get(tempDataPath, i + "").toString();
+                        List<String> sweepFiles = adaptor.listDirectory(sweepSourceDir);
+                        for (String sweepFile : sweepFiles) {
+                            String localSourceFile = Paths.get(sweepSourceDir, sweepFile).toString();
+                            String destPath = Paths.get(workingDir, i + "", hasOverride ? overrideFileName : sweepFile).toString();
+                            logger.info("Transferring zipped sweeping input file {} to destination path {} locally", localSourceFile, destPath);
+                            adaptor.executeCommand("cp " + shellQuote(localSourceFile) + " " + shellQuote(destPath), sweepSourceDir);
+                        }
+                    }
+                } else {
+                    // TODO: Optimize here to copy locally
+                    for (int i = 0; i < sweepCount; i++) {
+                        String destPath = Paths.get(workingDir, i + "", hasOverride ? overrideFileName : sourceFileName).toString();
+                        logger.info("Transferring sweeping input file {} to destination path {}", sourcePath, destPath);
+                        transferFileToComputeResource(sourcePath, destPath, adaptor, storageResourceAdaptor);
+                    }
+                }
+            }
+
+            return onSuccess("Input data staging task " + getTaskId() + " successfully completed");
+        } catch (URISyntaxException e) {
+            logger.error("Failed to obtain source URI for input data staging task " + getTaskId(), e);
+            return onFail("Failed to obtain source URI for input data staging task " + getTaskId(), true, e);
+        } catch (TaskOnFailException e) {
+            // Report the declared reason and criticality instead of a generic "Unknown error"
+            logger.error(e.getReason(), e.getError());
+            return onFail(e.getReason(), e.isCritical(), e.getError());
+        } catch (Exception e) {
+            logger.error("Unknown error while executing sweeping input data staging task " + getTaskId(), e);
+            return onFail("Unknown error while executing sweeping input data staging task " + getTaskId(), false, e);
+        }
+    }
+
+    /** Wraps a value in single quotes, escaping embedded single quotes, so a POSIX shell reads it as one word. */
+    private static String shellQuote(String value) {
+        return "'" + value.replace("'", "'\\''") + "'";
+    }
+
+    @Override
+    public void onCancel(TaskContext taskContext) {
+    }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/SweepingOutputDataStagingTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/SweepingOutputDataStagingTask.java
new file mode 100644
index 0000000..4f2b000
--- /dev/null
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/staging/SweepingOutputDataStagingTask.java
@@ -0,0 +1,115 @@
+package org.apache.airavata.helix.impl.task.staging;
+
+import org.apache.airavata.agents.api.AgentAdaptor;
+import org.apache.airavata.agents.api.AgentException;
+import org.apache.airavata.agents.api.StorageResourceAdaptor;
+import org.apache.airavata.helix.impl.task.TaskContext;
+import org.apache.airavata.helix.impl.task.TaskOnFailException;
+import org.apache.airavata.helix.task.api.TaskHelper;
+import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.helix.task.api.annotation.TaskParam;
+import org.apache.airavata.model.appcatalog.storageresource.StorageResourceDescription;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.status.ProcessState;
+import org.apache.airavata.model.task.DataStagingTaskModel;
+import org.apache.helix.task.TaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.file.Paths;
+
+@TaskDef(name = "Sweeping Output Data Staging Task")
+public class SweepingOutputDataStagingTask extends DataStagingTask {
+
+    private final static Logger logger = LoggerFactory.getLogger(SweepingOutputDataStagingTask.class);
+
+    // Index of the sweep job; selects the sub directory of the working directory this task reads from
+    @TaskParam(name = "Job Index")
+    private int jobIndex;
+
+    /**
+     * Transfers one process output from workingDir/jobIndex to the experiment data directory,
+     * prefixing the stored file name with the job index, and saves the destination URI as an
+     * experiment output when the transfer succeeded.
+     *
+     * @param taskHelper provides the adaptor support used to obtain the storage and compute adaptors
+     * @param taskContext supplies the working directory
+     * @return the result of onSuccess when the task finished, otherwise the result of onFail
+     */
+    @Override
+    public TaskResult onRun(TaskHelper taskHelper, TaskContext taskContext) {
+        logger.info("Starting sweeping output data staging task " + getTaskId() + " in experiment " + getExperimentId());
+        saveAndPublishProcessStatus(ProcessState.OUTPUT_DATA_STAGING);
+
+        try {
+            // Get and validate data staging task model
+            DataStagingTaskModel dataStagingTaskModel = getDataStagingTaskModel();
+
+            // A missing output is reported as a task failure instead of surfacing as a NullPointerException
+            OutputDataObjectType processOutput = dataStagingTaskModel.getProcessOutput();
+            if (processOutput == null) {
+                throw new TaskOnFailException("Process output is not set for output data staging task " + getTaskId(), true, null);
+            }
+            if (processOutput.getValue() == null) {
+                String message = "expId: " + getExperimentId() + ", processId: " + getProcessId() + ", taskId: " + getTaskId() +
+                        ":- Couldn't stage file " + processOutput.getName() + " , file name shouldn't be null. ";
+                if (processOutput.isIsRequired()) {
+                    message += "File name is null, but this output's isRequired bit is set";
+                } else {
+                    message += "File name is null";
+                }
+                throw new TaskOnFailException(message, true, null);
+            }
+
+            // Fetch and validate storage resource
+            StorageResourceDescription storageResource = getStorageResource();
+
+            // Fetch and validate storage adaptor
+            StorageResourceAdaptor storageResourceAdaptor = getStorageAdaptor(taskHelper.getAdaptorSupport());
+
+            // Fetch and validate compute resource adaptor
+            AgentAdaptor adaptor = getComputeResourceAdaptor(taskHelper.getAdaptorSupport());
+
+            String sourcePath = Paths.get(taskContext.getWorkingDir(), jobIndex + "", processOutput.getValue()).toString();
+            String fileName = new File(processOutput.getValue()).getName();
+            String destPath = Paths.get(getProcessModel().getExperimentDataDir(), jobIndex + "_" + fileName).toString();
+
+            boolean transferred = transferFileToStorage(sourcePath, destPath, fileName, adaptor, storageResourceAdaptor);
+            URI destinationURI = new URI("file", getTaskContext().getStorageResourceLoginUserName(),
+                    storageResource.getHostName(), 22, destPath, null, null);
+            if (transferred) {
+                saveExperimentOutput(processOutput.getName(), destinationURI.toString());
+            } else {
+                logger.warn("File " + sourcePath + " did not transfer");
+            }
+
+            return onSuccess("Sweeping output data staging task " + getTaskId() + " successfully completed");
+        } catch (TaskOnFailException e) {
+            if (e.getError() != null) {
+                logger.error(e.getReason(), e.getError());
+            } else {
+                logger.error(e.getReason());
+            }
+            return onFail(e.getReason(), e.isCritical(), e.getError());
+
+        } catch (Exception e) {
+            logger.error("Unknown error while executing output data staging task " + getTaskId(), e);
+            return onFail("Unknown error while executing output data staging task " + getTaskId(), false, e);
+        }
+    }
+
+    @Override
+    public void onCancel(TaskContext taskContext) {
+    }
+
+    public int getJobIndex() {
+        return jobIndex;
+    }
+
+    public void setJobIndex(int jobIndex) {
+        this.jobIndex = jobIndex;
+    }
+}
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
index f1f877f..5bb0c3a 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/DefaultJobSubmissionTask.java
@@ -28,6 +28,8 @@ import org.apache.airavata.helix.impl.task.submission.config.GroovyMapBuilder;
import org.apache.airavata.helix.impl.task.submission.config.GroovyMapData;
import org.apache.airavata.helix.task.api.TaskHelper;
import org.apache.airavata.helix.task.api.annotation.TaskDef;
+import org.apache.airavata.model.application.io.DataType;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.job.JobModel;
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
index 93df582..c94810b 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/JobSubmissionTask.java
@@ -56,8 +56,12 @@ public abstract class JobSubmissionTask extends AiravataTask {
@SuppressWarnings("WeakerAccess")
protected JobSubmissionOutput submitBatchJob(AgentAdaptor agentAdaptor, GroovyMapData groovyMapData, String workingDirectory) throws Exception {
- JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
- getRegistryServiceClient(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
+ JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(
+ JobFactory.getResourceJobManager(
+ getRegistryServiceClient(),
+ getTaskContext().getJobSubmissionProtocol(),
+ getTaskContext().getPreferredJobSubmissionInterface()),
+ "param_sweep".equals(getTaskContext().getExecutionType()));
addMonitoringCommands(groovyMapData);
@@ -155,8 +159,12 @@ public abstract class JobSubmissionTask extends AiravataTask {
@SuppressWarnings("WeakerAccess")
public boolean cancelJob(AgentAdaptor agentAdaptor, String jobId) throws Exception {
- JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(JobFactory.getResourceJobManager(
- getRegistryServiceClient(), getTaskContext().getJobSubmissionProtocol(), getTaskContext().getPreferredJobSubmissionInterface()));
+ JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(
+ JobFactory.getResourceJobManager(
+ getRegistryServiceClient(),
+ getTaskContext().getJobSubmissionProtocol(),
+ getTaskContext().getPreferredJobSubmissionInterface()), "param_sweep".equals(getTaskContext().getExecutionType()));
+
CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getCancelCommand(jobId).getRawCommand(), null);
return commandOutput.getExitCode() == 0;
}
@@ -171,7 +179,9 @@ public abstract class JobSubmissionTask extends AiravataTask {
throw new Exception("Resource job manager can not be null for protocol " + getTaskContext().getJobSubmissionProtocol() + " and job id " + jobId);
}
- JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(resourceJobManager);
+ JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(
+ resourceJobManager,
+ "param_sweep".equals(getTaskContext().getExecutionType()));
CommandOutput commandOutput = agentAdaptor.executeCommand(jobManagerConfiguration.getMonitorCommand(jobId).getRawCommand(), null);
@@ -190,7 +200,8 @@ public abstract class JobSubmissionTask extends AiravataTask {
+ " and job name " + jobName + " and user " + userName);
}
- JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(resourceJobManager);
+ JobManagerConfiguration jobManagerConfiguration = JobFactory.getJobManagerConfiguration(
+ resourceJobManager, "param_sweep".equals(getTaskContext().getExecutionType()));
RawCommandInfo jobIdMonitorCommand = jobManagerConfiguration.getJobIdMonitorCommand(jobName, userName);
CommandOutput commandOutput = agentAdaptor.executeCommand(jobIdMonitorCommand.getRawCommand(), null);
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapBuilder.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapBuilder.java
index e76c6db..9cc1e1d 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapBuilder.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapBuilder.java
@@ -79,6 +79,7 @@ public class GroovyMapBuilder {
mapData.setWorkingDirectory(taskContext.getWorkingDir());
mapData.setTaskId(taskContext.getTaskId());
mapData.setExperimentDataDir(taskContext.getProcessModel().getExperimentDataDir());
+ mapData.setSweepCount(taskContext.getSweepCount());
//List<String> emails = taskContext.getUserProfile().getEmails();
//if (emails != null && emails.size() > 0) {
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapData.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapData.java
index 663687d..2f94c34 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapData.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/GroovyMapData.java
@@ -141,6 +141,9 @@ public class GroovyMapData {
@ScriptTag(name = "experimentDataDir")
private String experimentDataDir;
+ @ScriptTag(name = "sweepCount")
+ private int sweepCount;
+
public Map<String, Object> getMap() {
Map<String, Object> map = new HashMap<>();
@@ -473,6 +476,14 @@ public class GroovyMapData {
this.experimentDataDir = experimentDataDir;
}
+ public int getSweepCount() { // returns the field tagged with the "sweepCount" script tag
+ return sweepCount;
+ }
+ // Stores the sweep count held by this map data instance.
+ public void setSweepCount(int sweepCount) {
+ this.sweepCount = sweepCount;
+ }
+
public Map toImmutableMap() {
Map<String, Object> dataMap = new HashMap<>();
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobFactory.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobFactory.java
index e9099e4..3ec3662 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobFactory.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/task/submission/config/JobFactory.java
@@ -31,22 +31,32 @@ public class JobFactory {
private final static Logger logger = LoggerFactory.getLogger(JobFactory.class);
- public static String getTemplateFileName(ResourceJobManagerType resourceJobManagerType) {
- switch (resourceJobManagerType) {
- case FORK:
- return "FORK_Groovy.template";
- case PBS:
- return "PBS_Groovy.template";
- case SLURM:
- return "SLURM_Groovy.template";
- case UGE:
- return "UGE_Groovy.template";
- case LSF:
- return "LSF_Groovy.template";
- case CLOUD:
- return "CLOUD_Groovy.template";
- default:
- return null;
+    /** Returns the Groovy template file name for the job manager type, or null when none is defined. */
+    public static String getTemplateFileName(ResourceJobManagerType resourceJobManagerType, boolean isSweepType) {
+        if (isSweepType) {
+            switch (resourceJobManagerType) {
+                case SLURM:
+                    return "SLURM_Arr_Groovy.template";
+                default:
+                    return null;
+            }
+        }
+
+        switch (resourceJobManagerType) {
+            case FORK:
+                return "FORK_Groovy.template";
+            case PBS:
+                return "PBS_Groovy.template";
+            case SLURM:
+                return "SLURM_Groovy.template";
+            case UGE:
+                return "UGE_Groovy.template";
+            case LSF:
+                return "LSF_Groovy.template";
+            case CLOUD:
+                return "CLOUD_Groovy.template";
+            default:
+                return null;
+        }
}
@@ -97,12 +107,12 @@ public class JobFactory {
}
}
- public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager) throws Exception {
+ public static JobManagerConfiguration getJobManagerConfiguration(ResourceJobManager resourceJobManager, boolean isSweepType) throws Exception {
if(resourceJobManager == null) {
throw new Exception("Resource job manager can not be null");
}
- String templateFileName = getTemplateFileName(resourceJobManager.getResourceJobManagerType());
+ String templateFileName = getTemplateFileName(resourceJobManager.getResourceJobManagerType(), isSweepType);
switch (resourceJobManager.getResourceJobManagerType()) {
case PBS:
return new PBSJobConfiguration(templateFileName, ".pbs", resourceJobManager.getJobManagerBinPath(),
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
index f3b9dea..93b7371 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PostWorkflowManager.java
@@ -27,9 +27,7 @@ import org.apache.airavata.helix.core.OutPort;
import org.apache.airavata.helix.impl.task.*;
import org.apache.airavata.helix.impl.task.completing.CompletingTask;
import org.apache.airavata.helix.impl.task.parsing.ParsingTriggeringTask;
-import org.apache.airavata.helix.impl.task.staging.ArchiveTask;
-import org.apache.airavata.helix.impl.task.staging.JobVerificationTask;
-import org.apache.airavata.helix.impl.task.staging.OutputDataStagingTask;
+import org.apache.airavata.helix.impl.task.staging.*;
import org.apache.airavata.model.job.JobModel;
import org.apache.airavata.model.status.ProcessState;
import org.apache.airavata.model.status.ProcessStatus;
@@ -172,7 +170,13 @@ public class PostWorkflowManager extends WorkflowManager {
logger.info("Job " + jobStatusResult.getJobId() + " was completed");
- executePostWorkflow(processId, gateway, false);
+ if (experimentModel.getSweepCount() == 0) {
+ executePostWorkflow(processId, gateway, false, 0);
+ } else {
+ for (int i = 0; i < experimentModel.getSweepCount(); i ++) {
+ executePostWorkflow(processId, gateway, false, i);
+ }
+ }
} else if (jobStatusResult.getState() == JobState.CANCELED) {
logger.info("Job " + jobStatusResult.getJobId() + " was externally cancelled but process is not marked as cancelled yet");
@@ -196,7 +200,7 @@ public class PostWorkflowManager extends WorkflowManager {
}
}
- private void executePostWorkflow(String processId, String gateway, boolean forceRun) throws Exception {
+ private void executePostWorkflow(String processId, String gateway, boolean forceRun, int jobIndex) throws Exception {
RegistryService.Client registryClient = getRegistryClientPool().getResource();
@@ -245,8 +249,16 @@ public class PostWorkflowManager extends WorkflowManager {
assert subTaskModel != null;
switch (subTaskModel.getType()) {
case OUPUT:
- airavataTask = new OutputDataStagingTask();
- airavataTask.setForceRunTask(true);
+ if ("one_pass".equals(experimentModel.getExecutionType())) {
+ airavataTask = new OutputDataStagingTask();
+ airavataTask.setForceRunTask(true);
+ }
+ if ("param_sweep".equals(experimentModel.getExecutionType())) {
+ SweepingOutputDataStagingTask sweepTask = new SweepingOutputDataStagingTask();
+ sweepTask.setJobIndex(jobIndex);
+ sweepTask.setForceRunTask(true);
+ airavataTask = sweepTask;
+ }
break;
case ARCHIVE_OUTPUT:
airavataTask = new ArchiveTask();
diff --git a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
index 9423854..6cdb0d0 100644
--- a/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
+++ b/modules/airavata-helix/helix-spectator/src/main/java/org/apache/airavata/helix/impl/workflow/PreWorkflowManager.java
@@ -31,6 +31,7 @@ import org.apache.airavata.helix.impl.task.cancel.RemoteJobCancellationTask;
import org.apache.airavata.helix.impl.task.cancel.WorkflowCancellationTask;
import org.apache.airavata.helix.impl.task.env.EnvSetupTask;
import org.apache.airavata.helix.impl.task.staging.InputDataStagingTask;
+import org.apache.airavata.helix.impl.task.staging.SweepingInputDataStagingTask;
import org.apache.airavata.helix.impl.task.submission.DefaultJobSubmissionTask;
import org.apache.airavata.messaging.core.*;
import org.apache.airavata.model.experiment.ExperimentModel;
@@ -116,8 +117,14 @@ public class PreWorkflowManager extends WorkflowManager {
jobSubmissionFound = true;
} else if (taskModel.getTaskType() == TaskTypes.DATA_STAGING) {
if (!jobSubmissionFound) {
- airavataTask = new InputDataStagingTask();
- airavataTask.setForceRunTask(true);
+ if ("one_pass".equals(experimentModel.getExecutionType())) {
+ airavataTask = new InputDataStagingTask();
+ airavataTask.setForceRunTask(true);
+ } else if ("param_sweep".equals(experimentModel.getExecutionType())) {
+ airavataTask = new SweepingInputDataStagingTask();
+ airavataTask.setForceRunTask(true);
+ }
+
}
}
diff --git a/modules/configuration/server/src/main/resources/LSF_Groovy.template b/modules/configuration/server/src/main/resources/LSF_Groovy.template
index f72e313..a8f54d5 100644
--- a/modules/configuration/server/src/main/resources/LSF_Groovy.template
+++ b/modules/configuration/server/src/main/resources/LSF_Groovy.template
@@ -11,8 +11,8 @@
out.print '#BSUB -N\n'
if (accountString != null && accountString != "") out.print '#BSUB -P ' + accountString + '\n'
if (maxWallTime != null && maxWallTime != "") out.print '#BSUB -W ' + maxWallTime + '\n'
- if (standardOutFile != null && standardOutFile != "") out.print '#BSUB -o ' + standardOutFile + '\n'
- if (standardErrorFile != null && standardErrorFile != "") out.print '#BSUB -e ' + standardErrorFile + '\n'
+ if (standardOutFile != null && standardOutFile != "") out.print '#BSUB -o ' + workingDirectory + '/' + standardOutFile + '\n'
+ if (standardErrorFile != null && standardErrorFile != "") out.print '#BSUB -e ' + workingDirectory + '/' + standardErrorFile + '\n'
if (chassisName != null && chassisName != "") out.print '#BSUB -m c' + chassisName + '\n'
if (usedMem != null && usedMem != "") out.print '#BSUB -R rusage[mem=' + usedMem + ']\n'
%>
diff --git a/modules/configuration/server/src/main/resources/PBS_Groovy.template b/modules/configuration/server/src/main/resources/PBS_Groovy.template
index 8e3f277..d086235 100644
--- a/modules/configuration/server/src/main/resources/PBS_Groovy.template
+++ b/modules/configuration/server/src/main/resources/PBS_Groovy.template
@@ -9,8 +9,8 @@
if (maxWallTime != null && maxWallTime != "") out.print '#PBS -l walltime=' + maxWallTime + '\n'
if (jobSubmitterCommand != null && jobSubmitterCommand != "" && jobSubmitterCommand == "ccmrun")
out.print '#PBS -l gres=ccm \n'
- if (standardOutFile != null && standardOutFile != "") out.print '#PBS -o ' + standardOutFile + '\n'
- if (standardErrorFile != null && standardErrorFile != "") out.print '#PBS -e ' + standardErrorFile + '\n'
+ if (standardOutFile != null && standardOutFile != "") out.print '#PBS -o ' + workingDirectory + '/' + standardOutFile + '\n'
+ if (standardErrorFile != null && standardErrorFile != "") out.print '#PBS -e ' + workingDirectory + '/' + standardErrorFile + '\n'
if (usedMem != null && usedMem != "") out.print '#PBS -l vmem=' + usedMem + 'M\n'
if (nodes != null && nodes != "" && processPerNode != null && processPerNode != "")
out.print '#PBS -l nodes=' + nodes + ':ppn=' + processPerNode + '\n'
diff --git a/modules/configuration/server/src/main/resources/SLURM_Groovy.template b/modules/configuration/server/src/main/resources/SLURM_Arr_Groovy.template
similarity index 86%
copy from modules/configuration/server/src/main/resources/SLURM_Groovy.template
copy to modules/configuration/server/src/main/resources/SLURM_Arr_Groovy.template
index a30cecb..1d33989 100644
--- a/modules/configuration/server/src/main/resources/SLURM_Groovy.template
+++ b/modules/configuration/server/src/main/resources/SLURM_Arr_Groovy.template
@@ -3,6 +3,7 @@
# SLURM job submission script generated by Apache Airavata
<%
if (queueName != null && queueName != "") out.print '#SBATCH -p ' + queueName + '\n'
+ if (sweepCount != null && sweepCount != "") out.print '#SBATCH --array=0-' + (sweepCount - 1) + '\n'
if (nodes != null && nodes != "") out.print '#SBATCH -N ' + nodes + '\n'
if (cpuCount != null && cpuCount != "") out.print '#SBATCH -n ' + cpuCount + '\n'
if (usedMem != null && usedMem != "") out.print '#SBATCH --mem=' + usedMem + 'M\n'
@@ -10,8 +11,8 @@ if (queueName != null && queueName != "") out.print '#SBATCH -p ' + queueName +
if (accountString != null && accountString != "" ) out.print '#SBATCH -A ' + accountString + '\n'
if (maxWallTime != null && maxWallTime != "") out.print '#SBATCH -t ' + maxWallTime + '\n'
if (jobName != null && jobName != "") out.print '#SBATCH -J ' + jobName + '\n'
- if (standardOutFile != null && standardOutFile != "") out.print '#SBATCH -o ' + standardOutFile + '\n'
- if (standardErrorFile != null && standardErrorFile != "") out.print '#SBATCH -e ' + standardErrorFile + '\n'
+ if (standardOutFile != null && standardOutFile != "") out.print '#SBATCH -o ' + workingDirectory + '/%a/' + standardOutFile + '\n'
+ if (standardErrorFile != null && standardErrorFile != "") out.print '#SBATCH -e ' + workingDirectory + '/%a/' + standardErrorFile + '\n'
if (qualityOfService != null && qualityOfService != "") out.print '#SBATCH --qos=' + qualityOfService + '\n'
if (reservation != null && reservation != "") out.print '#SBATCH --reservation=' + reservation + '\n'
if (queueSpecificMacros != null) for(queueMacro in queueSpecificMacros) out.print queueMacro +'\n'
@@ -20,7 +21,7 @@ if (queueName != null && queueName != "") out.print '#SBATCH -p ' + queueName +
<% if (exports != null) for(com in exports) out.print 'export ' + com +'\n'
if (moduleCommands != null) for(mc in moduleCommands) out.print mc +'\n'
- if (workingDirectory != null && workingDirectory != "") out.print 'cd ' + workingDirectory +'\n'
+ if (workingDirectory != null && workingDirectory != "") out.print 'cd ' + workingDirectory +'/${SLURM_ARRAY_TASK_ID}\n'
if (preJobCommands != null) for(pjc in preJobCommands) out.print pjc +'\n'
if (jobSubmitterCommand != null && jobSubmitterCommand != "") out.print jobSubmitterCommand + ' '
if (executablePath != null && executablePath != "") out.print executablePath + ' '
diff --git a/modules/configuration/server/src/main/resources/SLURM_Groovy.template b/modules/configuration/server/src/main/resources/SLURM_Groovy.template
index a30cecb..5eeb6bd 100644
--- a/modules/configuration/server/src/main/resources/SLURM_Groovy.template
+++ b/modules/configuration/server/src/main/resources/SLURM_Groovy.template
@@ -10,8 +10,8 @@ if (queueName != null && queueName != "") out.print '#SBATCH -p ' + queueName +
if (accountString != null && accountString != "" ) out.print '#SBATCH -A ' + accountString + '\n'
if (maxWallTime != null && maxWallTime != "") out.print '#SBATCH -t ' + maxWallTime + '\n'
if (jobName != null && jobName != "") out.print '#SBATCH -J ' + jobName + '\n'
- if (standardOutFile != null && standardOutFile != "") out.print '#SBATCH -o ' + standardOutFile + '\n'
- if (standardErrorFile != null && standardErrorFile != "") out.print '#SBATCH -e ' + standardErrorFile + '\n'
+ if (standardOutFile != null && standardOutFile != "") out.print '#SBATCH -o ' + workingDirectory + '/' + standardOutFile + '\n'
+ if (standardErrorFile != null && standardErrorFile != "") out.print '#SBATCH -e ' + workingDirectory + '/' + standardErrorFile + '\n'
if (qualityOfService != null && qualityOfService != "") out.print '#SBATCH --qos=' + qualityOfService + '\n'
if (reservation != null && reservation != "") out.print '#SBATCH --reservation=' + reservation + '\n'
if (queueSpecificMacros != null) for(queueMacro in queueSpecificMacros) out.print queueMacro +'\n'
diff --git a/modules/configuration/server/src/main/resources/UGE_Groovy.template b/modules/configuration/server/src/main/resources/UGE_Groovy.template
index 2870401..39e53f7 100644
--- a/modules/configuration/server/src/main/resources/UGE_Groovy.template
+++ b/modules/configuration/server/src/main/resources/UGE_Groovy.template
@@ -8,8 +8,8 @@
if (mailAddress != null && mailAddress != "") out.print '#$ -M ' + mailAddress + '\n'
if (accountString != null && accountString != "") out.print '#$ -A ' + accountString + '\n'
if (maxWallTime != null && maxWallTime != "") out.print '#$ -l h_rt=' + maxWallTime + '\n'
- if (standardOutFile != null && standardOutFile != "") out.print '#$ -o ' + standardOutFile + '\n'
- if (standardErrorFile != null && standardErrorFile != "") out.print '#$ -e ' + standardErrorFile + '\n'
+ if (standardOutFile != null && standardOutFile != "") out.print '#$ -o ' + workingDirectory + '/' + standardOutFile + '\n'
+ if (standardErrorFile != null && standardErrorFile != "") out.print '#$ -e ' + workingDirectory + '/' + standardErrorFile + '\n'
if (nodes != null && nodes != "" && processPerNode != null && processPerNode != "")
out.print '#$ -pe orte ' + processPerNode
%>
diff --git a/modules/ide-integration/src/main/resources/META-INF/persistence.xml b/modules/ide-integration/src/main/resources/META-INF/persistence.xml
index f9688f4..ca86b54 100644
--- a/modules/ide-integration/src/main/resources/META-INF/persistence.xml
+++ b/modules/ide-integration/src/main/resources/META-INF/persistence.xml
@@ -111,6 +111,7 @@
<class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentErrorEntity</class>
<class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentInputEntity</class>
<class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentOutputEntity</class>
+ <class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentOutputValueEntity</class>
<class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentStatusEntity</class>
<class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentSummaryEntity</class>
<class>org.apache.airavata.registry.core.entities.expcatalog.GatewayEntity</class>
diff --git a/modules/ide-integration/src/main/resources/database_scripts/expcatalog-derby.sql b/modules/ide-integration/src/main/resources/database_scripts/expcatalog-derby.sql
index 1cab313..0f974bf 100644
--- a/modules/ide-integration/src/main/resources/database_scripts/expcatalog-derby.sql
+++ b/modules/ide-integration/src/main/resources/database_scripts/expcatalog-derby.sql
@@ -109,6 +109,8 @@ CREATE TABLE EXPERIMENT (
GATEWAY_INSTANCE_ID varchar(255),
ENABLE_EMAIL_NOTIFICATION SMALLINT,
EMAIL_ADDRESSES CLOB,
+ SWEEP_COUNT int DEFAULT 1,
+ EXECUTION_TYPE varchar(255) DEFAULT 'one_pass',
PRIMARY KEY (EXPERIMENT_ID),
FOREIGN KEY (PROJECT_ID) REFERENCES PROJECT(PROJECT_ID) ON DELETE CASCADE
);
@@ -152,6 +154,14 @@ CREATE TABLE EXPERIMENT_OUTPUT
FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE
);
+CREATE TABLE EXPERIMENT_OUTPUT_VALUE
+(
+ EXPERIMENT_ID varchar(255) NOT NULL,
+ OUTPUT_NAME varchar(255),
+ OUTPUT_VALUE varchar(2048),
+ PRIMARY KEY(EXPERIMENT_ID,OUTPUT_NAME,OUTPUT_VALUE),
+ FOREIGN KEY (EXPERIMENT_ID,OUTPUT_NAME) REFERENCES EXPERIMENT_OUTPUT(EXPERIMENT_ID,OUTPUT_NAME) ON DELETE CASCADE
+);
CREATE TABLE EXPERIMENT_STATUS (
STATUS_ID varchar(255),
diff --git a/modules/ide-integration/src/main/resources/database_scripts/expcatalog-mysql.sql b/modules/ide-integration/src/main/resources/database_scripts/expcatalog-mysql.sql
index 1b26cf0..20621c6 100644
--- a/modules/ide-integration/src/main/resources/database_scripts/expcatalog-mysql.sql
+++ b/modules/ide-integration/src/main/resources/database_scripts/expcatalog-mysql.sql
@@ -109,6 +109,8 @@ CREATE TABLE EXPERIMENT (
GATEWAY_INSTANCE_ID varchar(255),
ENABLE_EMAIL_NOTIFICATION tinyint(1),
EMAIL_ADDRESSES text,
+ SWEEP_COUNT int DEFAULT 1,
+ EXECUTION_TYPE varchar(255) DEFAULT 'one_pass',
PRIMARY KEY (EXPERIMENT_ID),
FOREIGN KEY (PROJECT_ID) REFERENCES PROJECT(PROJECT_ID) ON DELETE CASCADE
)ENGINE=InnoDB DEFAULT CHARSET=latin1;
@@ -152,6 +154,14 @@ CREATE TABLE EXPERIMENT_OUTPUT
FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE
)ENGINE=InnoDB DEFAULT CHARSET=latin1;
+CREATE TABLE EXPERIMENT_OUTPUT_VALUE
+(
+ EXPERIMENT_ID varchar(255) NOT NULL,
+ OUTPUT_NAME varchar(255),
+ OUTPUT_VALUE varchar(2048),
+ PRIMARY KEY(EXPERIMENT_ID,OUTPUT_NAME,OUTPUT_VALUE),
+ FOREIGN KEY (EXPERIMENT_ID,OUTPUT_NAME) REFERENCES EXPERIMENT_OUTPUT(EXPERIMENT_ID,OUTPUT_NAME) ON DELETE CASCADE
+)ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE EXPERIMENT_STATUS (
STATUS_ID varchar(255),
diff --git a/modules/ide-integration/src/main/resources/database_scripts/init/05-parameter-sweep-migration.sql b/modules/ide-integration/src/main/resources/database_scripts/init/05-parameter-sweep-migration.sql
new file mode 100644
index 0000000..476fb9b
--- /dev/null
+++ b/modules/ide-integration/src/main/resources/database_scripts/init/05-parameter-sweep-migration.sql
@@ -0,0 +1,11 @@
+ALTER TABLE EXPERIMENT ADD COLUMN SWEEP_COUNT INT DEFAULT 1;
+ALTER TABLE EXPERIMENT ADD COLUMN EXECUTION_TYPE VARCHAR(255) DEFAULT 'one_pass';
+
+CREATE TABLE EXPERIMENT_OUTPUT_VALUE
+(
+ EXPERIMENT_ID varchar(255) NOT NULL,
+ OUTPUT_NAME varchar(255),
+ OUTPUT_VALUE varchar(2048),
+ PRIMARY KEY(EXPERIMENT_ID,OUTPUT_NAME,OUTPUT_VALUE),
+ FOREIGN KEY (EXPERIMENT_ID,OUTPUT_NAME) REFERENCES EXPERIMENT_OUTPUT(EXPERIMENT_ID,OUTPUT_NAME) ON DELETE CASCADE
+)ENGINE=InnoDB DEFAULT CHARSET=latin1;
\ No newline at end of file
diff --git a/modules/ide-integration/src/main/resources/logback.xml b/modules/ide-integration/src/main/resources/logback.xml
index 0ee0b72..c7cb812 100644
--- a/modules/ide-integration/src/main/resources/logback.xml
+++ b/modules/ide-integration/src/main/resources/logback.xml
@@ -42,8 +42,10 @@
</appender>
<logger name="ch.qos.logback" level="WARN"/>
- <logger name="org.apache.helix" level="WARN"/>
+ <logger name="net.schmizz.sshj" level="ERROR"/>
+ <logger name="org.apache.helix" level="ERROR"/>
<logger name="org.apache.zookeeper" level="ERROR"/>
+ <logger name="org.apache.airavata.helix.adaptor.PoolingSSHJClient" level="ERROR"/>
<logger name="org.apache.airavata" level="INFO"/>
<logger name="org.hibernate" level="ERROR"/>
<root level="INFO">
diff --git a/modules/job-monitor/email-monitor/src/main/java/org/apache/airavata/monitor/email/parser/SLURMEmailParser.java b/modules/job-monitor/email-monitor/src/main/java/org/apache/airavata/monitor/email/parser/SLURMEmailParser.java
index e3f2637..3f47c45 100644
--- a/modules/job-monitor/email-monitor/src/main/java/org/apache/airavata/monitor/email/parser/SLURMEmailParser.java
+++ b/modules/job-monitor/email-monitor/src/main/java/org/apache/airavata/monitor/email/parser/SLURMEmailParser.java
@@ -34,8 +34,8 @@ public class SLURMEmailParser implements EmailParser {
private static final Logger log = LoggerFactory.getLogger(SLURMEmailParser.class);
- private static final String REGEX = "[A-Z]*\\s[a-zA-Z]*_[a-z]*=(?<" + JOBID + ">\\d*)[ ]*[a-zA-Z]*=(?<"+
- JOBNAME + ">[a-zA-Z0-9-]*)[ ]*(?<" + STATUS + ">[]a-zA-Z ]*),.*";
+ private static final String REGEX = "[A-Z]*\\s[a-zA-Z]*_[a-z]*=(?<" + JOBID + ">\\d*)_*\\**[ ]*\\(*\\d*\\)*[ ]*[a-zA-Z]*=(?<"+
+ JOBNAME + ">[a-zA-Z0-9-]*)[ ]*(?<" + STATUS + ">[]a-zA-Z ]*),*.*";
public static final String BEGAN = "Began";
public static final String STAGE_OUT = "Staged Out";
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
index 73e9ded..291c61d 100644
--- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentEntity.java
@@ -73,6 +73,12 @@ public class ExperimentEntity implements Serializable {
@Column(name = "ENABLE_EMAIL_NOTIFICATION")
public boolean enableEmailNotification;
+ @Column(name = "EXECUTION_TYPE")
+ public String executionType;
+
+ @Column(name = "SWEEP_COUNT")
+ public int sweepCount;
+
@Lob
@Column(name = "EMAIL_ADDRESSES")
public String emailAddresses;
@@ -256,4 +262,20 @@ public class ExperimentEntity implements Serializable {
public void setProcesses(List<ProcessEntity> processes) {
this.processes = processes;
}
+
+ public String getExecutionType() {
+ return executionType;
+ }
+
+ public void setExecutionType(String executionType) {
+ this.executionType = executionType;
+ }
+
+ public int getSweepCount() {
+ return sweepCount;
+ }
+
+ public void setSweepCount(int sweepCount) {
+ this.sweepCount = sweepCount;
+ }
}
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentOutputEntity.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentOutputEntity.java
index 6e11e7b..c16628f 100644
--- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentOutputEntity.java
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentOutputEntity.java
@@ -24,6 +24,7 @@ import org.apache.airavata.model.application.io.DataType;
import javax.persistence.*;
import java.io.Serializable;
+import java.util.List;
/**
* The persistent class for the experiment_output database table.
@@ -81,6 +82,10 @@ public class ExperimentOutputEntity implements Serializable {
@JoinColumn(name = "EXPERIMENT_ID", referencedColumnName = "EXPERIMENT_ID")
private ExperimentEntity experiment;
+ @OneToMany(targetEntity = ExperimentOutputValueEntity.class, cascade = CascadeType.ALL,
+ mappedBy = "experimentOutput", fetch = FetchType.EAGER)
+ private List<ExperimentOutputValueEntity> experimentOutputValues;
+
public ExperimentOutputEntity() {
}
@@ -196,4 +201,12 @@ public class ExperimentOutputEntity implements Serializable {
public void setExperiment(ExperimentEntity experiment) {
this.experiment = experiment;
}
+
+ public List<ExperimentOutputValueEntity> getExperimentOutputValues() {
+ return experimentOutputValues;
+ }
+
+ public void setExperimentOutputValues(List<ExperimentOutputValueEntity> experimentOutputValues) {
+ this.experimentOutputValues = experimentOutputValues;
+ }
}
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentOutputValueEntity.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentOutputValueEntity.java
new file mode 100644
index 0000000..41caa8b
--- /dev/null
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentOutputValueEntity.java
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ package org.apache.airavata.registry.core.entities.expcatalog;
+
+import javax.persistence.*;
+
+@Entity
+@Table(name = "EXPERIMENT_OUTPUT_VALUE")
+@IdClass(ExperimentOutputValuePK.class)
+public class ExperimentOutputValueEntity implements java.io.Serializable {
+ // Serializable so that serialVersionUID below takes effect and owning entities can be serialized.
+ private static final long serialVersionUID = 1L;
+
+ @Id
+ @Column(name = "EXPERIMENT_ID")
+ private String experimentId;
+
+ @Id
+ @Column(name = "OUTPUT_NAME")
+ private String name;
+
+ @Id
+ @Lob
+ @Column(name = "OUTPUT_VALUE")
+ private String value;
+
+ @ManyToOne(targetEntity = ExperimentOutputEntity.class, cascade = CascadeType.ALL, fetch = FetchType.LAZY)
+ @JoinColumns(
+ {@JoinColumn(name = "EXPERIMENT_ID", referencedColumnName = "EXPERIMENT_ID"),
+ @JoinColumn(name = "OUTPUT_NAME", referencedColumnName = "OUTPUT_NAME")}
+ )
+ private ExperimentOutputEntity experimentOutput;
+
+ public String getExperimentId() {
+ return experimentId;
+ }
+
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public ExperimentOutputEntity getExperimentOutput() {
+ return experimentOutput;
+ }
+
+ public void setExperimentOutput(ExperimentOutputEntity experimentOutput) {
+ this.experimentOutput = experimentOutput;
+ }
+}
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentOutputValuePK.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentOutputValuePK.java
new file mode 100644
index 0000000..4286803
--- /dev/null
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/entities/expcatalog/ExperimentOutputValuePK.java
@@ -0,0 +1,78 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+ package org.apache.airavata.registry.core.entities.expcatalog;
+
+import java.io.Serializable;
+
+public class ExperimentOutputValuePK implements Serializable {
+ //default serial version id, required for serializable classes.
+ private static final long serialVersionUID = 1L;
+
+ private String experimentId;
+ private String name;
+ private String value;
+
+ public String getExperimentId() {
+ return experimentId;
+ }
+
+ public void setExperimentId(String experimentId) {
+ this.experimentId = experimentId;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ // The type check must name this key class; testing against ExperimentOutputPK made equal keys compare unequal.
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (!(other instanceof ExperimentOutputValuePK)) {
+ return false;
+ }
+ ExperimentOutputValuePK castOther = (ExperimentOutputValuePK) other;
+ return this.experimentId.equals(castOther.experimentId) && this.name.equals(castOther.name) && this.value.equals(castOther.value);
+ }
+
+ public int hashCode() {
+ final int prime = 31;
+ int hash = 17;
+ hash = hash * prime + this.experimentId.hashCode();
+ hash = hash * prime + this.name.hashCode();
+ hash = hash * prime + this.value.hashCode();
+ return hash;
+ }
+}
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/ExperimentOutputValueRepository.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/ExperimentOutputValueRepository.java
new file mode 100644
index 0000000..cacefde
--- /dev/null
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/ExperimentOutputValueRepository.java
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.airavata.registry.core.repositories.expcatalog;
+
+import org.apache.airavata.model.application.io.OutputDataValueObjectType;
+import org.apache.airavata.registry.core.entities.expcatalog.ExperimentOutputValueEntity;
+import org.apache.airavata.registry.core.entities.expcatalog.ExperimentOutputValuePK;
+import org.apache.airavata.registry.core.utils.DBConstants;
+import org.apache.airavata.registry.core.utils.ObjectMapperSingleton;
+import org.apache.airavata.registry.core.utils.QueryConstants;
+import org.apache.airavata.registry.cpi.RegistryException;
+import org.dozer.Mapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ExperimentOutputValueRepository extends ExpCatAbstractRepository<OutputDataValueObjectType, ExperimentOutputValueEntity, ExperimentOutputValuePK> {
+ private final static Logger logger = LoggerFactory.getLogger(ExperimentOutputValueRepository.class);
+
+ public ExperimentOutputValueRepository() {
+ super(OutputDataValueObjectType.class, ExperimentOutputValueEntity.class);
+ }
+
+ protected void saveExperimentOutputValues(List<OutputDataValueObjectType> experimentOutputValues, String experimentId) throws RegistryException {
+ // The mapper lookup does not depend on the loop variable, so resolve it once for all values.
+ Mapper mapper = ObjectMapperSingleton.getInstance();
+ for (OutputDataValueObjectType output : experimentOutputValues) {
+ ExperimentOutputValueEntity experimentOutputValueEntity = mapper.map(output, ExperimentOutputValueEntity.class);
+
+ if (experimentOutputValueEntity.getExperimentId() == null) {
+ logger.debug("Setting the ExperimentOutputValueEntity's ExperimentId");
+ experimentOutputValueEntity.setExperimentId(experimentId);
+ }
+ execute(entityManager -> entityManager.merge(experimentOutputValueEntity));
+ }
+ }
+
+ public String addExperimentOutputValues(List<OutputDataValueObjectType> experimentOutputValues, String experimentId) throws RegistryException {
+ saveExperimentOutputValues(experimentOutputValues, experimentId);
+ return experimentId;
+ }
+
+ public void updateExperimentOutputValues(List<OutputDataValueObjectType> updatedExperimentOutputValues, String experimentId) throws RegistryException {
+ saveExperimentOutputValues(updatedExperimentOutputValues, experimentId);
+ }
+
+ public List<OutputDataValueObjectType> getExperimentOutputValues(String experimentId, String name) throws RegistryException {
+ Map<String, Object> queryParameters = new HashMap<>();
+ queryParameters.put(DBConstants.ExperimentOutputValue.EXPERIMENT_ID, experimentId);
+ queryParameters.put(DBConstants.ExperimentOutputValue.NAME, name);
+ return select(QueryConstants.FIND_EXPERIMENT_OUTPUT_VALUES, -1, 0, queryParameters);
+ }
+}
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/ExperimentRepository.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/ExperimentRepository.java
index a59056c..09be7ef 100644
--- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/ExperimentRepository.java
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/repositories/expcatalog/ExperimentRepository.java
@@ -21,6 +21,8 @@
package org.apache.airavata.registry.core.repositories.expcatalog;
import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataValueObjectType;
import org.apache.airavata.model.commons.airavata_commonsConstants;
import org.apache.airavata.model.experiment.ExperimentModel;
import org.apache.airavata.model.experiment.UserConfigurationDataModel;
@@ -40,6 +42,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
public class ExperimentRepository extends ExpCatAbstractRepository<ExperimentModel, ExperimentEntity, String> {
private final static Logger logger = LoggerFactory.getLogger(ExperimentRepository.class);
@@ -123,7 +126,19 @@ public class ExperimentRepository extends ExpCatAbstractRepository<ExperimentMod
}
public ExperimentModel getExperiment(String experimentId) throws RegistryException {
- return get(experimentId);
+ ExperimentModel experimentModel = get(experimentId);
+ ExperimentOutputValueRepository outputValueRepository = new ExperimentOutputValueRepository();
+ List<OutputDataObjectType> experimentOutputs = experimentModel != null ? experimentModel.getExperimentOutputs() : null; // no model: return it unchanged, as before
+ if (experimentOutputs != null) {
+ for (OutputDataObjectType out: experimentOutputs) {
+ List<OutputDataValueObjectType> experimentOutputValues = outputValueRepository.getExperimentOutputValues(experimentId, out.getName());
+ if (experimentOutputValues != null && experimentOutputValues.size() > 0) {
+ List<String> values = experimentOutputValues.stream().map(OutputDataValueObjectType::getValue).collect(Collectors.toList());
+ out.setValue(String.join(",", values));
+ }
+ }
+ }
+ return experimentModel;
}
public String addUserConfigurationData(UserConfigurationDataModel userConfigurationDataModel, String experimentId) throws RegistryException {
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/DBConstants.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/DBConstants.java
index b5e887f..133656a 100644
--- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/DBConstants.java
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/DBConstants.java
@@ -180,5 +180,8 @@ public class DBConstants {
public static final String GATEWAY_ID = "gatewayId";
}
-
+ public static class ExperimentOutputValue {
+ public static final String EXPERIMENT_ID = "experimentId";
+ public static final String NAME = "name";
+ }
}
diff --git a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/QueryConstants.java b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/QueryConstants.java
index a2c1737..0e95e88 100644
--- a/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/QueryConstants.java
+++ b/modules/registry/registry-core/src/main/java/org/apache/airavata/registry/core/utils/QueryConstants.java
@@ -178,4 +178,7 @@ public interface QueryConstants {
String FIND_ALL_PARSERS_FOR_GATEWAY_ID = "SELECT P FROM " + ParserEntity.class.getSimpleName() + " P " +
"WHERE P.gatewayId = :" + DBConstants.Parser.GATEWAY_ID;
+
+ String FIND_EXPERIMENT_OUTPUT_VALUES = "SELECT O FROM " + ExperimentOutputValueEntity.class.getSimpleName() + " O " +
+ "WHERE O.experimentId = :" + DBConstants.ExperimentOutputValue.EXPERIMENT_ID + " AND O.name = :" + DBConstants.ExperimentOutputValue.NAME;
}
diff --git a/modules/registry/registry-core/src/main/resources/META-INF/persistence.xml b/modules/registry/registry-core/src/main/resources/META-INF/persistence.xml
index 9f6fe1e..835d577 100644
--- a/modules/registry/registry-core/src/main/resources/META-INF/persistence.xml
+++ b/modules/registry/registry-core/src/main/resources/META-INF/persistence.xml
@@ -111,6 +111,7 @@
<class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentErrorEntity</class>
<class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentInputEntity</class>
<class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentOutputEntity</class>
+ <class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentOutputValueEntity</class>
<class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentStatusEntity</class>
<class>org.apache.airavata.registry.core.entities.expcatalog.ExperimentSummaryEntity</class>
<class>org.apache.airavata.registry.core.entities.expcatalog.GatewayEntity</class>
diff --git a/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql b/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql
index b093aa3..cc3b06d 100644
--- a/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql
+++ b/modules/registry/registry-core/src/main/resources/expcatalog-mysql.sql
@@ -109,6 +109,8 @@ CREATE TABLE EXPERIMENT (
GATEWAY_INSTANCE_ID varchar(255),
ENABLE_EMAIL_NOTIFICATION tinyint(1),
EMAIL_ADDRESSES text,
+ SWEEP_COUNT int DEFAULT 1,
+ EXECUTION_TYPE varchar(255) DEFAULT 'one_pass',
PRIMARY KEY (EXPERIMENT_ID),
FOREIGN KEY (PROJECT_ID) REFERENCES PROJECT(PROJECT_ID) ON DELETE CASCADE
)ENGINE=InnoDB DEFAULT CHARSET=latin1;
@@ -154,6 +156,14 @@ CREATE TABLE EXPERIMENT_OUTPUT
FOREIGN KEY (EXPERIMENT_ID) REFERENCES EXPERIMENT(EXPERIMENT_ID) ON DELETE CASCADE
)ENGINE=InnoDB DEFAULT CHARSET=latin1;
+CREATE TABLE EXPERIMENT_OUTPUT_VALUE
+(
+ EXPERIMENT_ID varchar(255) NOT NULL,
+ OUTPUT_NAME varchar(255),
+ OUTPUT_VALUE varchar(2048),
+ PRIMARY KEY(EXPERIMENT_ID,OUTPUT_NAME,OUTPUT_VALUE),
+ FOREIGN KEY (EXPERIMENT_ID,OUTPUT_NAME) REFERENCES EXPERIMENT_OUTPUT(EXPERIMENT_ID,OUTPUT_NAME) ON DELETE CASCADE
+)ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE EXPERIMENT_STATUS (
STATUS_ID varchar(255),
diff --git a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java
index e54167b..8fa4dca 100644
--- a/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java
+++ b/modules/registry/registry-server/registry-api-service/src/main/java/org/apache/airavata/registry/api/service/handler/RegistryServerHandler.java
@@ -50,6 +50,7 @@ import org.apache.airavata.model.appcatalog.userresourceprofile.UserResourceProf
import org.apache.airavata.model.appcatalog.userresourceprofile.UserStoragePreference;
import org.apache.airavata.model.application.io.InputDataObjectType;
import org.apache.airavata.model.application.io.OutputDataObjectType;
+import org.apache.airavata.model.application.io.OutputDataValueObjectType;
import org.apache.airavata.model.commons.ErrorModel;
import org.apache.airavata.model.data.movement.DMType;
import org.apache.airavata.model.data.movement.DataMovementInterface;
@@ -125,6 +126,7 @@ public class RegistryServerHandler implements RegistryService.Iface {
private ExperimentSummaryRepository experimentSummaryRepository = new ExperimentSummaryRepository();
private ExperimentRepository experimentRepository = new ExperimentRepository();
private ExperimentOutputRepository experimentOutputRepository = new ExperimentOutputRepository();
+ private ExperimentOutputValueRepository experimentOutputValueRepository = new ExperimentOutputValueRepository();
private ExperimentStatusRepository experimentStatusRepository = new ExperimentStatusRepository();
private ExperimentErrorRepository experimentErrorRepository = new ExperimentErrorRepository();
private ProcessRepository processRepository = new ProcessRepository();
@@ -803,6 +805,17 @@ public class RegistryServerHandler implements RegistryService.Iface {
}
else if(ExpCatChildDataType.EXPERIMENT_OUTPUT.equals(ExpCatChildDataType.valueOf(outputType))) {
experimentOutputRepository.addExperimentOutputs(outputs, id);
+ List<OutputDataValueObjectType> outValues = new ArrayList<>(); // one entry per comma-separated piece of every output value
+ for (OutputDataObjectType out: outputs) {
+     String[] values = out.getValue() == null ? new String[0] : out.getValue().split(","); // value is optional: an unset value used to throw NullPointerException here, now it simply contributes no entries
+     for (String value: values) {
+         OutputDataValueObjectType outVal = new OutputDataValueObjectType();
+         outVal.setName(out.getName()); // every piece keeps the name of the output it came from
+         outVal.setValue(value);
+         outValues.add(outVal);
+     }
+ }
+ experimentOutputValueRepository.addExperimentOutputValues(outValues, id); // stored under the same id as the outputs themselves
}
} catch (Exception e) {
logger.error(id, "Error while adding outputs", e);
diff --git a/thrift-interface-descriptions/data-models/app-catalog-models/application_io_models.thrift b/thrift-interface-descriptions/data-models/app-catalog-models/application_io_models.thrift
index 3556e4b..b215825 100644
--- a/thrift-interface-descriptions/data-models/app-catalog-models/application_io_models.thrift
+++ b/thrift-interface-descriptions/data-models/app-catalog-models/application_io_models.thrift
@@ -124,3 +124,8 @@ struct OutputDataObjectType {
11: optional string storageResourceId,
12: optional string metaData,
}
+
+struct OutputDataValueObjectType { // A (name, value) pair. NOTE(review): appears to carry one individual value of a named output - confirm against callers
+ 1: required string name, // always present (required)
+ 2: optional string value // may be unset (optional)
+}
\ No newline at end of file
diff --git a/thrift-interface-descriptions/data-models/experiment-catalog-models/experiment_model.thrift b/thrift-interface-descriptions/data-models/experiment-catalog-models/experiment_model.thrift
index 1941d64..af7976d 100644
--- a/thrift-interface-descriptions/data-models/experiment-catalog-models/experiment_model.thrift
+++ b/thrift-interface-descriptions/data-models/experiment-catalog-models/experiment_model.thrift
@@ -108,7 +108,9 @@ struct ExperimentModel {
17: optional list<status_models.ExperimentStatus> experimentStatus,
18: optional list<airavata_commons.ErrorModel> errors,
19: optional list<process_model.ProcessModel> processes,
- 20: optional airavata_workflow_model.AiravataWorkflow workflow
+ 20: optional airavata_workflow_model.AiravataWorkflow workflow,
+ 21: optional string executionType,
+ 22: optional i32 sweepCount
}
struct ExperimentSummaryModel {