You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/03/23 20:38:32 UTC

[05/15] airavata git commit: adding curator leader election logic

adding curator leader election logic


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a486b67d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a486b67d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a486b67d

Branch: refs/heads/master
Commit: a486b67d9187294bae14dd15d4d5b90a84484c73
Parents: b6bf782
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Mon Feb 16 22:38:41 2015 -0500
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Mon Feb 16 22:38:41 2015 -0500

----------------------------------------------------------------------
 .../lib/airavata/messagingEvents_types.cpp      |  20 +++-
 .../lib/airavata/messagingEvents_types.h        |  13 ++-
 .../Airavata/Model/Messaging/Event/Types.php    |  20 ++++
 .../client/samples/CreateLaunchExperiment.java  |   2 +-
 .../model/messaging/event/TaskSubmitEvent.java  | 100 +++++++++++++++-
 airavata-api/generate-thrift-files.sh           |   2 +-
 .../messagingEvents.thrift                      |   3 +-
 modules/gfac/airavata-gfac-service/pom.xml      |  10 ++
 .../airavata/gfac/leader/CuratorClient.java     |  79 +++++++++++++
 .../gfac/leader/LeaderSelectorExample.java      |  80 +++++++++++++
 .../airavata/gfac/server/GfacServerHandler.java | 112 +++++++++++++++---
 modules/gfac/gfac-core/pom.xml                  |   1 +
 .../airavata/gfac/core/utils/GFacUtils.java     | 116 ++++++++++++++++++-
 modules/orchestrator/orchestrator-core/pom.xml  |  10 --
 .../core/impl/GFACPassiveJobSubmitter.java      |  46 +++-----
 .../core/impl/GFACRPCJobSubmitter.java          |   4 +-
 16 files changed, 547 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
index a2e72f5..71f45be 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.cpp
@@ -839,8 +839,8 @@ void swap(JobIdentifier &a, JobIdentifier &b) {
   swap(a.gatewayId, b.gatewayId);
 }
 
-const char* TaskSubmitEvent::ascii_fingerprint = "AB879940BD15B6B25691265F7384B271";
-const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xAB,0x87,0x99,0x40,0xBD,0x15,0xB6,0xB2,0x56,0x91,0x26,0x5F,0x73,0x84,0xB2,0x71};
+const char* TaskSubmitEvent::ascii_fingerprint = "C93D890311F28844166CF6E571EB3AC2";
+const uint8_t TaskSubmitEvent::binary_fingerprint[16] = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2};
 
 uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
 
@@ -856,6 +856,7 @@ uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
   bool isset_experimentId = false;
   bool isset_taskId = false;
   bool isset_gatewayId = false;
+  bool isset_tokenId = false;
 
   while (true)
   {
@@ -889,6 +890,14 @@ uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
           xfer += iprot->skip(ftype);
         }
         break;
+      case 4:
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readString(this->tokenId);
+          isset_tokenId = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
       default:
         xfer += iprot->skip(ftype);
         break;
@@ -904,6 +913,8 @@ uint32_t TaskSubmitEvent::read(::apache::thrift::protocol::TProtocol* iprot) {
     throw TProtocolException(TProtocolException::INVALID_DATA);
   if (!isset_gatewayId)
     throw TProtocolException(TProtocolException::INVALID_DATA);
+  if (!isset_tokenId)
+    throw TProtocolException(TProtocolException::INVALID_DATA);
   return xfer;
 }
 
@@ -923,6 +934,10 @@ uint32_t TaskSubmitEvent::write(::apache::thrift::protocol::TProtocol* oprot) co
   xfer += oprot->writeString(this->gatewayId);
   xfer += oprot->writeFieldEnd();
 
+  xfer += oprot->writeFieldBegin("tokenId", ::apache::thrift::protocol::T_STRING, 4);
+  xfer += oprot->writeString(this->tokenId);
+  xfer += oprot->writeFieldEnd();
+
   xfer += oprot->writeFieldStop();
   xfer += oprot->writeStructEnd();
   return xfer;
