You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by ch...@apache.org on 2015/05/11 16:57:18 UTC
airavata git commit: adding token id to submit and cancel methods
Repository: airavata
Updated Branches:
refs/heads/master 7b8e933a4 -> 43c313f9a
adding token id to submit and cancel methods
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/43c313f9
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/43c313f9
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/43c313f9
Branch: refs/heads/master
Commit: 43c313f9a1234154e7093a5b63cbf8e6bf0efe49
Parents: 7b8e933
Author: Chathuri Wimalasena <ch...@apache.org>
Authored: Mon May 11 10:57:12 2015 -0400
Committer: Chathuri Wimalasena <ch...@apache.org>
Committed: Mon May 11 10:57:12 2015 -0400
----------------------------------------------------------------------
.../airavata/common/utils/AiravataZKUtils.java | 9 -
.../apache/airavata/gfac/cpi/GfacService.java | 250 +++++++++++++++++--
.../airavata/gfac/server/GfacServerHandler.java | 12 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 7 +-
.../org/apache/airavata/gfac/core/cpi/GFac.java | 4 +-
.../airavata/gfac/core/utils/GFacUtils.java | 6 -
.../gfac/core/utils/InputHandlerWorker.java | 12 +-
.../gfac.cpi.service.thrift | 6 +-
.../core/impl/GFACEmbeddedJobSubmitter.java | 2 +-
pom.xml | 2 +-
10 files changed, 248 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index 3a38ba1..9b42df9 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -79,15 +79,6 @@ public class AiravataZKUtils implements Watcher {
"state";
}
- public static String getExpTokenId(ZooKeeper zk, String expId) throws ApplicationSettingsException,
- KeeperException, InterruptedException {
- Stat exists = zk.exists(getExpZnodePath(expId), false);
- if (exists != null) {
- return new String(zk.getData(getExpZnodePath(expId), false, exists));
- }
- return null;
- }
-
public static String getExpState(ZooKeeper zk, String expId) throws ApplicationSettingsException,
KeeperException, InterruptedException {
Stat exists = zk.exists(getExpStatePath(expId), false);
http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java
index 24f4de8..213b834 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/cpi/GfacService.java
@@ -73,8 +73,9 @@ import org.slf4j.LoggerFactory;
* @param experimentId
* @param taskId
* @param gatewayId
+ * @param tokenId
*/
- public boolean submitJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException;
+ public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException;
/**
* *
@@ -92,8 +93,9 @@ import org.slf4j.LoggerFactory;
* @param experimentId
* @param taskId
* @param gatewayId
+ * @param tokenId
*/
- public boolean cancelJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException;
+ public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException;
}
@@ -101,9 +103,9 @@ import org.slf4j.LoggerFactory;
public void getGFACServiceVersion(org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
- public void submitJob(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+ public void submitJob(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
- public void cancelJob(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
+ public void cancelJob(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException;
}
@@ -149,18 +151,19 @@ import org.slf4j.LoggerFactory;
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "getGFACServiceVersion failed: unknown result");
}
- public boolean submitJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException
+ public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException
{
- send_submitJob(experimentId, taskId, gatewayId);
+ send_submitJob(experimentId, taskId, gatewayId, tokenId);
return recv_submitJob();
}
- public void send_submitJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException
+ public void send_submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException
{
submitJob_args args = new submitJob_args();
args.setExperimentId(experimentId);
args.setTaskId(taskId);
args.setGatewayId(gatewayId);
+ args.setTokenId(tokenId);
sendBase("submitJob", args);
}
@@ -174,18 +177,19 @@ import org.slf4j.LoggerFactory;
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "submitJob failed: unknown result");
}
- public boolean cancelJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException
+ public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException
{
- send_cancelJob(experimentId, taskId, gatewayId);
+ send_cancelJob(experimentId, taskId, gatewayId, tokenId);
return recv_cancelJob();
}
- public void send_cancelJob(String experimentId, String taskId, String gatewayId) throws org.apache.thrift.TException
+ public void send_cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws org.apache.thrift.TException
{
cancelJob_args args = new cancelJob_args();
args.setExperimentId(experimentId);
args.setTaskId(taskId);
args.setGatewayId(gatewayId);
+ args.setTokenId(tokenId);
sendBase("cancelJob", args);
}
@@ -246,9 +250,9 @@ import org.slf4j.LoggerFactory;
}
}
- public void submitJob(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ public void submitJob(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
- submitJob_call method_call = new submitJob_call(experimentId, taskId, gatewayId, resultHandler, this, ___protocolFactory, ___transport);
+ submitJob_call method_call = new submitJob_call(experimentId, taskId, gatewayId, tokenId, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
@@ -257,11 +261,13 @@ import org.slf4j.LoggerFactory;
private String experimentId;
private String taskId;
private String gatewayId;
- public submitJob_call(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ private String tokenId;
+ public submitJob_call(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.experimentId = experimentId;
this.taskId = taskId;
this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -270,6 +276,7 @@ import org.slf4j.LoggerFactory;
args.setExperimentId(experimentId);
args.setTaskId(taskId);
args.setGatewayId(gatewayId);
+ args.setTokenId(tokenId);
args.write(prot);
prot.writeMessageEnd();
}
@@ -284,9 +291,9 @@ import org.slf4j.LoggerFactory;
}
}
- public void cancelJob(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
+ public void cancelJob(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException {
checkReady();
- cancelJob_call method_call = new cancelJob_call(experimentId, taskId, gatewayId, resultHandler, this, ___protocolFactory, ___transport);
+ cancelJob_call method_call = new cancelJob_call(experimentId, taskId, gatewayId, tokenId, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
@@ -295,11 +302,13 @@ import org.slf4j.LoggerFactory;
private String experimentId;
private String taskId;
private String gatewayId;
- public cancelJob_call(String experimentId, String taskId, String gatewayId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ private String tokenId;
+ public cancelJob_call(String experimentId, String taskId, String gatewayId, String tokenId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
super(client, protocolFactory, transport, resultHandler, false);
this.experimentId = experimentId;
this.taskId = taskId;
this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
}
public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
@@ -308,6 +317,7 @@ import org.slf4j.LoggerFactory;
args.setExperimentId(experimentId);
args.setTaskId(taskId);
args.setGatewayId(gatewayId);
+ args.setTokenId(tokenId);
args.write(prot);
prot.writeMessageEnd();
}
@@ -376,7 +386,7 @@ import org.slf4j.LoggerFactory;
public submitJob_result getResult(I iface, submitJob_args args) throws org.apache.thrift.TException {
submitJob_result result = new submitJob_result();
- result.success = iface.submitJob(args.experimentId, args.taskId, args.gatewayId);
+ result.success = iface.submitJob(args.experimentId, args.taskId, args.gatewayId, args.tokenId);
result.setSuccessIsSet(true);
return result;
}
@@ -397,7 +407,7 @@ import org.slf4j.LoggerFactory;
public cancelJob_result getResult(I iface, cancelJob_args args) throws org.apache.thrift.TException {
cancelJob_result result = new cancelJob_result();
- result.success = iface.cancelJob(args.experimentId, args.taskId, args.gatewayId);
+ result.success = iface.cancelJob(args.experimentId, args.taskId, args.gatewayId, args.tokenId);
result.setSuccessIsSet(true);
return result;
}
@@ -521,7 +531,7 @@ import org.slf4j.LoggerFactory;
}
public void start(I iface, submitJob_args args, org.apache.thrift.async.AsyncMethodCallback<Boolean> resultHandler) throws TException {
- iface.submitJob(args.experimentId, args.taskId, args.gatewayId,resultHandler);
+ iface.submitJob(args.experimentId, args.taskId, args.gatewayId, args.tokenId,resultHandler);
}
}
@@ -573,7 +583,7 @@ import org.slf4j.LoggerFactory;
}
public void start(I iface, cancelJob_args args, org.apache.thrift.async.AsyncMethodCallback<Boolean> resultHandler) throws TException {
- iface.cancelJob(args.experimentId, args.taskId, args.gatewayId,resultHandler);
+ iface.cancelJob(args.experimentId, args.taskId, args.gatewayId, args.tokenId,resultHandler);
}
}
@@ -1185,6 +1195,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)3);
+ private static final org.apache.thrift.protocol.TField TOKEN_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("tokenId", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -1195,12 +1206,14 @@ import org.slf4j.LoggerFactory;
public String experimentId; // required
public String taskId; // required
public String gatewayId; // required
+ public String tokenId; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
EXPERIMENT_ID((short)1, "experimentId"),
TASK_ID((short)2, "taskId"),
- GATEWAY_ID((short)3, "gatewayId");
+ GATEWAY_ID((short)3, "gatewayId"),
+ TOKEN_ID((short)4, "tokenId");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -1221,6 +1234,8 @@ import org.slf4j.LoggerFactory;
return TASK_ID;
case 3: // GATEWAY_ID
return GATEWAY_ID;
+ case 4: // TOKEN_ID
+ return TOKEN_ID;
default:
return null;
}
@@ -1270,6 +1285,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.TOKEN_ID, new org.apache.thrift.meta_data.FieldMetaData("tokenId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(submitJob_args.class, metaDataMap);
}
@@ -1280,12 +1297,14 @@ import org.slf4j.LoggerFactory;
public submitJob_args(
String experimentId,
String taskId,
- String gatewayId)
+ String gatewayId,
+ String tokenId)
{
this();
this.experimentId = experimentId;
this.taskId = taskId;
this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
}
/**
@@ -1301,6 +1320,9 @@ import org.slf4j.LoggerFactory;
if (other.isSetGatewayId()) {
this.gatewayId = other.gatewayId;
}
+ if (other.isSetTokenId()) {
+ this.tokenId = other.tokenId;
+ }
}
public submitJob_args deepCopy() {
@@ -1312,6 +1334,7 @@ import org.slf4j.LoggerFactory;
this.experimentId = null;
this.taskId = null;
this.gatewayId = null;
+ this.tokenId = null;
}
public String getExperimentId() {
@@ -1386,6 +1409,30 @@ import org.slf4j.LoggerFactory;
}
}
+ public String getTokenId() {
+ return this.tokenId;
+ }
+
+ public submitJob_args setTokenId(String tokenId) {
+ this.tokenId = tokenId;
+ return this;
+ }
+
+ public void unsetTokenId() {
+ this.tokenId = null;
+ }
+
+ /** Returns true if field tokenId is set (has been assigned a value) and false otherwise */
+ public boolean isSetTokenId() {
+ return this.tokenId != null;
+ }
+
+ public void setTokenIdIsSet(boolean value) {
+ if (!value) {
+ this.tokenId = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case EXPERIMENT_ID:
@@ -1412,6 +1459,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case TOKEN_ID:
+ if (value == null) {
+ unsetTokenId();
+ } else {
+ setTokenId((String)value);
+ }
+ break;
+
}
}
@@ -1426,6 +1481,9 @@ import org.slf4j.LoggerFactory;
case GATEWAY_ID:
return getGatewayId();
+ case TOKEN_ID:
+ return getTokenId();
+
}
throw new IllegalStateException();
}
@@ -1443,6 +1501,8 @@ import org.slf4j.LoggerFactory;
return isSetTaskId();
case GATEWAY_ID:
return isSetGatewayId();
+ case TOKEN_ID:
+ return isSetTokenId();
}
throw new IllegalStateException();
}
@@ -1487,6 +1547,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_tokenId = true && this.isSetTokenId();
+ boolean that_present_tokenId = true && that.isSetTokenId();
+ if (this_present_tokenId || that_present_tokenId) {
+ if (!(this_present_tokenId && that_present_tokenId))
+ return false;
+ if (!this.tokenId.equals(that.tokenId))
+ return false;
+ }
+
return true;
}
@@ -1533,6 +1602,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetTokenId()).compareTo(other.isSetTokenId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTokenId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tokenId, other.tokenId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -1576,6 +1655,14 @@ import org.slf4j.LoggerFactory;
sb.append(this.gatewayId);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("tokenId:");
+ if (this.tokenId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.tokenId);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -1591,6 +1678,9 @@ import org.slf4j.LoggerFactory;
if (gatewayId == null) {
throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' was not present! Struct: " + toString());
}
+ if (tokenId == null) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'tokenId' was not present! Struct: " + toString());
+ }
// check for sub-struct validity
}
@@ -1652,6 +1742,14 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 4: // TOKEN_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.tokenId = iprot.readString();
+ struct.setTokenIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -1682,6 +1780,11 @@ import org.slf4j.LoggerFactory;
oprot.writeString(struct.gatewayId);
oprot.writeFieldEnd();
}
+ if (struct.tokenId != null) {
+ oprot.writeFieldBegin(TOKEN_ID_FIELD_DESC);
+ oprot.writeString(struct.tokenId);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1702,6 +1805,7 @@ import org.slf4j.LoggerFactory;
oprot.writeString(struct.experimentId);
oprot.writeString(struct.taskId);
oprot.writeString(struct.gatewayId);
+ oprot.writeString(struct.tokenId);
}
@Override
@@ -1713,6 +1817,8 @@ import org.slf4j.LoggerFactory;
struct.setTaskIdIsSet(true);
struct.gatewayId = iprot.readString();
struct.setGatewayIdIsSet(true);
+ struct.tokenId = iprot.readString();
+ struct.setTokenIdIsSet(true);
}
}
@@ -2078,6 +2184,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)3);
+ private static final org.apache.thrift.protocol.TField TOKEN_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("tokenId", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -2088,12 +2195,14 @@ import org.slf4j.LoggerFactory;
public String experimentId; // required
public String taskId; // required
public String gatewayId; // required
+ public String tokenId; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
EXPERIMENT_ID((short)1, "experimentId"),
TASK_ID((short)2, "taskId"),
- GATEWAY_ID((short)3, "gatewayId");
+ GATEWAY_ID((short)3, "gatewayId"),
+ TOKEN_ID((short)4, "tokenId");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -2114,6 +2223,8 @@ import org.slf4j.LoggerFactory;
return TASK_ID;
case 3: // GATEWAY_ID
return GATEWAY_ID;
+ case 4: // TOKEN_ID
+ return TOKEN_ID;
default:
return null;
}
@@ -2163,6 +2274,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.TOKEN_ID, new org.apache.thrift.meta_data.FieldMetaData("tokenId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(cancelJob_args.class, metaDataMap);
}
@@ -2173,12 +2286,14 @@ import org.slf4j.LoggerFactory;
public cancelJob_args(
String experimentId,
String taskId,
- String gatewayId)
+ String gatewayId,
+ String tokenId)
{
this();
this.experimentId = experimentId;
this.taskId = taskId;
this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
}
/**
@@ -2194,6 +2309,9 @@ import org.slf4j.LoggerFactory;
if (other.isSetGatewayId()) {
this.gatewayId = other.gatewayId;
}
+ if (other.isSetTokenId()) {
+ this.tokenId = other.tokenId;
+ }
}
public cancelJob_args deepCopy() {
@@ -2205,6 +2323,7 @@ import org.slf4j.LoggerFactory;
this.experimentId = null;
this.taskId = null;
this.gatewayId = null;
+ this.tokenId = null;
}
public String getExperimentId() {
@@ -2279,6 +2398,30 @@ import org.slf4j.LoggerFactory;
}
}
+ public String getTokenId() {
+ return this.tokenId;
+ }
+
+ public cancelJob_args setTokenId(String tokenId) {
+ this.tokenId = tokenId;
+ return this;
+ }
+
+ public void unsetTokenId() {
+ this.tokenId = null;
+ }
+
+ /** Returns true if field tokenId is set (has been assigned a value) and false otherwise */
+ public boolean isSetTokenId() {
+ return this.tokenId != null;
+ }
+
+ public void setTokenIdIsSet(boolean value) {
+ if (!value) {
+ this.tokenId = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case EXPERIMENT_ID:
@@ -2305,6 +2448,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case TOKEN_ID:
+ if (value == null) {
+ unsetTokenId();
+ } else {
+ setTokenId((String)value);
+ }
+ break;
+
}
}
@@ -2319,6 +2470,9 @@ import org.slf4j.LoggerFactory;
case GATEWAY_ID:
return getGatewayId();
+ case TOKEN_ID:
+ return getTokenId();
+
}
throw new IllegalStateException();
}
@@ -2336,6 +2490,8 @@ import org.slf4j.LoggerFactory;
return isSetTaskId();
case GATEWAY_ID:
return isSetGatewayId();
+ case TOKEN_ID:
+ return isSetTokenId();
}
throw new IllegalStateException();
}
@@ -2380,6 +2536,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_tokenId = true && this.isSetTokenId();
+ boolean that_present_tokenId = true && that.isSetTokenId();
+ if (this_present_tokenId || that_present_tokenId) {
+ if (!(this_present_tokenId && that_present_tokenId))
+ return false;
+ if (!this.tokenId.equals(that.tokenId))
+ return false;
+ }
+
return true;
}
@@ -2426,6 +2591,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetTokenId()).compareTo(other.isSetTokenId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTokenId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tokenId, other.tokenId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -2469,6 +2644,14 @@ import org.slf4j.LoggerFactory;
sb.append(this.gatewayId);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("tokenId:");
+ if (this.tokenId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.tokenId);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -2484,6 +2667,9 @@ import org.slf4j.LoggerFactory;
if (gatewayId == null) {
throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' was not present! Struct: " + toString());
}
+ if (tokenId == null) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'tokenId' was not present! Struct: " + toString());
+ }
// check for sub-struct validity
}
@@ -2545,6 +2731,14 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 4: // TOKEN_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.tokenId = iprot.readString();
+ struct.setTokenIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -2575,6 +2769,11 @@ import org.slf4j.LoggerFactory;
oprot.writeString(struct.gatewayId);
oprot.writeFieldEnd();
}
+ if (struct.tokenId != null) {
+ oprot.writeFieldBegin(TOKEN_ID_FIELD_DESC);
+ oprot.writeString(struct.tokenId);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -2595,6 +2794,7 @@ import org.slf4j.LoggerFactory;
oprot.writeString(struct.experimentId);
oprot.writeString(struct.taskId);
oprot.writeString(struct.gatewayId);
+ oprot.writeString(struct.tokenId);
}
@Override
@@ -2606,6 +2806,8 @@ import org.slf4j.LoggerFactory;
struct.setTaskIdIsSet(true);
struct.gatewayId = iprot.readString();
struct.setGatewayIdIsSet(true);
+ struct.tokenId = iprot.readString();
+ struct.setTokenIdIsSet(true);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 9b282db..8e17a38 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -237,12 +237,12 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
* @param taskId
* @param gatewayId
*/
- public boolean submitJob(String experimentId, String taskId, String gatewayId) throws TException {
+ public boolean submitJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
requestCount++;
logger.info("-----------------------------------------------------" + requestCount + "-----------------------------------------------------");
logger.infoId(experimentId, "GFac Received submit job request for the Experiment: {} TaskId: {}", experimentId, taskId);
GFac gfac = getGfac();
- InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId);
+ InputHandlerWorker inputHandlerWorker = new InputHandlerWorker(gfac, experimentId, taskId, gatewayId, tokenId);
// try {
// if( gfac.submitJob(experimentId, taskId, gatewayId)){
logger.debugId(experimentId, "Submitted job to the Gfac Implementation, experiment {}, task {}, gateway " +
@@ -254,11 +254,11 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
return true;
}
- public boolean cancelJob(String experimentId, String taskId, String gatewayId) throws TException {
+ public boolean cancelJob(String experimentId, String taskId, String gatewayId, String tokenId) throws TException {
logger.infoId(experimentId, "GFac Received cancel job request for Experiment: {} TaskId: {} ", experimentId, taskId);
GFac gfac = getGfac();
try {
- if (gfac.cancel(experimentId, taskId, gatewayId)) {
+ if (gfac.cancel(experimentId, taskId, gatewayId, tokenId)) {
logger.debugId(experimentId, "Successfully cancelled job, experiment {} , task {}", experimentId, taskId);
return true;
} else {
@@ -375,7 +375,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
try {
GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
AiravataZKUtils.getExpStatePath(event.getExperimentId());
- submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
+ submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
} catch (KeeperException e) {
logger.error(nodeName + " was interrupted.");
rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
@@ -395,7 +395,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
TBase messageEvent = message.getEvent();
byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
ThriftUtils.createThriftFromBytes(bytes, event);
- cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
+ cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
System.out.println(" Message Received with message id '" + message.getMessageId()
+ "' and with message type '" + message.getType());
} catch (TException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index d5e623e..6eeef28 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -209,10 +209,11 @@ public class BetterGfacImpl implements GFac,Watcher {
* @return
* @throws GFacException
*/
- public boolean submitJob(String experimentID, String taskID, String gatewayID) throws GFacException {
+ public boolean submitJob(String experimentID, String taskID, String gatewayID, String tokenId) throws GFacException {
JobExecutionContext jobExecutionContext = null;
try {
jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
+ jobExecutionContext.setCredentialStoreToken(tokenId);
return submitJob(jobExecutionContext);
} catch (Exception e) {
log.error("Error inovoking the job with experiment ID: " + experimentID + ":"+e.getMessage());
@@ -316,7 +317,6 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
jobExecutionContext.setGfac(this);
jobExecutionContext.setZk(zk);
- jobExecutionContext.setCredentialStoreToken(AiravataZKUtils.getExpTokenId(zk, experimentID));
// handle job submission protocol
List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
@@ -550,10 +550,11 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- public boolean cancel(String experimentID, String taskID, String gatewayID) throws GFacException {
+ public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId) throws GFacException {
JobExecutionContext jobExecutionContext = null;
try {
jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
+ jobExecutionContext.setCredentialStoreToken(tokenId);
return cancel(jobExecutionContext);
} catch (Exception e) {
GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
index 10656b7..5cf9d00 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFac.java
@@ -37,7 +37,7 @@ public interface GFac {
* @return boolean Successful acceptance of the jobExecution returns a true value
* @throws org.apache.airavata.gfac.GFacException
*/
- public boolean submitJob(String experimentID,String taskID, String gatewayID) throws GFacException;
+ public boolean submitJob(String experimentID,String taskID, String gatewayID, String tokenId) throws GFacException;
/**
* This method can be used in a handler to invoke outhandler asynchronously
@@ -58,6 +58,6 @@ public interface GFac {
* @return Successful cancellation will return true
* @throws GFacException
*/
- public boolean cancel(String experimentID, String taskID, String gatewayID)throws GFacException;
+ public boolean cancel(String experimentID, String taskID, String gatewayID, String tokenId)throws GFacException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 434f6d4..1e9e212 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -1166,13 +1166,7 @@ public class GFacUtils {
zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- Stat expParent = zk.exists(newExpNode, false);
- if (tokenId != null && expParent != null) {
- zk.setData(newExpNode, tokenId.getBytes(),
- expParent.getVersion());
- }
- String token = AiravataZKUtils.getExpTokenId(zk, experimentID);
String s = zk.create(newExpNode + File.separator + "state", String
.valueOf(GfacExperimentState.LAUNCHED.getValue())
.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
index 24d8b75..36caf76 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/InputHandlerWorker.java
@@ -20,35 +20,31 @@
*/
package org.apache.airavata.gfac.core.utils;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Callable;
-
public class InputHandlerWorker implements Runnable {
private static Logger log = LoggerFactory.getLogger(InputHandlerWorker.class);
String experimentId;
-
String taskId;
-
String gatewayId;
+ String tokenId;
GFac gfac;
- public InputHandlerWorker(GFac gfac, String experimentId,String taskId,String gatewayId) {
+ public InputHandlerWorker(GFac gfac, String experimentId,String taskId,String gatewayId, String tokenId) {
this.gfac = gfac;
this.experimentId = experimentId;
this.taskId = taskId;
this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
}
@Override
public void run() {
try {
- gfac.submitJob(experimentId, taskId, gatewayId);
+ gfac.submitJob(experimentId, taskId, gatewayId, tokenId);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/gfac/gfac-thrift-descriptions/gfac.cpi.service.thrift
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-thrift-descriptions/gfac.cpi.service.thrift b/modules/gfac/gfac-thrift-descriptions/gfac.cpi.service.thrift
index 30f19a6..93d62c7 100644
--- a/modules/gfac/gfac-thrift-descriptions/gfac.cpi.service.thrift
+++ b/modules/gfac/gfac-thrift-descriptions/gfac.cpi.service.thrift
@@ -46,7 +46,8 @@ service GfacService {
**/
bool submitJob (1: required string experimentId,
2: required string taskId
- 3: required string gatewayId)
+ 3: required string gatewayId,
+ 4: required string tokenId)
/**
*
@@ -62,5 +63,6 @@ service GfacService {
**/
bool cancelJob (1: required string experimentId,
2: required string taskId,
- 3: required string gatewayId)
+ 3: required string gatewayId,
+ 4: required string tokenId)
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
index 9f4d919..d6fe40f 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
@@ -84,7 +84,7 @@ public class GFACEmbeddedJobSubmitter implements JobSubmitter {
if(gatewayId == null || gatewayId.isEmpty()){
gatewayId = ServerSettings.getDefaultUserGateway();
}
- return gfac.submitJob(experimentID, taskID, gatewayId);
+ return gfac.submitJob(experimentID, taskID, gatewayId, tokenId);
} catch (Exception e) {
String error = "Error launching the job : " + experimentID;
logger.error(error);
http://git-wip-us.apache.org/repos/asf/airavata/blob/43c313f9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 32a5e69..5ebefcd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -532,7 +532,7 @@
<module>modules/test-suite</module>
<module>modules/distribution</module>
<module>modules/messaging</module>
- <module>modules/integration-tests</module>
+ <!--module>modules/integration-tests</module-->
<module>modules/workflow</module>
<module>modules/xbaya-gui</module>
</modules>