You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:32 UTC
[05/15] airavata git commit: adding curator leader election logic
adding curator leader election logic
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a486b67d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a486b67d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a486b67d
Branch: refs/heads/master
Commit: a486b67d9187294bae14dd15d4d5b90a84484c73
Parents: b6bf782
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Mon Feb 16 22:38:41 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Mon Feb 16 22:38:41 2015 -0500
----------------------------------------------------------------------
.../lib/airavata/messagingEvents_types.cpp | 20 +++-
.../lib/airavata/messagingEvents_types.h | 13 ++-
.../Airavata/Model/Messaging/Event/Types.php | 20 ++++
.../client/samples/CreateLaunchExperiment.java | 2 +-
.../model/messaging/event/TaskSubmitEvent.java | 100 +++++++++++++++-
airavata-api/generate-thrift-files.sh | 2 +-
.../messagingEvents.thrift | 3 +-
modules/gfac/airavata-gfac-service/pom.xml | 10 ++
.../airavata/gfac/leader/CuratorClient.java | 79 +++++++++++++
.../gfac/leader/LeaderSelectorExample.java | 80 +++++++++++++
.../airavata/gfac/server/GfacServerHandler.java | 112 +++++++++++++++---
modules/gfac/gfac-core/pom.xml | 1 +
.../airavata/gfac/core/utils/GFacUtils.java | 116 ++++++++++++++++++-
modules/orchestrator/orchestrator-core/pom.xml | 10 --
.../core/impl/GFACPassiveJobSubmitter.java | 46 +++-----
.../core/impl/GFACRPCJobSubmitter.java | 4 +-
16 files changed, 547 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
index a2e72f5..71f45be 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
@@ -839,8 +839,8 @@ void swap(JobIdentifier &a, JobIdentifier &b) {
swap(a.gatewayId, b.gatewayId);
}
-const char* TaskSubmitEvent::ascii_fingerprint = "AB879940BD15B6B25691265F7384B271";
-const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xAB,0x87,0x99,0x40,0xBD,0x15,0xB6,0xB2,0x56,0x91,0x26,0x5F,0x73,0x84,0xB2,0x71};
+const char* TaskSubmitEvent::ascii_fingerprint = "C93D890311F28844166CF6E571EB3AC2";
+const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2};
uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
@@ -856,6 +856,7 @@ uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
bool isset_experimentId = false;
bool isset_taskId = false;
bool isset_gatewayId = false;
+ bool isset_tokenId = false;
while (true)
{
@@ -889,6 +890,14 @@ uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
xfer += iprot->skip(ftype);
}
break;
+ case 4:
+ if (ftype == ::apache::thrift::protocol::T_STRING) {
+ xfer += iprot->readString(this->tokenId);
+ isset_tokenId = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -904,6 +913,8 @@ uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
throw TProtocolException(TProtocolException::INVALID_DATA);
if (!isset_gatewayId)
throw TProtocolException(TProtocolException::INVALID_DATA);
+ if (!isset_tokenId)
+ throw TProtocolException(TProtocolException::INVALID_DATA);
return xfer;
}
@@ -923,6 +934,10 @@ uint32_t TaskSubmitEvent::write(::apache::thrift::protocol::TProtocol* oprot) co
xfer += oprot->writeString(this->gatewayId);
xfer += oprot->writeFieldEnd();
+ xfer += oprot->writeFieldBegin("tokenId", ::apache::thrift::protocol::T_STRING, 4);
+ xfer += oprot->writeString(this->tokenId);
+ xfer += oprot->writeFieldEnd();
+
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -933,6 +948,7 @@ void swap(TaskSubmitEvent &a, TaskSubmitEvent &b) {
swap(a.experimentId, b.experimentId);
swap(a.taskId, b.taskId);
swap(a.gatewayId, b.gatewayId);
+ swap(a.tokenId, b.tokenId);
}
const char* TaskTerminateEvent::ascii_fingerprint = "07A9615F837F7D0A952B595DD3020972";
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
index f063fc2..c7e2bb5 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
@@ -465,10 +465,10 @@ void swap(JobIdentifier &a, JobIdentifier &b);
class TaskSubmitEvent {
public:
- static const char* ascii_fingerprint; // = "AB879940BD15B6B25691265F7384B271";
- static const uint8_t binary_fingerprint[16]; // = {0xAB,0x87,0x99,0x40,0xBD,0x15,0xB6,0xB2,0x56,0x91,0x26,0x5F,0x73,0x84,0xB2,0x71};
+ static const char* ascii_fingerprint; // = "C93D890311F28844166CF6E571EB3AC2";
+ static const uint8_t binary_fingerprint[16]; // = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2};
- TaskSubmitEvent() : experimentId(), taskId(), gatewayId() {
+ TaskSubmitEvent() : experimentId(), taskId(), gatewayId(), tokenId() {
}
virtual ~TaskSubmitEvent() throw() {}
@@ -476,6 +476,7 @@ class TaskSubmitEvent {
std::string experimentId;
std::string taskId;
std::string gatewayId;
+ std::string tokenId;
void __set_experimentId(const std::string& val) {
experimentId = val;
@@ -489,6 +490,10 @@ class TaskSubmitEvent {
gatewayId = val;
}
+ void __set_tokenId(const std::string& val) {
+ tokenId = val;
+ }
+
bool operator == (const TaskSubmitEvent & rhs) const
{
if (!(experimentId == rhs.experimentId))
@@ -497,6 +502,8 @@ class TaskSubmitEvent {
return false;
if (!(gatewayId == rhs.gatewayId))
return false;
+ if (!(tokenId == rhs.tokenId))
+ return false;
return true;
}
bool operator != (const TaskSubmitEvent &rhs) const {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
index 40810d3..b0d7676 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
@@ -977,6 +977,7 @@ class TaskSubmitEvent {
public $experimentId = null;
public $taskId = null;
public $gatewayId = null;
+ public $tokenId = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -993,6 +994,10 @@ class TaskSubmitEvent {
'var' => 'gatewayId',
'type' => TType::STRING,
),
+ 4 => array(
+ 'var' => 'tokenId',
+ 'type' => TType::STRING,
+ ),
);
}
if (is_array($vals)) {
@@ -1005,6 +1010,9 @@ class TaskSubmitEvent {
if (isset($vals['gatewayId'])) {
$this->gatewayId = $vals['gatewayId'];
}
+ if (isset($vals['tokenId'])) {
+ $this->tokenId = $vals['tokenId'];
+ }
}
}
@@ -1048,6 +1056,13 @@ class TaskSubmitEvent {
$xfer += $input->skip($ftype);
}
break;
+ case 4:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->tokenId);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -1076,6 +1091,11 @@ class TaskSubmitEvent {
$xfer += $output->writeString($this->gatewayId);
$xfer += $output->writeFieldEnd();
}
+ if ($this->tokenId !== null) {
+ $xfer += $output->writeFieldBegin('tokenId', TType::STRING, 4);
+ $xfer += $output->writeString($this->tokenId);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 8483da7..b7121b9 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -60,7 +60,7 @@ public class CreateLaunchExperiment {
private static final String DEFAULT_GATEWAY = "default.registry.gateway";
private static Airavata.Client airavataClient;
- private static String echoAppId = "Echo_78e34255-39f3-4c07-add6-a1a672c80104";
+ private static String echoAppId = "Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576";
private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9";
private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
private static String amberAppId = "Amber_eda074ea-223d-49d7-a942-6c8742249f36";
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
index c813c76..71d497e 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
@@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)3);
+ private static final org.apache.thrift.protocol.TField TOKEN_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("tokenId", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -65,12 +66,14 @@ import org.slf4j.LoggerFactory;
private String experimentId; // required
private String taskId; // required
private String gatewayId; // required
+ private String tokenId; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
@SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
EXPERIMENT_ID((short)1, "experimentId"),
TASK_ID((short)2, "taskId"),
- GATEWAY_ID((short)3, "gatewayId");
+ GATEWAY_ID((short)3, "gatewayId"),
+ TOKEN_ID((short)4, "tokenId");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -91,6 +94,8 @@ import org.slf4j.LoggerFactory;
return TASK_ID;
case 3: // GATEWAY_ID
return GATEWAY_ID;
+ case 4: // TOKEN_ID
+ return TOKEN_ID;
default:
return null;
}
@@ -140,6 +145,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.TOKEN_ID, new org.apache.thrift.meta_data.FieldMetaData("tokenId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TaskSubmitEvent.class, metaDataMap);
}
@@ -150,12 +157,14 @@ import org.slf4j.LoggerFactory;
public TaskSubmitEvent(
String experimentId,
String taskId,
- String gatewayId)
+ String gatewayId,
+ String tokenId)
{
this();
this.experimentId = experimentId;
this.taskId = taskId;
this.gatewayId = gatewayId;
+ this.tokenId = tokenId;
}
/**
@@ -171,6 +180,9 @@ import org.slf4j.LoggerFactory;
if (other.isSetGatewayId()) {
this.gatewayId = other.gatewayId;
}
+ if (other.isSetTokenId()) {
+ this.tokenId = other.tokenId;
+ }
}
public TaskSubmitEvent deepCopy() {
@@ -182,6 +194,7 @@ import org.slf4j.LoggerFactory;
this.experimentId = null;
this.taskId = null;
this.gatewayId = null;
+ this.tokenId = null;
}
public String getExperimentId() {
@@ -253,6 +266,29 @@ import org.slf4j.LoggerFactory;
}
}
+ public String getTokenId() {
+ return this.tokenId;
+ }
+
+ public void setTokenId(String tokenId) {
+ this.tokenId = tokenId;
+ }
+
+ public void unsetTokenId() {
+ this.tokenId = null;
+ }
+
+ /** Returns true if field tokenId is set (has been assigned a value) and false otherwise */
+ public boolean isSetTokenId() {
+ return this.tokenId != null;
+ }
+
+ public void setTokenIdIsSet(boolean value) {
+ if (!value) {
+ this.tokenId = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case EXPERIMENT_ID:
@@ -279,6 +315,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case TOKEN_ID:
+ if (value == null) {
+ unsetTokenId();
+ } else {
+ setTokenId((String)value);
+ }
+ break;
+
}
}
@@ -293,6 +337,9 @@ import org.slf4j.LoggerFactory;
case GATEWAY_ID:
return getGatewayId();
+ case TOKEN_ID:
+ return getTokenId();
+
}
throw new IllegalStateException();
}
@@ -310,6 +357,8 @@ import org.slf4j.LoggerFactory;
return isSetTaskId();
case GATEWAY_ID:
return isSetGatewayId();
+ case TOKEN_ID:
+ return isSetTokenId();
}
throw new IllegalStateException();
}
@@ -354,6 +403,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_tokenId = true && this.isSetTokenId();
+ boolean that_present_tokenId = true && that.isSetTokenId();
+ if (this_present_tokenId || that_present_tokenId) {
+ if (!(this_present_tokenId && that_present_tokenId))
+ return false;
+ if (!this.tokenId.equals(that.tokenId))
+ return false;
+ }
+
return true;
}
@@ -400,6 +458,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetTokenId()).compareTo(other.isSetTokenId());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetTokenId()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tokenId, other.tokenId);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -443,6 +511,14 @@ import org.slf4j.LoggerFactory;
sb.append(this.gatewayId);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("tokenId:");
+ if (this.tokenId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.tokenId);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -461,6 +537,10 @@ import org.slf4j.LoggerFactory;
throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' is unset! Struct:" + toString());
}
+ if (!isSetTokenId()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'tokenId' is unset! Struct:" + toString());
+ }
+
// check for sub-struct validity
}
@@ -522,6 +602,14 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 4: // TOKEN_ID
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.tokenId = iprot.readString();
+ struct.setTokenIdIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -550,6 +638,11 @@ import org.slf4j.LoggerFactory;
oprot.writeString(struct.gatewayId);
oprot.writeFieldEnd();
}
+ if (struct.tokenId != null) {
+ oprot.writeFieldBegin(TOKEN_ID_FIELD_DESC);
+ oprot.writeString(struct.tokenId);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -570,6 +663,7 @@ import org.slf4j.LoggerFactory;
oprot.writeString(struct.experimentId);
oprot.writeString(struct.taskId);
oprot.writeString(struct.gatewayId);
+ oprot.writeString(struct.tokenId);
}
@Override
@@ -581,6 +675,8 @@ import org.slf4j.LoggerFactory;
struct.setTaskIdIsSet(true);
struct.gatewayId = iprot.readString();
struct.setGatewayIdIsSet(true);
+ struct.tokenId = iprot.readString();
+ struct.setTokenIdIsSet(true);
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/generate-thrift-files.sh
----------------------------------------------------------------------
diff --git a/airavata-api/generate-thrift-files.sh b/airavata-api/generate-thrift-files.sh
index bd823e4..c8a000d 100755
--- a/airavata-api/generate-thrift-files.sh
+++ b/airavata-api/generate-thrift-files.sh
@@ -27,7 +27,7 @@ DATAMODEL_SRC_DIR='airavata-data-models/src/main/java'
JAVA_API_SDK_DIR='airavata-api-stubs/src/main/java'
CPP_SDK_DIR='airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/'
PHP_SDK_DIR='airavata-client-sdks/airavata-php-sdk/src/main/resources/lib'
-THRIFT_EXEC=thrift
+THRIFT_EXEC=/usr/local/Cellar/thrift/0.9.1/bin/thrift
# The Function fail prints error messages on failure and quits the script.
fail() {
echo $@
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
index d736701..d9e85d4 100644
--- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
+++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
@@ -105,7 +105,8 @@ struct JobIdentifier {
struct TaskSubmitEvent{
1: required string experimentId,
2: required string taskId,
- 3: required string gatewayId
+ 3: required string gatewayId,
+ 4: required string tokenId
}
struct TaskTerminateEvent{
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/pom.xml b/modules/gfac/airavata-gfac-service/pom.xml
index 0884942..5a178bd 100644
--- a/modules/gfac/airavata-gfac-service/pom.xml
+++ b/modules/gfac/airavata-gfac-service/pom.xml
@@ -80,6 +80,16 @@
<artifactId>airavata-server-configuration</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>${curator.version}</version>
+ </dependency>
</dependencies>
<properties>
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
new file mode 100644
index 0000000..2db9a6f
--- /dev/null
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.gfac.leader;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An example leader selector client. Note that {@link LeaderSelectorListenerAdapter} which
+ * has the recommended handling for connection state issues
+ */
+public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable {
+ private final String name;
+ private final LeaderSelector leaderSelector;
+ private final AtomicInteger leaderCount = new AtomicInteger();
+
+ public CuratorClient(CuratorFramework client, String path, String name) {
+ this.name = name;
+
+ // create a leader selector using the given path for management
+ // all participants in a given leader selection must use the same path
+ // ExampleClient here is also a LeaderSelectorListener but this isn't required
+ leaderSelector = new LeaderSelector(client, path, this);
+
+ // for most cases you will want your instance to requeue when it relinquishes leadership
+ leaderSelector.autoRequeue();
+ }
+
+ public void start() throws IOException {
+ // the selection for this instance doesn't start until the leader selector is started
+ // leader selection is done in the background so this call to leaderSelector.start() returns immediately
+ leaderSelector.start();
+ }
+
+ @Override
+ public void close() throws IOException {
+ leaderSelector.close();
+ }
+
+ @Override
+ public void takeLeadership(CuratorFramework client) throws Exception {
+ // we are now the leader. This method should not return until we want to relinquish leadership
+
+ final int waitSeconds = (int) (5 * Math.random()) + 1;
+
+ System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
+ System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
+ try {
+ Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
+ } catch (InterruptedException e) {
+ System.err.println(name + " was interrupted.");
+ Thread.currentThread().interrupt();
+ } finally {
+ System.out.println(name + " relinquishing leadership.\n");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
new file mode 100644
index 0000000..ad02641
--- /dev/null
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.leader;
+
+import com.google.common.collect.Lists;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.List;
+
+public class LeaderSelectorExample {
+ private final static Logger logger = LoggerFactory.getLogger(LeaderSelectorExample.class);
+ private static final int CLIENT_QTY = 10;
+
+ private static final String PATH = "/examples/leader";
+
+ public static void main(String[] args) throws Exception
+ {
+ // all of the useful sample code is in ExampleClient.java
+
+ System.out.println("Create " + CLIENT_QTY + " clients, have each negotiate for leadership and then wait a random number of seconds before letting another leader election occur.");
+ System.out.println("Notice that leader election is fair: all clients will become leader and will do so the same number of times.");
+
+ try
+ {
+ for ( int i = 0; i < CLIENT_QTY; ++i )
+ {
+ CuratorFramework client = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3));
+
+ CuratorClient example = new CuratorClient(client, PATH, "Client #" + i);
+
+ client.start();
+ example.start();
+ }
+
+ System.out.println("Press enter/return to quit\n");
+ new BufferedReader(new InputStreamReader(System.in)).readLine();
+ }
+ finally
+ {
+ System.out.println("Shutting down...");
+
+ /*for ( CuratorClient exampleClient : examples )
+ {
+ CloseableUtils.closeQuietly(exampleClient);
+ }
+ for ( CuratorFramework client : clients )
+ {
+ CloseableUtils.closeQuietly(client);
+ }*/
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index c8f1100..c838703 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -31,31 +31,38 @@ import org.apache.airavata.common.utils.*;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFac;
import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.core.utils.InputHandlerWorker;
import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
+import org.apache.airavata.gfac.leader.CuratorClient;
import org.apache.airavata.messaging.core.MessageContext;
import org.apache.airavata.messaging.core.MessageHandler;
import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
import org.apache.airavata.model.messaging.event.MessageType;
import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
public class GfacServerHandler implements GfacService.Iface, Watcher{
@@ -88,6 +95,9 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
+ CuratorFramework curatorFramework = null;
+
+
public GfacServerHandler() throws Exception{
// registering with zk
try {
@@ -114,6 +124,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
if(ServerSettings.isGFacPassiveMode()) {
rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
+ curatorFramework = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3));
}
@@ -229,14 +240,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
inHandlerFutures.add(GFacThreadPoolExecutor.getFixedThreadPool().submit(inputHandlerWorker));
// we immediately return when we have a threadpool
return true;
-// }else{
-// logger.error(experimentId, "Failed to submit job to the GFac implementation, experiment {}, task {}, " +
-// "gateway {}", experimentId, taskId, gatewayId);
-// return false;
-// }
-// } catch (GFacException e) {
-// throw new TException("Error launching the experiment : " + e.getMessage(), e);
-// }
}
public boolean cancelJob(String experimentId, String taskId) throws TException {
@@ -295,10 +298,26 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
private class TaskLaunchMessageHandler implements MessageHandler {
private String experimentId;
-
+ private String nodeName;
+
+ public TaskLaunchMessageHandler(){
+ try {
+ nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
+ } catch (ApplicationSettingsException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+
public Map<String, Object> getProperties() {
Map<String, Object> props = new HashMap<String, Object>();
- props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString());
+ try {
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
+ } catch (ApplicationSettingsException e) {
+ // if we cannot find gfac node name configured we set a random id
+ logger.error("airavata-server.properties should configure: " + Constants.ZOOKEEPER_GFAC_SERVER_NAME + " value.");
+ logger.error("listening to a random generated routing key");
+ props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString());
+ }
return props;
}
@@ -309,10 +328,16 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
TBase messageEvent = message.getEvent();
byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
ThriftUtils.createThriftFromBytes(bytes, event);
- submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
- System.out.println(" Message Received with message id '" + message.getMessageId()
- + "' and with message type '" + message.getType());
- } catch (TException e) {
+ CuratorClient curatorClient = new CuratorClient(curatorFramework, event, nodeName);
+ try {
+ curatorClient.start();
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+
+ System.out.println(" Message Received with message id '" + message.getMessageId()
+ + "' and with message type '" + message.getType());
+ } catch (TException e) {
logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
}
}else if(message.getType().equals(MessageType.TERMINATETASK)){
@@ -331,4 +356,57 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
}
}
+ public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable {
+ private final String name;
+ private final LeaderSelector leaderSelector;
+ private final AtomicInteger leaderCount = new AtomicInteger();
+ private final String path;
+ private TaskSubmitEvent event;
+ private String experimentNode;
+
+ public CuratorClient(CuratorFramework client, TaskSubmitEvent taskSubmitEvent, String name) {
+ this.name = name;
+ this.event = taskSubmitEvent;
+ this.path = File.separator + event.getExperimentId() + "-" + event.getTaskId() + "-" + event.getGatewayId();
+ // create a leader selector using the given path for management
+ // all participants in a given leader selection must use the same path
+ // ExampleClient here is also a LeaderSelectorListener but this isn't required
+ leaderSelector = new LeaderSelector(client, path, this);
+ experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ // for most cases you will want your instance to requeue when it relinquishes leadership
+ leaderSelector.autoRequeue();
+ }
+
+ public void start() throws IOException {
+ // the selection for this instance doesn't start until the leader selector is started
+ // leader selection is done in the background so this call to leaderSelector.start() returns immediately
+ leaderSelector.start();
+ }
+
+ @Override
+ public void close() throws IOException {
+ leaderSelector.close();
+ }
+
+ @Override
+ public void takeLeadership(CuratorFramework client) throws Exception {
+ // we are now the leader. This method should not return until we want to relinquish leadership
+ final int waitSeconds = (int) (5 * Math.random()) + 1;
+
+ System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
+ System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
+
+ try {
+ GFacUtils.createExperimentEntryForRPC(event.getExperimentId(),event.getTaskId(),client.getZookeeperClient().getZooKeeper(),experimentNode,name,event.getTokenId());
+ submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
+ Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
+ } catch (InterruptedException e) {
+ System.err.println(name + " was interrupted.");
+ Thread.currentThread().interrupt();
+ } finally {
+ System.out.println(name + " relinquishing leadership.\n");
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 4fc2a15..2a42503 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -131,6 +131,7 @@
<artifactId>zookeeper</artifactId>
<version>3.4.0</version>
</dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index cbbce48..9f104fa 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -1044,9 +1044,9 @@ public class GFacUtils {
}
// This method is dangerous because of moving the experiment data
- public static boolean createExperimentEntry(String experimentID,
- String taskID, ZooKeeper zk, String experimentNode,
- String pickedChild, String tokenId) throws KeeperException,
+ public static boolean createExperimentEntryForRPC(String experimentID,
+ String taskID, ZooKeeper zk, String experimentNode,
+ String pickedChild, String tokenId) throws KeeperException,
InterruptedException {
String experimentPath = experimentNode + File.separator + pickedChild;
String newExpNode = experimentPath + File.separator + experimentID
@@ -1153,6 +1153,116 @@ public class GFacUtils {
return true;
}
+ // This method is dangerous because of moving the experiment data
+ public static boolean createExperimentEntryForPassive(String experimentID,
+ String taskID, ZooKeeper zk, String experimentNode,
+ String pickedChild, String tokenId) throws KeeperException,
+ InterruptedException {
+ String experimentPath = experimentNode + File.separator + pickedChild;
+ String newExpNode = experimentPath + File.separator + experimentID
+ + "+" + taskID;
+ Stat exists1 = zk.exists(newExpNode, false);
+ String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk);
+ String foundExperimentPath = null;
+ if (exists1 == null && experimentEntry == null) { // this means this is a very new experiment
+ List<String> runningGfacNodeNames = AiravataZKUtils
+ .getAllGfacNodeNames(zk); // here we take old gfac servers
+ // too
+ for (String gfacServerNode : runningGfacNodeNames) {
+ if (!gfacServerNode.equals(pickedChild)) {
+ foundExperimentPath = experimentNode + File.separator
+ + gfacServerNode + File.separator + experimentID
+ + "+" + taskID;
+ exists1 = zk.exists(foundExperimentPath, false);
+ if (exists1 != null) { // when the experiment is found we
+ // break the loop
+ break;
+ }
+ }
+ }
+ if (exists1 == null) { // OK this is a pretty new experiment so we
+ // are going to create a new node
+ log.info("This is a new Job, so creating all the experiment docs from the scratch");
+ zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ Stat expParent = zk.exists(newExpNode, false);
+ if (tokenId != null && expParent != null) {
+ zk.setData(newExpNode, tokenId.getBytes(),
+ expParent.getVersion());
+ }
+ zk.create(newExpNode + File.separator + "state", String
+ .valueOf(GfacExperimentState.LAUNCHED.getValue())
+ .getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ zk.create(newExpNode + File.separator + "operation","submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ } else {
+ // ohhh this node exists in some other failed gfac folder, we
+ // have to move it to this gfac experiment list,safely
+ log.info("This is an old Job, so copying data from old experiment location");
+ zk.create(newExpNode,
+ zk.getData(foundExperimentPath, false, exists1),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ List<String> children = zk.getChildren(foundExperimentPath,
+ false);
+ for (String childNode1 : children) {
+ String level1 = foundExperimentPath + File.separator
+ + childNode1;
+ Stat exists2 = zk.exists(level1, false); // no need to check
+ // exists
+ String newLeve1 = newExpNode + File.separator + childNode1;
+ log.info("Creating new znode: " + newLeve1); // these has to
+ // be info
+ // logs
+ zk.create(newLeve1, zk.getData(level1, false, exists2),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ for (String childNode2 : zk.getChildren(level1, false)) {
+ String level2 = level1 + File.separator + childNode2;
+ Stat exists3 = zk.exists(level2, false); // no need to
+ // check
+ // exists
+ String newLeve2 = newLeve1 + File.separator
+ + childNode2;
+ log.info("Creating new znode: " + newLeve2);
+ zk.create(newLeve2, zk.getData(level2, false, exists3),
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ }
+ // After all the files are successfully transfered we delete the
+ // old experiment,otherwise we do
+ // not delete a single file
+ log.info("After a successful copying of experiment data for an old experiment we delete the old data");
+ log.info("Deleting experiment data: " + foundExperimentPath);
+ ZKUtil.deleteRecursive(zk, foundExperimentPath);
+ }
+ }else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){
+ // this happens when a cancel request comes to a differnt gfac node, in this case we do not move gfac experiment
+ // node to gfac node specific location, because original request execution will fail with errors
+ log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !");
+ return false;
+ } else {
+ log.error("ExperimentID: " + experimentID + " taskID: " + taskID
+ + " is already running by this Gfac instance");
+ List<String> runningGfacNodeNames = AiravataZKUtils
+ .getAllGfacNodeNames(zk); // here we take old gfac servers
+ // too
+ for (String gfacServerNode : runningGfacNodeNames) {
+ if (!gfacServerNode.equals(pickedChild)) {
+ foundExperimentPath = experimentNode + File.separator
+ + gfacServerNode + File.separator + experimentID
+ + "+" + taskID;
+ break;
+ }
+ }
+ ZKUtil.deleteRecursive(zk, foundExperimentPath);
+ }
+ return true;
+ }
+
public static String findExperimentEntry(String experimentID,
String taskID, ZooKeeper zk
) throws KeeperException,
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index 23863fb..99c0abb 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -127,16 +127,6 @@ the License. -->
<artifactId>zookeeper</artifactId>
<version>${zk.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-recipes</artifactId>
- <version>${curator.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>${curator.version}</version>
- </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index bfe2b16..78cc6b7 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -39,7 +39,6 @@ import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.thrift.TException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -113,40 +112,29 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
}
}
String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
- String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
List<String> children = zk.getChildren(gfacServer, this);
if (children.size() == 0) {
// Zookeeper data need cleaning
throw new OrchestratorException("There is no active GFac instance to route the request");
} else {
- String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size());
- // here we are not using an index because the getChildren does not return the same order everytime
- String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
- logger.info("GFAC instance node data: " + gfacNodeData);
- String[] split = gfacNodeData.split(":");
- gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
- if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
- // before submitting the job we check again the state of the node
- if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) {
- String gatewayId = null;
- CredentialReader credentialReader = GFacUtils.getCredentialReader();
- if (credentialReader != null) {
- try {
- gatewayId = credentialReader.getGatewayID(tokenId);
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage());
- }
- }
- if(gatewayId == null || gatewayId.isEmpty()){
- gatewayId = ServerSettings.getDefaultUserGateway();
- }
- TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId);
- MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
- messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
- publisher.publish(messageContext);
+ String gatewayId = null;
+ CredentialReader credentialReader = GFacUtils.getCredentialReader();
+ if (credentialReader != null) {
+ try {
+ gatewayId = credentialReader.getGatewayID(tokenId);
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage());
}
}
+ if(gatewayId == null || gatewayId.isEmpty()){
+ gatewayId = ServerSettings.getDefaultUserGateway();
+ }
+
+ TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId,tokenId);
+ MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
+ messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+ publisher.publish(messageContext);
}
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
@@ -204,8 +192,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
// before submitting the job we check again the state of the node
- if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
- TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null);
+ if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
+ TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null,null);
MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TERMINATE-"+ UUID.randomUUID().toString(),null);
publisher.publish(messageContext);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
index 54339a2..b855de2 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
@@ -99,7 +99,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher {
gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
// before submitting the job we check again the state of the node
- if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) {
+ if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) {
String gatewayId = null;
CredentialReader credentialReader = GFacUtils.getCredentialReader();
if (credentialReader != null) {
@@ -167,7 +167,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher {
localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
// before submitting the job we check again the state of the node
- if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
+ if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
return localhost.cancelJob(experimentID, taskID);
}
}