@@ -933,6 +948,7 @@ void swap(TaskSubmitEvent &a, TaskSubmitEvent &b) {
   swap(a.experimentId, b.experimentId);
   swap(a.taskId, b.taskId);
   swap(a.gatewayId, b.gatewayId);
+  swap(a.tokenId, b.tokenId);
 }
 
 const char* TaskTerminateEvent::ascii_fingerprint = "07A9615F837F7D0A952B595DD3020972";

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
index f063fc2..c7e2bb5 100644
--- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
+++ b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/messagingEvents_types.h
@@ -465,10 +465,10 @@ void swap(JobIdentifier &a, JobIdentifier &b);
 class TaskSubmitEvent {
  public:
 
-  static const char* ascii_fingerprint; // = "AB879940BD15B6B25691265F7384B271";
-  static const uint8_t binary_fingerprint[16]; // = {0xAB,0x87,0x99,0x40,0xBD,0x15,0xB6,0xB2,0x56,0x91,0x26,0x5F,0x73,0x84,0xB2,0x71};
+  static const char* ascii_fingerprint; // = "C93D890311F28844166CF6E571EB3AC2";
+  static const uint8_t binary_fingerprint[16]; // = {0xC9,0x3D,0x89,0x03,0x11,0xF2,0x88,0x44,0x16,0x6C,0xF6,0xE5,0x71,0xEB,0x3A,0xC2};
 
-  TaskSubmitEvent() : experimentId(), taskId(), gatewayId() {
+  TaskSubmitEvent() : experimentId(), taskId(), gatewayId(), tokenId() {
   }
 
   virtual ~TaskSubmitEvent() throw() {}
@@ -476,6 +476,7 @@ class TaskSubmitEvent {
   std::string experimentId;
   std::string taskId;
   std::string gatewayId;
+  std::string tokenId;
 
   void __set_experimentId(const std::string& val) {
     experimentId = val;
@@ -489,6 +490,10 @@ class TaskSubmitEvent {
     gatewayId = val;
   }
 
+  void __set_tokenId(const std::string& val) {
+    tokenId = val;
+  }
+
   bool operator == (const TaskSubmitEvent & rhs) const
   {
     if (!(experimentId == rhs.experimentId))
@@ -497,6 +502,8 @@ class TaskSubmitEvent {
       return false;
     if (!(gatewayId == rhs.gatewayId))
       return false;
+    if (!(tokenId == rhs.tokenId))
+      return false;
     return true;
   }
   bool operator != (const TaskSubmitEvent &rhs) const {

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
index 40810d3..b0d7676 100644
--- a/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
+++ b/airavata-api/airavata-client-sdks/airavata-php-sdk/src/main/resources/lib/Airavata/Model/Messaging/Event/Types.php
@@ -977,6 +977,7 @@ class TaskSubmitEvent {
   public $experimentId = null;
   public $taskId = null;
   public $gatewayId = null;
+  public $tokenId = null;
 
   public function __construct($vals=null) {
     if (!isset(self::$_TSPEC)) {
@@ -993,6 +994,10 @@ class TaskSubmitEvent {
           'var' => 'gatewayId',
           'type' => TType::STRING,
           ),
+        4 => array(
+          'var' => 'tokenId',
+          'type' => TType::STRING,
+          ),
         );
     }
     if (is_array($vals)) {
@@ -1005,6 +1010,9 @@ class TaskSubmitEvent {
       if (isset($vals['gatewayId'])) {
         $this->gatewayId = $vals['gatewayId'];
       }
+      if (isset($vals['tokenId'])) {
+        $this->tokenId = $vals['tokenId'];
+      }
     }
   }
 
@@ -1048,6 +1056,13 @@ class TaskSubmitEvent {
             $xfer += $input->skip($ftype);
           }
           break;
+        case 4:
+          if ($ftype == TType::STRING) {
+            $xfer += $input->readString($this->tokenId);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
         default:
           $xfer += $input->skip($ftype);
           break;
@@ -1076,6 +1091,11 @@ class TaskSubmitEvent {
       $xfer += $output->writeString($this->gatewayId);
       $xfer += $output->writeFieldEnd();
     }
+    if ($this->tokenId !== null) {
+      $xfer += $output->writeFieldBegin('tokenId', TType::STRING, 4);
+      $xfer += $output->writeString($this->tokenId);
+      $xfer += $output->writeFieldEnd();
+    }
     $xfer += $output->writeFieldStop();
     $xfer += $output->writeStructEnd();
     return $xfer;

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 8483da7..b7121b9 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -60,7 +60,7 @@ public class CreateLaunchExperiment {
     private static final String DEFAULT_GATEWAY = "default.registry.gateway";
     private static Airavata.Client airavataClient;
 
-    private static String echoAppId = "Echo_78e34255-39f3-4c07-add6-a1a672c80104";
+    private static String echoAppId = "Echo_a8fc8511-7b8e-431a-ad0f-de5eb1a9c576";
     private static String mpiAppId = "HelloMPI_720e159f-198f-4daa-96ca-9f5eafee92c9";
     private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
     private static String amberAppId = "Amber_eda074ea-223d-49d7-a942-6c8742249f36";

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
index c813c76..71d497e 100644
--- a/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
+++ b/airavata-api/airavata-data-models/src/main/java/org/apache/airavata/model/messaging/event/TaskSubmitEvent.java
@@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory;
   private static final org.apache.thrift.protocol.TField EXPERIMENT_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("experimentId", org.apache.thrift.protocol.TType.STRING, (short)1);
   private static final org.apache.thrift.protocol.TField TASK_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("taskId", org.apache.thrift.protocol.TType.STRING, (short)2);
   private static final org.apache.thrift.protocol.TField GATEWAY_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("gatewayId", org.apache.thrift.protocol.TType.STRING, (short)3);
+  private static final org.apache.thrift.protocol.TField TOKEN_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("tokenId", org.apache.thrift.protocol.TType.STRING, (short)4);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -65,12 +66,14 @@ import org.slf4j.LoggerFactory;
   private String experimentId; // required
   private String taskId; // required
   private String gatewayId; // required
+  private String tokenId; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     EXPERIMENT_ID((short)1, "experimentId"),
     TASK_ID((short)2, "taskId"),
-    GATEWAY_ID((short)3, "gatewayId");
+    GATEWAY_ID((short)3, "gatewayId"),
+    TOKEN_ID((short)4, "tokenId");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -91,6 +94,8 @@ import org.slf4j.LoggerFactory;
           return TASK_ID;
         case 3: // GATEWAY_ID
           return GATEWAY_ID;
+        case 4: // TOKEN_ID
+          return TOKEN_ID;
         default:
           return null;
       }
@@ -140,6 +145,8 @@ import org.slf4j.LoggerFactory;
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.GATEWAY_ID, new org.apache.thrift.meta_data.FieldMetaData("gatewayId", org.apache.thrift.TFieldRequirementType.REQUIRED, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.TOKEN_ID, new org.apache.thrift.meta_data.FieldMetaData("tokenId", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TaskSubmitEvent.class, metaDataMap);
   }
@@ -150,12 +157,14 @@ import org.slf4j.LoggerFactory;
   public TaskSubmitEvent(
     String experimentId,
     String taskId,
-    String gatewayId)
+    String gatewayId,
+    String tokenId)
   {
     this();
     this.experimentId = experimentId;
     this.taskId = taskId;
     this.gatewayId = gatewayId;
+    this.tokenId = tokenId;
   }
 
   /**
@@ -171,6 +180,9 @@ import org.slf4j.LoggerFactory;
     if (other.isSetGatewayId()) {
       this.gatewayId = other.gatewayId;
     }
+    if (other.isSetTokenId()) {
+      this.tokenId = other.tokenId;
+    }
   }
 
   public TaskSubmitEvent deepCopy() {
@@ -182,6 +194,7 @@ import org.slf4j.LoggerFactory;
     this.experimentId = null;
     this.taskId = null;
     this.gatewayId = null;
+    this.tokenId = null;
   }
 
   public String getExperimentId() {
@@ -253,6 +266,29 @@ import org.slf4j.LoggerFactory;
     }
   }
 
+  public String getTokenId() {
+    return this.tokenId;
+  }
+
+  public void setTokenId(String tokenId) {
+    this.tokenId = tokenId;
+  }
+
+  public void unsetTokenId() {
+    this.tokenId = null;
+  }
+
+  /** Returns true if field tokenId is set (has been assigned a value) and false otherwise */
+  public boolean isSetTokenId() {
+    return this.tokenId != null;
+  }
+
+  public void setTokenIdIsSet(boolean value) {
+    if (!value) {
+      this.tokenId = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case EXPERIMENT_ID:
@@ -279,6 +315,14 @@ import org.slf4j.LoggerFactory;
       }
       break;
 
+    case TOKEN_ID:
+      if (value == null) {
+        unsetTokenId();
+      } else {
+        setTokenId((String)value);
+      }
+      break;
+
     }
   }
 
@@ -293,6 +337,9 @@ import org.slf4j.LoggerFactory;
     case GATEWAY_ID:
       return getGatewayId();
 
+    case TOKEN_ID:
+      return getTokenId();
+
     }
     throw new IllegalStateException();
   }
@@ -310,6 +357,8 @@ import org.slf4j.LoggerFactory;
       return isSetTaskId();
     case GATEWAY_ID:
       return isSetGatewayId();
+    case TOKEN_ID:
+      return isSetTokenId();
     }
     throw new IllegalStateException();
   }
@@ -354,6 +403,15 @@ import org.slf4j.LoggerFactory;
         return false;
     }
 
+    boolean this_present_tokenId = true && this.isSetTokenId();
+    boolean that_present_tokenId = true && that.isSetTokenId();
+    if (this_present_tokenId || that_present_tokenId) {
+      if (!(this_present_tokenId && that_present_tokenId))
+        return false;
+      if (!this.tokenId.equals(that.tokenId))
+        return false;
+    }
+
     return true;
   }
 
