You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sm...@apache.org on 2014/07/07 16:36:23 UTC
git commit: GFac is integrated to use app catalog ..but internally
fills in legacy models to minimize changes - AIRAVATA-1309
Repository: airavata
Updated Branches:
refs/heads/master 25f4f5f01 -> 2c54484db
GFac is integrated to use app catalog ..but internally fills in legacy models to minimize changes - AIRAVATA-1309
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2c54484d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2c54484d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2c54484d
Branch: refs/heads/master
Commit: 2c54484dba97a43827aeaca4ada9265ded4fcb53
Parents: 25f4f5f
Author: Suresh Marru <sm...@apache.org>
Authored: Mon Jul 7 10:36:15 2014 -0400
Committer: Suresh Marru <sm...@apache.org>
Committed: Mon Jul 7 10:36:15 2014 -0400
----------------------------------------------------------------------
.../main/resources/airavata-server.properties | 2 +
.../apache/airavata/gfac/cpi/GfacService.java | 127 ++++++-
.../gfac/cpi/gfac_cpi_serviceConstants.java | 2 +-
.../airavata/gfac/server/GfacServerHandler.java | 21 +-
.../gfac/core/context/JobExecutionContext.java | 10 +
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 357 ++++++++-----------
.../org/apache/airavata/gfac/core/cpi/GFac.java | 2 +-
.../apache/airavata/gfac/core/cpi/GFacImpl.java | 9 +-
.../handler/LocalDirectorySetupHandler.java | 6 +-
.../core/impl/GFACEmbeddedJobSubmitter.java | 4 +-
.../core/impl/GFACServiceJobSubmitter.java | 3 +-
11 files changed, 316 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 794dcf7..3d5e670 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -323,6 +323,8 @@ email.password=xxx
email.ssl=true
email.from=airavata@apache.org
+gateway_id=php_reference_gateway
+
apiserver.server.host=localhost
apiserver.server.port=8930
apiserver.server.min.threads=30
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java
index 558bb3d..eb8ddf7 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java
@@ -64,14 +64,17 @@ import org.slf4j.LoggerFactory;
* *
* * @param experimentID
* * @param taskID
+ * * @param gatewayId:
+ * * The GatewayId is inferred from security context and passed onto gfac.
* * @return sucess/failure
* *
* *
*
* @param experimentId
* @param taskId
+ * @param gatewayId
*/
- public boolean submitJob(String experimentId, String taskId) throws org.apache.thrift.TException;
+ public boolean submitJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException;
/**
* *
@@ -97,7 +100,7 @@ import org.slf4j.LoggerFactory;
public void getGFACServiceVersion(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
- public void submitJob(String experimentId, String taskId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+ public void submitJob(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
public void cancelJob(String experimentId, String taskId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
@@ -145,17 +148,18 @@ import org.slf4j.LoggerFactory;
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getGFACServiceVersion failed: unknown result");
}
- public boolean submitJob(String experimentId, String taskId) throws org.apache.thrift.TException
+ public boolean submitJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException
{
- send_submitJob(experimentId, taskId);
+ send_submitJob(experimentId, taskId, gatewayId);
return recv_submitJob();
}
- public void send_submitJob(String experimentId, String taskId) throws org.apache.thrift.TException
+ public void send_submitJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException
{
submitJob_args args = new submitJob_args();
args.setExperimentId(experimentId);
args.setTaskId(taskId);
+ args.setGatewayId(gatewayId);
sendBase("submitJob", args);
}
@@ -240,9 +244,9 @@ import org.slf4j.LoggerFactory;
}
}
- public void submitJob(String experimentId, String taskId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ public void submitJob(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
- submitJob_call method_call = new submitJob_call(experimentId, taskId, resultHandler, this, ___protocolFactory, ___transport);
+ submitJob_call method_call = new submitJob_call(experimentId, taskId, gatewayId, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
@@ -250,10 +254,12 @@ import org.slf4j.LoggerFactory;
public static class submitJob_call extends org.apache.thrift.async.TAsyncMethodCall {
private String experimentId;
private String taskId;
- public submitJob_call(String experimentId, String taskId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ private String gatewayId;
+ public submitJob_call(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.experimentId = experimentId;
this.taskId = taskId;
+ this.gatewayId = gatewayId;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -261,6 +267,7 @@ import org.slf4j.LoggerFactory;
submitJob_args args = new submitJob_args();
args.setExperimentId(experimentId);
args.setTaskId(taskId);
+ args.setGatewayId(gatewayId);
args.write(prot);
prot.writeMessageEnd();
}
@@ -364,7 +371,7 @@ import org.slf4j.LoggerFactory;
public submitJob_result getResult(I iface, submitJob_args args) throws org.apache.thrift.TException {
submitJob_result result = new submitJob_result();
- result.success = iface.submitJob(args.experimentId, args.taskId);
+ result.success = iface.submitJob(args.experimentId, args.taskId, args.gatewayId);
result.setSuccessIsSet(true);
return result;
}
@@ -509,7 +516,7 @@ import org.slf4j.LoggerFactory;
}
public void start(I iface, submitJob_args args, org.apache.thrift.async.AsyncMethodCallback<Boolean> resultHandler) throws TException {
- iface.submitJob(args.experimentId, args.taskId,resultHandler);
+ iface.submitJob(args.experimentId, args.taskId, args.gatewayId,resultHandler);
}
}
@@ -1172,6 +1179,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2);
+ private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -1181,11 +1189,13 @@ import org.slf4j.LoggerFactory;
public String experimentId; // required
public String taskId; // required
+ public String gatewayId; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
EXPERIMENT_ID((short)1, "experimentId"),
- TASK_ID((short)2, "taskId");
+ TASK_ID((short)2, "taskId"),
+ GATEWAY_ID((short)3, "gatewayId");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -1204,6 +1214,8 @@ import org.slf4j.LoggerFactory;
return EXPERIMENT_ID;
case 2: // TASK_ID
return TASK_ID;
+ case 3: // GATEWAY_ID
+ return GATEWAY_ID;
default:
return null;
}
@@ -1251,6 +1263,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.TASK_ID, new org.apache.thrift.meta_data.FieldMetaData("taskId", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(submitJob_args.class, metaDataMap);
}
@@ -1260,11 +1274,13 @@ import org.slf4j.LoggerFactory;
public submitJob_args(
String experimentId,
- String taskId)
+ String taskId,
+ String gatewayId)
{
this();
this.experimentId = experimentId;
this.taskId = taskId;
+ this.gatewayId = gatewayId;
}
/**
@@ -1277,6 +1293,9 @@ import org.slf4j.LoggerFactory;
if (other.isSetTaskId()) {
this.taskId = other.taskId;
}
+ if (other.isSetGatewayId()) {
+ this.gatewayId = other.gatewayId;
+ }
}
public submitJob_args deepCopy() {
@@ -1287,6 +1306,7 @@ import org.slf4j.LoggerFactory;
public void clear() {
this.experimentId = null;
this.taskId = null;
+ this.gatewayId = null;
}
public String getExperimentId() {
@@ -1337,6 +1357,30 @@ import org.slf4j.LoggerFactory;
}
}
+ public String getGatewayId() {
+ return this.gatewayId;
+ }
+
+ public submitJob_args setGatewayId(String gatewayId) {
+ this.gatewayId = gatewayId;
+ return this;
+ }
+
+ public void unsetGatewayId() {
+ this.gatewayId = null;
+ }
+
+ /** Returns true if field gatewayId is set (has been assigned a value) and false otherwise */
+ public boolean isSetGatewayId() {
+ return this.gatewayId != null;
+ }
+
+ public void setGatewayIdIsSet(boolean value) {
+ if (!value) {
+ this.gatewayId = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case EXPERIMENT_ID:
@@ -1355,6 +1399,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case GATEWAY_ID:
+ if (value == null) {
+ unsetGatewayId();
+ } else {
+ setGatewayId((String)value);
+ }
+ break;
+
}
}
@@ -1366,6 +1418,9 @@ import org.slf4j.LoggerFactory;
case TASK_ID:
return getTaskId();
+ case GATEWAY_ID:
+ return getGatewayId();
+
}
throw new IllegalStateException();
}
@@ -1381,6 +1436,8 @@ import org.slf4j.LoggerFactory;
return isSetExperimentId();
case TASK_ID:
return isSetTaskId();
+ case GATEWAY_ID:
+ return isSetGatewayId();
}
throw new IllegalStateException();
}
@@ -1416,6 +1473,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_gatewayId = true && this.isSetGatewayId();
+ boolean that_present_gatewayId = true && that.isSetGatewayId();
+ if (this_present_gatewayId || that_present_gatewayId) {
+ if (!(this_present_gatewayId && that_present_gatewayId))
+ return false;
+ if (!this.gatewayId.equals(that.gatewayId))
+ return false;
+ }
+
return true;
}
@@ -1452,6 +1518,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetGatewayId()).compareTo(other.isSetGatewayId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetGatewayId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.gatewayId, other.gatewayId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -1487,6 +1563,14 @@ import org.slf4j.LoggerFactory;
sb.append(this.taskId);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("gatewayId:");
+ if (this.gatewayId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.gatewayId);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -1499,6 +1583,9 @@ import org.slf4j.LoggerFactory;
if (taskId == null) {
throw new org.apache.thrift.protocol.TProtocolException("Required field 'taskId' was not present! Struct: " + toString());
}
+ if (gatewayId == null) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' was not present! Struct: " + toString());
+ }
// check for sub-struct validity
}
@@ -1552,6 +1639,14 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 3: // GATEWAY_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.gatewayId = iprot.readString();
+ struct.setGatewayIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -1577,6 +1672,11 @@ import org.slf4j.LoggerFactory;
oprot.writeString(struct.taskId);
oprot.writeFieldEnd();
}
+ if (struct.gatewayId != null) {
+ oprot.writeFieldBegin(GATEWAY_ID_FIELD_DESC);
+ oprot.writeString(struct.gatewayId);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1596,6 +1696,7 @@ import org.slf4j.LoggerFactory;
TTupleProtocol oprot = (TTupleProtocol) prot;
oprot.writeString(struct.experimentId);
oprot.writeString(struct.taskId);
+ oprot.writeString(struct.gatewayId);
}
@Override
@@ -1605,6 +1706,8 @@ import org.slf4j.LoggerFactory;
struct.setExperimentIdIsSet(true);
struct.taskId = iprot.readString();
struct.setTaskIdIsSet(true);
+ struct.gatewayId = iprot.readString();
+ struct.setGatewayIdIsSet(true);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java
index 0a46b6b..14fd7fe 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/gfac_cpi_serviceConstants.java
@@ -50,6 +50,6 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("all") public class gfac_cpi_serviceConstants {
- public static final String GFAC_CPI_VERSION = "0.12.0";
+ public static final String GFAC_CPI_VERSION = "0.13.0";
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 622ba61..5d59746 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -158,11 +158,28 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
return gfac_cpi_serviceConstants.GFAC_CPI_VERSION;
}
- public boolean submitJob(String experimentId, String taskId) throws TException {
+ /**
+ * * After creating the experiment Data and Task Data in the orchestrator
+ * * Orchestrator has to invoke this operation for each Task per experiment to run
+ * * the actual Job related actions.
+ * *
+ * * @param experimentID
+ * * @param taskID
+ * * @param gatewayId:
+ * * The GatewayId is inferred from security context and passed onto gfac.
+ * * @return sucess/failure
+ * *
+ * *
+ *
+ * @param experimentId
+ * @param taskId
+ * @param gatewayId
+ */
+ public boolean submitJob(String experimentId, String taskId, String gatewayId) throws TException {
logger.info("GFac Recieved the Experiment: " + experimentId + " TaskId: " + taskId);
GFac gfac = getGfac();
try {
- return gfac.submitJob(experimentId, taskId);
+ return gfac.submitJob(experimentId, taskId, gatewayId);
} catch (GFacException e) {
throw new TException("Error launching the experiment : " + e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index a533804..9abab8d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -92,6 +92,16 @@ public class JobExecutionContext extends AbstractContext implements Serializable
private String serviceName;
private String experimentID;
+
+ public String getGatewayID() {
+ return gatewayID;
+ }
+
+ public void setGatewayID(String gatewayID) {
+ this.gatewayID = gatewayID;
+ }
+
+ private String gatewayID;
private String status;
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index c4391be..598e720 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -74,20 +74,14 @@ import org.apache.airavata.model.appcatalog.appdeployment.ApplicationDeploymentD
import org.apache.airavata.model.appcatalog.appinterface.ApplicationInterfaceDescription;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.airavata.model.workspace.experiment.DataObjectType;
-import org.apache.airavata.model.workspace.experiment.Experiment;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
-import org.apache.airavata.model.workspace.experiment.JobState;
-import org.apache.airavata.model.workspace.experiment.TaskDetails;
-import org.apache.airavata.model.workspace.experiment.TaskState;
+import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.gatewayprofile.ComputeResourcePreference;
+import org.apache.airavata.model.workspace.experiment.*;
import org.apache.airavata.registry.api.AiravataRegistry2;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.schemas.gfac.*;
import org.apache.airavata.schemas.gfac.DataType;
-import org.apache.airavata.schemas.gfac.InputParameterType;
-import org.apache.airavata.schemas.gfac.OutputParameterType;
-import org.apache.airavata.schemas.gfac.ParameterType;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
@@ -217,10 +211,10 @@ public class BetterGfacImpl implements GFac {
* @return
* @throws GFacException
*/
- public boolean submitJob(String experimentID, String taskID) throws GFacException {
+ public boolean submitJob(String experimentID, String taskID, String gatewayID) throws GFacException {
JobExecutionContext jobExecutionContext = null;
try {
- jobExecutionContext = createJEC(experimentID, taskID);
+ jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
return submitJob(jobExecutionContext);
} catch (Exception e) {
log.error("Error inovoking the job with experiment ID: " + experimentID);
@@ -228,203 +222,159 @@ public class BetterGfacImpl implements GFac {
}
}
- private JobExecutionContext createJEC(String experimentID, String taskID) throws Exception {
+ private JobExecutionContext createJEC(String experimentID, String taskID, String gatewayID) throws Exception {
+
JobExecutionContext jobExecutionContext;
- TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
- // this is wear our new model and old model is mapping (so serviceName in ExperimentData and service name in ServiceDescriptor
- // has to be same.
+ /** FIXME:
+ * A temporary wrapper to co-relate the app catalog and experiment thrift models to old gfac schema documents.
+ * The serviceName in ExperimentData and service name in ServiceDescriptor has to be same.
+ * 1. Get the Task from the task ID and construct the Job object and save it in to registry
+ * 2. Add properties of description documents to jobExecutionContext which will be used inside the providers.
+ */
- // 1. Get the Task from the task ID and construct the Job object and save it in to registry
- // 2. Add another property to jobExecutionContext and read them inside the provider and use it.
- String applicationId = taskData.getApplicationId();
- if (applicationId == null) {
- throw new GFacException("Error executing the job because there is not Application Name in this Experiment: " + applicationId);
+ //Fetch the Task details for the requested experimentID from the registry. Extract required pointers from the Task object.
+ TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
+ String applicationInterfaceId = taskData.getApplicationId();
+ String applicationDeploymentId = taskData.getApplicationDeploymentId();
+ if (null == applicationInterfaceId) {
+ throw new GFacException("Error executing the job. The required Application Id is missing");
+ }
+ if (null == applicationDeploymentId) {
+ throw new GFacException("Error executing the job. The required Application deployment Id is missing");
}
AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
- ApplicationDeploymentDescription applicationDeployement = appCatalog.getApplicationDeployment().getApplicationDeployement(taskData.getApplicationDeploymentId());
- ComputeResourceDescription computeResource = appCatalog.getComputeResource().getComputeResource(applicationDeployement.getComputeHostId());
- HostDescription hostDescription= new HostDescription();
- ApplicationDescription applicationDescription = new ApplicationDescription();
+ //fetch the compute resource, application interface and deployment information from app catalog
+ ApplicationInterfaceDescription applicationInterface = appCatalog.
+ getApplicationInterface().getApplicationInterface(applicationInterfaceId);
+ ApplicationDeploymentDescription applicationDeployment = appCatalog.
+ getApplicationDeployment().getApplicationDeployement(applicationDeploymentId);
+ ComputeResourceDescription computeResource = appCatalog.getComputeResource().
+ getComputeResource(applicationDeployment.getComputeHostId());
+ ComputeResourcePreference gatewayResourcePreferences = appCatalog.getGatewayProfile().
+ getComputeResourcePreference(gatewayID, applicationDeployment.getComputeHostId());
+
+ //Create the legacy schema docs to fill-in
+ ServiceDescription legacyServiceDescription = new ServiceDescription();
+ ServiceDescriptionType legacyServiceDescType = legacyServiceDescription.getType();
+ ApplicationDescription legacyAppDescription = new ApplicationDescription();
+ HpcApplicationDeploymentType legacyHPCAppDescType =
+ (HpcApplicationDeploymentType)legacyAppDescription.getType();
+ HostDescription legacyHostDescription= new HostDescription();
+ HostDescriptionType legacyHostDescType = legacyHostDescription.getType();
+
+ //Fetch the application inputs and outputs from the app interface and create the legacy service description.
+ List<InputParameterType> legacyInputParameters = new ArrayList<InputParameterType>();
+ List<OutputParameterType> legacyOutputParameters = new ArrayList<OutputParameterType>();
+ legacyServiceDescType.setName(applicationInterface.getApplicationName());
+ legacyServiceDescType.setDescription(applicationInterface.getApplicationName());
+
+ List<InputDataObjectType> applicationInputs = applicationInterface.getApplicationInputs();
+ for (InputDataObjectType dataObjectType : applicationInputs) {
+ InputParameterType parameter = InputParameterType.Factory.newInstance();
+ parameter.setParameterName(dataObjectType.getApplicationArgument());
+ parameter.setParameterDescription(dataObjectType.getUserFriendlyDescription());
+ ParameterType parameterType = parameter.addNewParameterType();
+ switch (dataObjectType.getType()){
+ case FLOAT:
+ parameterType.setType(DataType.FLOAT); break;
+ case INTEGER:
+ parameterType.setType(DataType.INTEGER); break;
+ case STRING:
+ parameterType.setType(DataType.STRING); break;
+ case URI:
+ parameterType.setType(DataType.URI); break;
+ }
+ parameterType.setName(parameterType.getType().toString());
+ parameter.addParameterValue(dataObjectType.getValue());
+ legacyInputParameters.add(parameter);
+ }
+ List<OutputDataObjectType> applicationOutputs = applicationInterface.getApplicationOutputs();
+ for (OutputDataObjectType dataObjectType : applicationOutputs) {
+ OutputParameterType parameter = OutputParameterType.Factory.newInstance();
+ parameter.setParameterName(dataObjectType.getName());
+ parameter.setParameterDescription(dataObjectType.getName());
+ ParameterType parameterType = parameter.addNewParameterType();
+ switch (dataObjectType.getType()){
+ case FLOAT:
+ parameterType.setType(DataType.FLOAT); break;
+ case INTEGER:
+ parameterType.setType(DataType.INTEGER); break;
+ case STRING:
+ parameterType.setType(DataType.STRING); break;
+ case URI:
+ parameterType.setType(DataType.URI); break;
+ }
+ parameterType.setName(parameterType.getType().toString());
+ legacyOutputParameters.add(parameter);
+ }
- hostDescription.getType().setHostName(computeResource.getHostName());
- hostDescription.getType().setHostAddress(computeResource.getIpAddresses().iterator().next());
-
-// String preferredJobSubmissionProtocol = computeResource.getPreferredJobSubmissionProtocol();
-// String preferredDataMovementProtocol = computeResource.getDataMovementProtocols().keySet().iterator().next();
-//
-// if (preferredJobSubmissionProtocol==null){
-// preferredJobSubmissionProtocol=computeResource.getJobSubmissionProtocols().keySet().iterator().next();
-// }
-// JobSubmissionProtocol jobSubmissionProtocol = computeResource.getJobSubmissionProtocols().get(preferredJobSubmissionProtocol);
-// DataMovementProtocol dataMovementProtocol = computeResource.getDataMovementProtocols().get(preferredDataMovementProtocol);
-//
-// if (jobSubmissionProtocol==JobSubmissionProtocol.GRAM){
-// hostDescription.getType().changeType(GlobusHostType.type);
-//
-// applicationDescription.getType().changeType(HpcApplicationDeploymentType.type);
-// HpcApplicationDeploymentType app=(HpcApplicationDeploymentType)applicationDescription.getType();
-//
-// GlobusJobSubmission globusJobSubmission = appCatalog.getComputeResource().getGlobusJobSubmission(preferredJobSubmissionProtocol);
-// ((GlobusHostType)hostDescription.getType()).setGlobusGateKeeperEndPointArray(globusJobSubmission.getGlobusGateKeeperEndPoint().toArray(new String[]{}));
-// if (dataMovementProtocol==DataMovementProtocol.GridFTP) {
-// GridFTPDataMovement gridFTPDataMovement = appCatalog.getComputeResource().getGridFTPDataMovement(preferredDataMovementProtocol);
-// ((GlobusHostType) hostDescription.getType())
-// .setGridFTPEndPointArray(gridFTPDataMovement
-// .getGridFTPEndPoint().toArray(
-// new String[] {}));
-// }
-// ////////////////
-// if (computeResource.getHostName().equalsIgnoreCase("trestles.sdsc.edu")){
-// ProjectAccountType projectAccountType = app.addNewProjectAccount();
-// projectAccountType.setProjectAccountNumber("sds128");
-//
-// QueueType queueType = app.addNewQueue();
-// queueType.setQueueName("normal");
-//
-// app.setCpuCount(1);
-// app.setJobType(JobTypeType.SERIAL);
-// app.setNodeCount(1);
-// app.setProcessorsPerNode(1);
-//
-// String tempDir = "/home/ogce/scratch";
-// app.setScratchWorkingDirectory(tempDir);
-// app.setMaxMemory(10);
-// }
-// ////////////////
-// } else if (jobSubmissionProtocol==JobSubmissionProtocol.GSISSH){
-// hostDescription.getType().changeType(GsisshHostType.type);
-// applicationDescription.getType().changeType(HpcApplicationDeploymentType.type);
-// HpcApplicationDeploymentType app=(HpcApplicationDeploymentType)applicationDescription.getType();
-//
-// GSISSHJobSubmission gsisshJobSubmission = appCatalog.getComputeResource().getGSISSHJobSubmission(preferredJobSubmissionProtocol);
-// ((GsisshHostType) hostDescription.getType()).setPort(gsisshJobSubmission.getSshPort());
-// ((GsisshHostType) hostDescription.getType()).setInstalledPath(gsisshJobSubmission.getInstalledPath());
-// if (computeResource.getHostName().equalsIgnoreCase("lonestar.tacc.utexas.edu")){
-// ((GsisshHostType) hostDescription.getType()).setJobManager("sge");
-// ((GsisshHostType) hostDescription.getType()).setInstalledPath("/opt/sge6.2/bin/lx24-amd64/");
-// ((GsisshHostType) hostDescription.getType()).setPort(22);
-// ProjectAccountType projectAccountType = app.addNewProjectAccount();
-// projectAccountType.setProjectAccountNumber("TG-STA110014S");
-// QueueType queueType = app.addNewQueue();
-// queueType.setQueueName("normal");
-// app.setCpuCount(1);
-// app.setJobType(JobTypeType.SERIAL);
-// app.setNodeCount(1);
-// app.setProcessorsPerNode(1);
-// app.setMaxWallTime(10);
-// String tempDir = "/home1/01437/ogce";
-// app.setScratchWorkingDirectory(tempDir);
-// app.setInstalledParentPath("/opt/sge6.2/bin/lx24-amd64/");
-// } else if (computeResource.getHostName().equalsIgnoreCase("stampede.tacc.xsede.org")){
-// ((GsisshHostType) hostDescription.getType()).setJobManager("slurm");
-// ((GsisshHostType) hostDescription.getType()).setInstalledPath("/usr/bin/");
-// ((GsisshHostType) hostDescription.getType()).setPort(2222);
-// ((GsisshHostType) hostDescription.getType()).setMonitorMode("push");
-//
-// ProjectAccountType projectAccountType = app.addNewProjectAccount();
-// projectAccountType.setProjectAccountNumber("TG-STA110014S");
-//
-// QueueType queueType = app.addNewQueue();
-// queueType.setQueueName("normal");
-//
-// app.setCpuCount(1);
-// app.setJobType(JobTypeType.SERIAL);
-// app.setNodeCount(1);
-// app.setProcessorsPerNode(1);
-// app.setMaxWallTime(10);
-// String tempDir = "/home1/01437/ogce";
-// app.setScratchWorkingDirectory(tempDir);
-// app.setInstalledParentPath("/usr/bin/");
-//
-// } else if (computeResource.getHostName().equalsIgnoreCase("trestles.sdsc.edu")){
-// ProjectAccountType projectAccountType = app.addNewProjectAccount();
-// projectAccountType.setProjectAccountNumber("sds128");
-//
-// QueueType queueType = app.addNewQueue();
-// queueType.setQueueName("normal");
-//
-// app.setCpuCount(1);
-// app.setJobType(JobTypeType.SERIAL);
-// app.setNodeCount(1);
-// app.setProcessorsPerNode(1);
-// app.setMaxWallTime(10);
-// String tempDir = "/oasis/scratch/trestles/ogce/temp_project/";
-// app.setScratchWorkingDirectory(tempDir);
-// app.setInstalledParentPath("/opt/torque/bin/");
-// }
-// } else if (jobSubmissionProtocol==JobSubmissionProtocol.SSH){
-// hostDescription.getType().changeType(SSHHostType.type);
-// SSHJobSubmission sshJobSubmission = appCatalog.getComputeResource().getSSHJobSubmission(preferredJobSubmissionProtocol);
-// applicationDescription.getType().setExecutableLocation(applicationDeployement.getExecutablePath());
-// //TODO update scratch location
-// if (computeResource.getHostName().equalsIgnoreCase("gw111.iu.xsede.org")){
-// applicationDescription.getType().setScratchWorkingDirectory("/tmp");
-// }
-// }
-
- ApplicationInterfaceDescription applicationInterface = appCatalog.getApplicationInterface().getApplicationInterface(applicationId);
-
- ServiceDescription serviceDescription = new ServiceDescription();
- List<InputParameterType> inputParameters = new ArrayList<InputParameterType>();
- List<OutputParameterType> outputParameters = new ArrayList<OutputParameterType>();
- serviceDescription.getType().setName(applicationInterface.getApplicationName());
- serviceDescription.getType().setDescription(applicationInterface.getApplicationName());
-
- List<InputDataObjectType> applicationInputs = applicationInterface.getApplicationInputs();
- for (InputDataObjectType dataObjectType : applicationInputs) {
- InputParameterType parameter = InputParameterType.Factory.newInstance();
- parameter.setParameterName(dataObjectType.getApplicationArgument());
- parameter.setParameterDescription(dataObjectType.getUserFriendlyDescription());
- ParameterType parameterType = parameter.addNewParameterType();
- switch (dataObjectType.getType()){
- case FLOAT:
- parameterType.setType(DataType.FLOAT); break;
- case INTEGER:
- parameterType.setType(DataType.INTEGER); break;
- case STRING:
- parameterType.setType(DataType.STRING); break;
- case URI:
- parameterType.setType(DataType.URI); break;
- }
- parameterType.setName(parameterType.getType().toString());
- parameter.addParameterValue(dataObjectType.getValue());
- inputParameters.add(parameter);
- }
- List<OutputDataObjectType> applicationOutputs = applicationInterface.getApplicationOutputs();
- for (OutputDataObjectType dataObjectType : applicationOutputs) {
- OutputParameterType parameter = OutputParameterType.Factory.newInstance();
- parameter.setParameterName(dataObjectType.getName());
- parameter.setParameterDescription(dataObjectType.getName());
- ParameterType parameterType = parameter.addNewParameterType();
- switch (dataObjectType.getType()){
- case FLOAT:
- parameterType.setType(DataType.FLOAT); break;
- case INTEGER:
- parameterType.setType(DataType.INTEGER); break;
- case STRING:
- parameterType.setType(DataType.STRING); break;
- case URI:
- parameterType.setType(DataType.URI); break;
- }
- parameterType.setName(parameterType.getType().toString());
- outputParameters.add(parameter);
- }
-
- serviceDescription.getType().setInputParametersArray(inputParameters.toArray(new InputParameterType[]{}));
- serviceDescription.getType().setOutputParametersArray(outputParameters.toArray(new OutputParameterType[]{}));
-
-
+ legacyServiceDescType.setInputParametersArray(legacyInputParameters.toArray(new InputParameterType[]{}));
+ legacyServiceDescType.setOutputParametersArray(legacyOutputParameters.toArray(new OutputParameterType[]{}));
+
+ //Fetch deployment information and fill-in legacy doc
+ legacyHPCAppDescType.addNewApplicationName().setStringValue(applicationDeploymentId);
+ legacyHPCAppDescType.setExecutableLocation(applicationDeployment.getExecutablePath());
+ switch (applicationDeployment.getParallelism()) {
+ case SERIAL:
+ legacyHPCAppDescType.setJobType(JobTypeType.SERIAL);
+ case MPI:
+ legacyHPCAppDescType.setJobType(JobTypeType.MPI);
+ case OPENMP:
+ legacyHPCAppDescType.setJobType(JobTypeType.OPEN_MP);
+ }
+
+ //Fetch scheduling information from experiment request
+ ComputationalResourceScheduling taskSchedule = taskData.getTaskScheduling();
+ QueueType queueType = legacyHPCAppDescType.addNewQueue();
+ queueType.setQueueName(taskSchedule.getQueueName());
+ legacyHPCAppDescType.setCpuCount(taskSchedule.getTotalCPUCount());
+ legacyHPCAppDescType.setNodeCount(taskSchedule.getNodeCount());
+ legacyHPCAppDescType.setMaxWallTime(taskSchedule.getWallTimeLimit());
+ //legacyHPCAppDescType.setInstalledParentPath();
+
+
+ //Fetch from gateway profile
+ ProjectAccountType projectAccountType = legacyHPCAppDescType.addNewProjectAccount();
+ projectAccountType.setProjectAccountNumber(gatewayResourcePreferences.getAllocationProjectNumber());
+ legacyHPCAppDescType.setScratchWorkingDirectory(gatewayResourcePreferences.getScratchLocation());
+
+ //Fetch the host description details and fill-in legacy doc
+ legacyHostDescType.setHostName(computeResource.getHostName());
+ legacyHostDescType.setHostAddress(computeResource.getHostName());
+ //hostDescription.getType().setHostAddress(computeResource.getIpAddresses().iterator().next());
+
+ for (JobSubmissionInterface jobSubmissionInterface : computeResource.getJobSubmissionInterfaces())
+ if (jobSubmissionInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.LOCAL) {
+ LOCALSubmission localSubmission =
+ appCatalog.getComputeResource().getLocalJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ } else {
+ if (jobSubmissionInterface.getJobSubmissionProtocol() == JobSubmissionProtocol.SSH) {
+
+ //Fetch from App catalog Compute Resource
+ SSHJobSubmission sshJobSubmission =
+ appCatalog.getComputeResource().getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
+ if (sshJobSubmission.getSecurityProtocol() == SecurityProtocol.GSI) {
+ ResourceJobManager resourceJobManager = sshJobSubmission.getResourceJobManager();
+ ((GsisshHostType) legacyHostDescription.getType()).setJobManager
+ (resourceJobManager.getResourceJobManagerType().name());
+ ((GsisshHostType) legacyHostDescType).setInstalledPath(resourceJobManager.getJobManagerBinPath());
+ // applicationDescription.setInstalledParentPath(resourceJobManager.getJobManagerBinPath());
+ ((GsisshHostType) legacyHostDescType).setPort(sshJobSubmission.getSshPort());
+
+ }
+
+ }
+ }
URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
Properties configurationProperties = ServerSettings.getProperties();
GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
-
// start constructing jobexecutioncontext
- jobExecutionContext = new JobExecutionContext(gFacConfiguration, applicationId);
+ jobExecutionContext = new JobExecutionContext(gFacConfiguration, applicationInterfaceId);
// setting experiment/task/workflownode related information
Experiment experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentID);
@@ -432,23 +382,24 @@ public class BetterGfacImpl implements GFac {
jobExecutionContext.setExperimentID(experimentID);
jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0));
jobExecutionContext.setTaskData(taskData);
+ jobExecutionContext.setGatewayID(gatewayID);
// setting the registry
jobExecutionContext.setRegistry(registry);
ApplicationContext applicationContext = new ApplicationContext();
- applicationContext.setApplicationDeploymentDescription(applicationDescription);
- applicationContext.setHostDescription(hostDescription);
- applicationContext.setServiceDescription(serviceDescription);
+// applicationContext.setApplicationDeploymentDescription(applicationDescription);
+ applicationContext.setHostDescription(legacyHostDescription);
+ applicationContext.setServiceDescription(legacyServiceDescription);
jobExecutionContext.setApplicationContext(applicationContext);
List<DataObjectType> experimentInputs = taskData.getApplicationInputs();
jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getMessageContext(experimentInputs,
- serviceDescription.getType().getInputParametersArray())));
+ legacyServiceDescription.getType().getInputParametersArray())));
List<DataObjectType> outputData = taskData.getApplicationOutputs();
jobExecutionContext.setOutMessageContext(new MessageContext(GFacUtils.getMessageContext(outputData,
- serviceDescription.getType().getOutputParametersArray())));
+ legacyServiceDescription.getType().getOutputParametersArray())));
jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
jobExecutionContext.setGfac(this);
@@ -719,7 +670,8 @@ public class BetterGfacImpl implements GFac {
handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
} else {
try {
- jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
} catch (Exception e) {
log.error("Error constructing job execution context during outhandler invocation");
throw new GFacException(e);
@@ -837,7 +789,8 @@ public class BetterGfacImpl implements GFac {
handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
} else {
try {
- jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
} catch (Exception e) {
log.error("Error constructing job execution context during outhandler invocation");
throw new GFacException(e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
index 291d3ce..f161a55 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
@@ -37,7 +37,7 @@ public interface GFac {
* @return boolean Successful acceptence of the jobExecution returns a true value
* @throws org.apache.airavata.gfac.GFacException
*/
- public boolean submitJob(String experimentID,String taskID) throws GFacException;
+ public boolean submitJob(String experimentID,String taskID, String gatewayID) throws GFacException;
/**
* This method can be used in a handler to ivvoke outhandler asynchronously
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index c342886..c1ab92b 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -193,10 +193,10 @@ public class GFacImpl implements GFac {
* @return
* @throws GFacException
*/
- public boolean submitJob(String experimentID,String taskID) throws GFacException {
+ public boolean submitJob(String experimentID,String taskID, String gatewayID) throws GFacException {
JobExecutionContext jobExecutionContext = null;
try {
- jobExecutionContext = createJEC(experimentID, taskID);
+ jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
return submitJob(jobExecutionContext);
} catch (Exception e) {
log.error("Error inovoking the job with experiment ID: " + experimentID);
@@ -204,7 +204,7 @@ public class GFacImpl implements GFac {
}
}
- private JobExecutionContext createJEC(String experimentID, String taskID) throws Exception {
+ private JobExecutionContext createJEC(String experimentID, String taskID, String gatewayID) throws Exception {
JobExecutionContext jobExecutionContext;
TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
@@ -413,7 +413,8 @@ public class GFacImpl implements GFac {
handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
}else {
try {
- jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
} catch (Exception e) {
log.error("Error constructing job execution context during outhandler invocation");
throw new GFacException(e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
index 541990e..de516c0 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
@@ -36,11 +36,11 @@ public class LocalDirectorySetupHandler implements GFacHandler {
private static final Logger log = LoggerFactory.getLogger(LocalDirectorySetupHandler.class);
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- log.info("Invoking GramDirectorySetupHandler ...");
+ log.info("Invoking LocalDirectorySetupHandler ...");
HostDescriptionType type = jobExecutionContext.getApplicationContext().getHostDescription().getType();
ApplicationDescription applicationDeploymentDescription = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
ApplicationDeploymentDescriptionType app = applicationDeploymentDescription.getType();
- log.debug("working diectroy = " + app.getStaticWorkingDirectory());
+ log.debug("working directory = " + app.getStaticWorkingDirectory());
log.debug("temp directory = " + app.getScratchWorkingDirectory());
makeFileSystemDir(app.getStaticWorkingDirectory(),jobExecutionContext);
@@ -53,7 +53,7 @@ public class LocalDirectorySetupHandler implements GFacHandler {
if (f.isDirectory() && f.exists()) {
return;
} else if (!new File(dir).mkdir()) {
- throw new GFacHandlerException("Cannot make directory "+dir);
+ throw new GFacHandlerException("Cannot create directory " + dir);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
index 518ab05..ff6b5e8 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
@@ -21,6 +21,8 @@
package org.apache.airavata.orchestrator.core.impl;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.cpi.GFacImpl;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
@@ -55,7 +57,7 @@ public class GFACEmbeddedJobSubmitter implements JobSubmitter {
public boolean submit(String experimentID, String taskID) throws OrchestratorException {
try {
- return gfac.submitJob(experimentID, taskID);
+ return gfac.submitJob(experimentID, taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME));
} catch (Exception e) {
String error = "Error launching the job : " + experimentID;
logger.error(error);
http://git-wip-us.apache.org/repos/asf/airavata/blob/2c54484d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index 890ba1c..212e892 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -95,7 +95,8 @@ public class GFACServiceJobSubmitter implements JobSubmitter, Watcher {
GfacService.Client localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) { // before submitting the job we check again the state of the node
if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild,tokenId)) {
- return localhost.submitJob(experimentID, taskID);
+ //FIXME:: The GatewayID is temporarily read from properties file. It should instead be inferred from the token.
+ return localhost.submitJob(experimentID, taskID, ServerSettings.getSetting(Constants.GATEWAY_NAME));
}
}
} catch (TException e) {