@@ -400,6 +458,16 @@ import org.slf4j.LoggerFactory;
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetTokenId()).compareTo(other.isSetTokenId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTokenId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tokenId, other.tokenId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -443,6 +511,14 @@ import org.slf4j.LoggerFactory;
       sb.append(this.gatewayId);
     }
     first = false;
+    if (!first) sb.append(", ");
+    sb.append("tokenId:");
+    if (this.tokenId == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.tokenId);
+    }
+    first = false;
     sb.append(")");
     return sb.toString();
   }
@@ -461,6 +537,10 @@ import org.slf4j.LoggerFactory;
       throw new org.apache.thrift.protocol.TProtocolException("Required field 'gatewayId' is unset! Struct:" + toString());
     }
 
+    if (!isSetTokenId()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'tokenId' is unset! Struct:" + toString());
+    }
+
     // check for sub-struct validity
   }
 
@@ -522,6 +602,14 @@ import org.slf4j.LoggerFactory;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 4: // TOKEN_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.tokenId = iprot.readString();
+              struct.setTokenIdIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -550,6 +638,11 @@ import org.slf4j.LoggerFactory;
         oprot.writeString(struct.gatewayId);
         oprot.writeFieldEnd();
       }
+      if (struct.tokenId != null) {
+        oprot.writeFieldBegin(TOKEN_ID_FIELD_DESC);
+        oprot.writeString(struct.tokenId);
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -570,6 +663,7 @@ import org.slf4j.LoggerFactory;
       oprot.writeString(struct.experimentId);
       oprot.writeString(struct.taskId);
       oprot.writeString(struct.gatewayId);
+      oprot.writeString(struct.tokenId);
     }
 
     @Override
@@ -581,6 +675,8 @@ import org.slf4j.LoggerFactory;
       struct.setTaskIdIsSet(true);
       struct.gatewayId = iprot.readString();
       struct.setGatewayIdIsSet(true);
+      struct.tokenId = iprot.readString();
+      struct.setTokenIdIsSet(true);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/generate-thrift-files.sh
----------------------------------------------------------------------
diff --git a/airavata-api/generate-thrift-files.sh b/airavata-api/generate-thrift-files.sh
index bd823e4..c8a000d 100755
--- a/airavata-api/generate-thrift-files.sh
+++ b/airavata-api/generate-thrift-files.sh
@@ -27,7 +27,7 @@ DATAMODEL_SRC_DIR='airavata-data-models/src/main/java'
 JAVA_API_SDK_DIR='airavata-api-stubs/src/main/java'
 CPP_SDK_DIR='airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/airavata/'
 PHP_SDK_DIR='airavata-client-sdks/airavata-php-sdk/src/main/resources/lib'
-THRIFT_EXEC=thrift
+THRIFT_EXEC=/usr/local/Cellar/thrift/0.9.1/bin/thrift
 # The Function fail prints error messages on failure and quits the script.
 fail() {
     echo $@

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
index d736701..d9e85d4 100644
--- a/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
+++ b/airavata-api/thrift-interface-descriptions/messagingEvents.thrift
@@ -105,7 +105,8 @@ struct JobIdentifier {
 struct TaskSubmitEvent{
     1: required string experimentId,
     2: required string taskId,
-    3: required string gatewayId
+    3: required string gatewayId,
+    4: required string tokenId
 }
 
 struct TaskTerminateEvent{

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/pom.xml b/modules/gfac/airavata-gfac-service/pom.xml
index 0884942..5a178bd 100644
--- a/modules/gfac/airavata-gfac-service/pom.xml
+++ b/modules/gfac/airavata-gfac-service/pom.xml
@@ -80,6 +80,16 @@
             <artifactId>airavata-server-configuration</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-recipes</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
     </dependencies>
 
     <properties>

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
new file mode 100644
index 0000000..2db9a6f
--- /dev/null
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/CuratorClient.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.airavata.gfac.leader;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * An example leader selector client. Note that {@link LeaderSelectorListenerAdapter} which
+ * has the recommended handling for connection state issues
+ */
+public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable {
+    private final String name;
+    private final LeaderSelector leaderSelector;
+    private final AtomicInteger leaderCount = new AtomicInteger();
+
+    public CuratorClient(CuratorFramework client, String path, String name) {
+        this.name = name;
+
+        // create a leader selector using the given path for management
+        // all participants in a given leader selection must use the same path
+        // ExampleClient here is also a LeaderSelectorListener but this isn't required
+        leaderSelector = new LeaderSelector(client, path, this);
+
+        // for most cases you will want your instance to requeue when it relinquishes leadership
+        leaderSelector.autoRequeue();
+    }
+
+    public void start() throws IOException {
+        // the selection for this instance doesn't start until the leader selector is started
+        // leader selection is done in the background so this call to leaderSelector.start() returns immediately
+        leaderSelector.start();
+    }
+
+    @Override
+    public void close() throws IOException {
+        leaderSelector.close();
+    }
+
+    @Override
+    public void takeLeadership(CuratorFramework client) throws Exception {
+        // we are now the leader. This method should not return until we want to relinquish leadership
+
+        final int waitSeconds = (int) (5 * Math.random()) + 1;
+
+        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
+        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
+        try {
+            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
+        } catch (InterruptedException e) {
+            System.err.println(name + " was interrupted.");
+            Thread.currentThread().interrupt();
+        } finally {
+            System.out.println(name + " relinquishing leadership.\n");
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
new file mode 100644
index 0000000..ad02641
--- /dev/null
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/leader/LeaderSelectorExample.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.leader;
+
+import com.google.common.collect.Lists;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.List;
+
+public class LeaderSelectorExample {
+    private final static Logger logger = LoggerFactory.getLogger(LeaderSelectorExample.class);
+    private static final int CLIENT_QTY = 10;
+
+    private static final String PATH = "/examples/leader";
+
+    public static void main(String[] args) throws Exception
+    {
+        // all of the useful sample code is in ExampleClient.java
+
+        System.out.println("Create " + CLIENT_QTY + " clients, have each negotiate for leadership and then wait a random number of seconds before letting another leader election occur.");
+        System.out.println("Notice that leader election is fair: all clients will become leader and will do so the same number of times.");
+
+        try
+        {
+            for ( int i = 0; i < CLIENT_QTY; ++i )
+            {
+                CuratorFramework    client = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3));
+
+                CuratorClient       example = new CuratorClient(client, PATH, "Client #" + i);
+
+                client.start();
+                example.start();
+            }
+
+            System.out.println("Press enter/return to quit\n");
+            new BufferedReader(new InputStreamReader(System.in)).readLine();
+        }
+        finally
+        {
+            System.out.println("Shutting down...");
+
+            /*for ( CuratorClient exampleClient : examples )
+            {
+                CloseableUtils.closeQuietly(exampleClient);
+            }
+            for ( CuratorFramework client : clients )
+            {
+                CloseableUtils.closeQuietly(client);
+            }*/
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index c8f1100..c838703 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -31,31 +31,38 @@ import org.apache.airavata.common.utils.*;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.utils.GFacThreadPoolExecutor;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.core.utils.InputHandlerWorker;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
+import org.apache.airavata.gfac.leader.CuratorClient;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
-import org.apache.airavata.messaging.core.impl.RabbitMQStatusConsumer;
 import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
 import org.apache.airavata.model.messaging.event.MessageType;
 import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
 import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
-import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderSelector;
+import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.*;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 
 public class GfacServerHandler implements GfacService.Iface, Watcher{
@@ -88,6 +95,9 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
 
     private RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
 
+    CuratorFramework curatorFramework = null;
+
+
     public GfacServerHandler() throws Exception{
         // registering with zk
         try {
@@ -114,6 +124,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             if(ServerSettings.isGFacPassiveMode()) {
                 rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
                 rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
+                curatorFramework = CuratorFrameworkFactory.newClient(AiravataZKUtils.getZKhostPort(), new ExponentialBackoffRetry(1000, 3));
             }
 
 
@@ -229,14 +240,6 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
         inHandlerFutures.add(GFacThreadPoolExecutor.getFixedThreadPool().submit(inputHandlerWorker));
         // we immediately return when we have a threadpool
         return true;
-//            }else{
-//                logger.error(experimentId, "Failed to submit job to the GFac implementation, experiment {}, task {}, " +
-//                        "gateway {}", experimentId, taskId, gatewayId);
-//                return false;
-//            }
-//        } catch (GFacException e) {
-//            throw new TException("Error launching the experiment : " + e.getMessage(), e);
-//        }
     }
 
     public boolean cancelJob(String experimentId, String taskId) throws TException {
@@ -295,10 +298,26 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
     private class TaskLaunchMessageHandler implements MessageHandler {
         private String experimentId;
 
-      
+        private String nodeName;
+
+        public TaskLaunchMessageHandler(){
+            try {
+                nodeName = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
+            } catch (ApplicationSettingsException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+
         public Map<String, Object> getProperties() {
             Map<String, Object> props = new HashMap<String, Object>();
-            props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString());
+            try {
+                props.put(MessagingConstants.RABBIT_ROUTING_KEY, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
+            } catch (ApplicationSettingsException e) {
+                // if we cannot find gfac node name configured we set a random id
+                logger.error("airavata-server.properties should configure: " + Constants.ZOOKEEPER_GFAC_SERVER_NAME + " value.");
+                logger.error("listening to a random generated routing key");
+                props.put(MessagingConstants.RABBIT_ROUTING_KEY, UUID.randomUUID().toString());
+            }
             return props;
         }
 
@@ -309,10 +328,16 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
                     TBase messageEvent = message.getEvent();
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
-                    System.out.println(" Message Received with message id '" + message.getMessageId()
-                            + "' and with message type '" + message.getType());
-                } catch (TException e) {
+                    CuratorClient curatorClient = new CuratorClient(curatorFramework, event, nodeName);
+                    try {
+                        curatorClient.start();
+                    } catch (IOException e) {
+                        logger.error(e.getMessage(), e);
+                    }
+
+                        System.out.println(" Message Received with message id '" + message.getMessageId()
+                                + "' and with message type '" + message.getType());
+                    } catch (TException e) {
                     logger.error(e.getMessage(), e); //nobody is listening so nothing to throw
                 }
             }else if(message.getType().equals(MessageType.TERMINATETASK)){
@@ -331,4 +356,57 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
         }
     }
 
+    public class CuratorClient extends LeaderSelectorListenerAdapter implements Closeable {
+        private final String name;
+        private final LeaderSelector leaderSelector;
+        private final AtomicInteger leaderCount = new AtomicInteger();
+        private final String path;
+        private TaskSubmitEvent event;
+        private String experimentNode;
+
+        public CuratorClient(CuratorFramework client, TaskSubmitEvent taskSubmitEvent, String name) {
+            this.name = name;
+            this.event = taskSubmitEvent;
+            this.path = File.separator + event.getExperimentId() + "-" + event.getTaskId() + "-" + event.getGatewayId();
+            // create a leader selector using the given path for management
+            // all participants in a given leader selection must use the same path
+            // ExampleClient here is also a LeaderSelectorListener but this isn't required
+            leaderSelector = new LeaderSelector(client, path, this);
+            experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            // for most cases you will want your instance to requeue when it relinquishes leadership
+            leaderSelector.autoRequeue();
+        }
+
+        public void start() throws IOException {
+            // the selection for this instance doesn't start until the leader selector is started
+            // leader selection is done in the background so this call to leaderSelector.start() returns immediately
+            leaderSelector.start();
+        }
+
+        @Override
+        public void close() throws IOException {
+            leaderSelector.close();
+        }
+
+        @Override
+        public void takeLeadership(CuratorFramework client) throws Exception {
+            // we are now the leader. This method should not return until we want to relinquish leadership
+            final int waitSeconds = (int) (5 * Math.random()) + 1;
+
+            System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
+            System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
+
+            try {
+                GFacUtils.createExperimentEntryForRPC(event.getExperimentId(),event.getTaskId(),client.getZookeeperClient().getZooKeeper(),experimentNode,name,event.getTokenId());
+                submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId());
+                Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
+            } catch (InterruptedException e) {
+                System.err.println(name + " was interrupted.");
+                Thread.currentThread().interrupt();
+            } finally {
+                System.out.println(name + " relinquishing leadership.\n");
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 4fc2a15..2a42503 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -131,6 +131,7 @@
         	<artifactId>zookeeper</artifactId>
         	<version>3.4.0</version>
         </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index cbbce48..9f104fa 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -1044,9 +1044,9 @@ public class GFacUtils {
 	}
 
 	// This method is dangerous because of moving the experiment data
-	public static boolean createExperimentEntry(String experimentID,
-			String taskID, ZooKeeper zk, String experimentNode,
-			String pickedChild, String tokenId) throws KeeperException,
+	public static boolean createExperimentEntryForRPC(String experimentID,
+													  String taskID, ZooKeeper zk, String experimentNode,
+													  String pickedChild, String tokenId) throws KeeperException,
 			InterruptedException {
 		String experimentPath = experimentNode + File.separator + pickedChild;
 		String newExpNode = experimentPath + File.separator + experimentID
@@ -1153,6 +1153,116 @@ public class GFacUtils {
         return true;
 	}
 
+	// This method is dangerous because of moving the experiment data
+	public static boolean createExperimentEntryForPassive(String experimentID,
+													  String taskID, ZooKeeper zk, String experimentNode,
+													  String pickedChild, String tokenId) throws KeeperException,
+			InterruptedException {
+		String experimentPath = experimentNode + File.separator + pickedChild;
+		String newExpNode = experimentPath + File.separator + experimentID
+				+ "+" + taskID;
+		Stat exists1 = zk.exists(newExpNode, false);
+		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, taskID, zk);
+		String foundExperimentPath = null;
+		if (exists1 == null && experimentEntry == null) {  // this means this is a very new experiment
+			List<String> runningGfacNodeNames = AiravataZKUtils
+					.getAllGfacNodeNames(zk); // here we take old gfac servers
+			// too
+			for (String gfacServerNode : runningGfacNodeNames) {
+				if (!gfacServerNode.equals(pickedChild)) {
+					foundExperimentPath = experimentNode + File.separator
+							+ gfacServerNode + File.separator + experimentID
+							+ "+" + taskID;
+					exists1 = zk.exists(foundExperimentPath, false);
+					if (exists1 != null) { // when the experiment is found we
+						// break the loop
+						break;
+					}
+				}
+			}
+			if (exists1 == null) { // OK this is a pretty new experiment so we
+				// are going to create a new node
+				log.info("This is a new Job, so creating all the experiment docs from the scratch");
+				zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+						CreateMode.PERSISTENT);
+
+				Stat expParent = zk.exists(newExpNode, false);
+				if (tokenId != null && expParent != null) {
+					zk.setData(newExpNode, tokenId.getBytes(),
+							expParent.getVersion());
+				}
+				zk.create(newExpNode + File.separator + "state", String
+								.valueOf(GfacExperimentState.LAUNCHED.getValue())
+								.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+						CreateMode.PERSISTENT);
+				zk.create(newExpNode + File.separator + "operation","submit".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+						CreateMode.PERSISTENT);
+
+			} else {
+				// ohhh this node exists in some other failed gfac folder, we
+				// have to move it to this gfac experiment list,safely
+				log.info("This is an old Job, so copying data from old experiment location");
+				zk.create(newExpNode,
+						zk.getData(foundExperimentPath, false, exists1),
+						ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+				List<String> children = zk.getChildren(foundExperimentPath,
+						false);
+				for (String childNode1 : children) {
+					String level1 = foundExperimentPath + File.separator
+							+ childNode1;
+					Stat exists2 = zk.exists(level1, false); // no need to check
+					// exists
+					String newLeve1 = newExpNode + File.separator + childNode1;
+					log.info("Creating new znode: " + newLeve1); // these has to
+					// be info
+					// logs
+					zk.create(newLeve1, zk.getData(level1, false, exists2),
+							ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+					for (String childNode2 : zk.getChildren(level1, false)) {
+						String level2 = level1 + File.separator + childNode2;
+						Stat exists3 = zk.exists(level2, false); // no need to
+						// check
+						// exists
+						String newLeve2 = newLeve1 + File.separator
+								+ childNode2;
+						log.info("Creating new znode: " + newLeve2);
+						zk.create(newLeve2, zk.getData(level2, false, exists3),
+								ZooDefs.Ids.OPEN_ACL_UNSAFE,
+								CreateMode.PERSISTENT);
+					}
+				}
+				// After all the files are successfully transfered we delete the
+				// old experiment,otherwise we do
+				// not delete a single file
+				log.info("After a successful copying of experiment data for an old experiment we delete the old data");
+				log.info("Deleting experiment data: " + foundExperimentPath);
+				ZKUtil.deleteRecursive(zk, foundExperimentPath);
+			}
+		}else if(experimentEntry != null && GFacUtils.isCancelled(experimentID,taskID,zk) ){
+			// this happens when a cancel request comes to a differnt gfac node, in this case we do not move gfac experiment
+			// node to gfac node specific location, because original request execution will fail with errors
+			log.error("This experiment is already cancelled and its already executing the cancel operation so cannot submit again !");
+			return false;
+		} else {
+			log.error("ExperimentID: " + experimentID + " taskID: " + taskID
+					+ " is already running by this Gfac instance");
+			List<String> runningGfacNodeNames = AiravataZKUtils
+					.getAllGfacNodeNames(zk); // here we take old gfac servers
+			// too
+			for (String gfacServerNode : runningGfacNodeNames) {
+				if (!gfacServerNode.equals(pickedChild)) {
+					foundExperimentPath = experimentNode + File.separator
+							+ gfacServerNode + File.separator + experimentID
+							+ "+" + taskID;
+					break;
+				}
+			}
+			ZKUtil.deleteRecursive(zk, foundExperimentPath);
+		}
+		return true;
+	}
+
     public static String findExperimentEntry(String experimentID,
                                                 String taskID, ZooKeeper zk
                                                 ) throws KeeperException,

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index 23863fb..99c0abb 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -127,16 +127,6 @@ the License. -->
         	<artifactId>zookeeper</artifactId>
         	<version>${zk.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <version>${curator.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
-            <artifactId>curator-framework</artifactId>
-            <version>${curator.version}</version>
-        </dependency>
     </dependencies>
     <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index bfe2b16..78cc6b7 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -39,7 +39,6 @@ import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -113,40 +112,29 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
                 }
             }
             String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
-            String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
             List<String> children = zk.getChildren(gfacServer, this);
 
             if (children.size() == 0) {
                 // Zookeeper data need cleaning
                 throw new OrchestratorException("There is no active GFac instance to route the request");
             } else {
-                String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size());
-                // here we are not using an index because the getChildren does not return the same order everytime
-                String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
-                logger.info("GFAC instance node data: " + gfacNodeData);
-                String[] split = gfacNodeData.split(":");
-                gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
-                if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
-                    // before submitting the job we check again the state of the node
-                    if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) {
-                        String gatewayId = null;
-                        CredentialReader credentialReader = GFacUtils.getCredentialReader();
-                        if (credentialReader != null) {
-                            try {
-                                gatewayId = credentialReader.getGatewayID(tokenId);
-                            } catch (Exception e) {
-                                logger.error(e.getLocalizedMessage());
-                            }
-                        }
-                        if(gatewayId == null || gatewayId.isEmpty()){
-                            gatewayId = ServerSettings.getDefaultUserGateway();
-                        }
-                        TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId);
-                        MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
-                        messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
-                        publisher.publish(messageContext);
+                String gatewayId = null;
+                CredentialReader credentialReader = GFacUtils.getCredentialReader();
+                if (credentialReader != null) {
+                    try {
+                        gatewayId = credentialReader.getGatewayID(tokenId);
+                    } catch (Exception e) {
+                        logger.error(e.getLocalizedMessage());
                     }
                 }
+                if(gatewayId == null || gatewayId.isEmpty()){
+                    gatewayId = ServerSettings.getDefaultUserGateway();
+                }
+
+                TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, gatewayId,tokenId);
+                MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
+                messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
+                publisher.publish(messageContext);
             }
         } catch (InterruptedException e) {
             logger.error(e.getMessage(), e);
@@ -204,8 +192,8 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
                 localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
                 if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
                     // before submitting the job we check again the state of the node
-                    if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
-                        TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null);
+                    if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
+                        TaskSubmitEvent taskSubmitEvent = new TaskSubmitEvent(experimentID, taskID, null,null);
                         MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK,"LAUNCH.TERMINATE-"+ UUID.randomUUID().toString(),null);
                         publisher.publish(messageContext);
                     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/a486b67d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
index 54339a2..b855de2 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACRPCJobSubmitter.java
@@ -99,7 +99,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher {
 				gfacClient = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
 				if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
 					// before submitting the job we check again the state of the node
-					if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) {
+					if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, tokenId)) {
 						 String gatewayId = null;
                     	 CredentialReader credentialReader = GFacUtils.getCredentialReader();
                          if (credentialReader != null) {
@@ -167,7 +167,7 @@ public class GFACRPCJobSubmitter implements JobSubmitter, Watcher {
                 localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
                 if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {
                     // before submitting the job we check again the state of the node
-                    if (GFacUtils.createExperimentEntry(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
+                    if (GFacUtils.createExperimentEntryForRPC(experimentID, taskID, zk, experimentNode, pickedChild, null)) {
                         return localhost.cancelJob(experimentID, taskID);
                     }
                 }