You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/28 17:37:41 UTC

[4/4] airavata git commit: Integrated Apache curator with Gfac. With this fix every zookeeper call goes through CuratorFramework client.

Integrated Apache Curator with GFac. With this fix, every ZooKeeper call goes through the CuratorFramework client.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4a061780
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4a061780
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4a061780

Branch: refs/heads/master
Commit: 4a06178022c0cd1a215c47e3c65a7b6ffd342bbb
Parents: 915ed04
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu May 28 11:26:36 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu May 28 11:26:36 2015 -0400

----------------------------------------------------------------------
 airavata-api/airavata-api-server/pom.xml        |   6 +-
 .../AiravataExperimentStatusUpdator.java        |  44 +-
 modules/commons/utils/pom.xml                   |   6 +-
 .../airavata/common/utils/AiravataZKUtils.java  |  61 +--
 modules/distribution/server/pom.xml             |   5 +
 modules/gfac/airavata-gfac-service/pom.xml      |   5 -
 .../airavata/gfac/server/GfacServerHandler.java | 153 ++----
 modules/gfac/gfac-core/pom.xml                  |   7 +-
 .../gfac/core/context/JobExecutionContext.java  |  46 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 483 +++++++------------
 .../gfac/core/handler/AbstractHandler.java      |   2 +-
 .../core/handler/AppDescriptorCheckHandler.java |   2 +-
 .../core/monitor/GfacInternalStatusUpdator.java |  62 +--
 .../airavata/gfac/core/utils/GFacUtils.java     | 408 +++++++---------
 .../gsissh/provider/impl/GSISSHProvider.java    |   6 +-
 .../airavata/gfac/monitor/util/CommonUtils.java |  62 +--
 .../gfac/ssh/provider/impl/SSHProvider.java     |   6 +-
 .../util/OrchestratorRecoveryHandler.java       | 109 -----
 pom.xml                                         |   7 +-
 19 files changed, 509 insertions(+), 971 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/airavata-api/airavata-api-server/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/pom.xml b/airavata-api/airavata-api-server/pom.xml
index df87344..28b9ef9 100644
--- a/airavata-api/airavata-api-server/pom.xml
+++ b/airavata-api/airavata-api-server/pom.xml
@@ -81,7 +81,11 @@
             <artifactId>slf4j-log4j12</artifactId>
             <version>${org.slf4j.version}</version>
         </dependency>
-        
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index dd7bcd0..f59179e 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -23,11 +23,13 @@ package org.apache.airavata.api.server.listener;
 import com.google.common.eventbus.Subscribe;
 import org.apache.airavata.api.server.util.DataModelUtils;
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.*;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
@@ -38,9 +40,8 @@ import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,18 +50,12 @@ import java.util.Calendar;
 
 public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
     private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
-
     private Registry airavataRegistry;
-
     private MonitorPublisher monitorPublisher;
-
     private Publisher publisher;
-
-    private ZooKeeper zk;
-
+    private CuratorFramework curatorClient;
     private RabbitMQTaskLaunchConsumer consumer;
 
-
     public Registry getAiravataRegistry() {
         return airavataRegistry;
     }
@@ -130,9 +125,9 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 		}
     }
 
-    private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode, String experimentPath) throws KeeperException, InterruptedException, AiravataException {
+    private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode, String experimentPath) throws Exception {
         int count = 0;
-        long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), zk,
+        long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), curatorClient,
                 experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
         if(deliveryTag>0) {
             if (ServerSettings.isGFacPassiveMode()) {
@@ -152,16 +147,18 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
                 }
             }
         }
-        if (zk.exists(experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, false) != null) {
-            ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX);
+        if (curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX) != null) {
+            ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+                    experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, true);
         }
-        if (zk.exists(experimentPath, false) != null) {
-            ZKUtil.deleteRecursive(zk, experimentPath);
+
+        if (curatorClient.checkExists().forPath(experimentPath) != null) {
+            ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath, true);
         }
 
         // ack cancel operation if exist
         long cancelDT = AiravataZKUtils.getCancelDeliveryTagIfExist(nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
-                zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
+                curatorClient, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
         count  = 0;
         if (cancelDT > 0) {
             while (!consumer.isOpen() && count < 3) {
@@ -180,7 +177,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
             }
         }
         if (cancelDT > 0) {
-            ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+            ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+                    experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
         }
     }
 
@@ -211,8 +209,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
                 this.publisher=(Publisher) configuration;
             }else if (configuration instanceof RabbitMQTaskLaunchConsumer) {
                 this.consumer = (RabbitMQTaskLaunchConsumer) configuration;
-            }else if (configuration instanceof ZooKeeper) {
-                this.zk = (ZooKeeper) configuration;
+            }else if (configuration instanceof CuratorFramework) {
+                this.curatorClient = (CuratorFramework) configuration;
             }
 
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/commons/utils/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/utils/pom.xml b/modules/commons/utils/pom.xml
index 44d7465..09766af 100644
--- a/modules/commons/utils/pom.xml
+++ b/modules/commons/utils/pom.xml
@@ -130,9 +130,9 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-            <version>3.4.0</version>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.thrift</groupId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index ac617d9..f753bc1 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -20,9 +20,10 @@
 */
 package org.apache.airavata.common.utils;
 
-import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerConfig;
@@ -36,15 +37,12 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.ByteBuffer;
-import java.util.List;
 
 public class AiravataZKUtils implements Watcher {
     private final static Logger logger = LoggerFactory.getLogger(AiravataZKUtils.class);
 
     public static final String ZK_EXPERIMENT_STATE_NODE = "state";
-
     public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag";
-
     public static final String CANCEL_DELIVERY_TAG_POSTFIX = "-cancel-deliveryTag";
 
     @Override
@@ -81,35 +79,14 @@ public class AiravataZKUtils implements Watcher {
                 "state";
     }
 
-    public static String getExpState(ZooKeeper zk, String expId) throws ApplicationSettingsException,
-            KeeperException, InterruptedException {
-        Stat exists = zk.exists(getExpStatePath(expId), false);
+    public static String getExpState(CuratorFramework curatorClient, String expId) throws Exception {
+        Stat exists = curatorClient.checkExists().forPath(getExpStatePath(expId));
         if (exists != null) {
-            return new String(zk.getData(getExpStatePath(expId), false, exists));
+            return new String(curatorClient.getData().storingStatIn(exists).forPath(getExpStatePath(expId)));
         }
         return null;
     }
 
-
-    public static int getExpStateValueWithGivenPath(ZooKeeper zk,String fullPath)throws ApplicationSettingsException,
-            KeeperException, InterruptedException {
-        Stat exists = zk.exists(fullPath, false);
-        if (exists != null) {
-            return Integer.parseInt(new String(zk.getData(fullPath, false, exists)));
-        }
-        return -1;
-    }
-    public static List<String> getRunningGfacNodeNames(ZooKeeper zk) throws KeeperException, InterruptedException {
-        String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_API_SERVER_NODE, "/gfac-server");
-        return zk.getChildren(gfacServer, null);
-    }
-
-
-    public static List<String> getAllGfacNodeNames(ZooKeeper zk) throws KeeperException, InterruptedException {
-        String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
-        return zk.getChildren(gfacServer, null);
-    }
-
     public static void runZKFromConfig(ServerConfig config,ServerCnxnFactory cnxnFactory) throws IOException {
         AiravataZKUtils.logger.info("Starting Zookeeper server...");
         FileTxnSnapLog txnLog = null;
@@ -177,33 +154,22 @@ public class AiravataZKUtils implements Watcher {
         }
     }
 
-    public static void storeDeliveryTag(ZooKeeper zk,String newExpNode,Double deliveryTag) throws KeeperException, InterruptedException {
-        String s = zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
-
-        Stat expParent = zk.exists(newExpNode, false);
-        if (expParent != null) {
-            zk.setData(newExpNode, toByteArray(deliveryTag),
-                    expParent.getVersion());
-        }
-    }
-
     public static byte[] toByteArray(double value) {
         byte[] bytes = new byte[8];
         ByteBuffer.wrap(bytes).putDouble(value);
         return bytes;
     }
 
-    public static long getDeliveryTag(String experimentID, ZooKeeper zk, String experimentNode,
-                                      String pickedChild) throws KeeperException, InterruptedException,AiravataException {
+    public static long getDeliveryTag(String experimentID, CuratorFramework curatorClient, String experimentNode,
+                                      String pickedChild) throws Exception {
         String deliveryTagPath = experimentNode + File.separator + pickedChild + File.separator + experimentID
                 + DELIVERY_TAG_POSTFIX;
-        Stat exists = zk.exists(deliveryTagPath, false);
+        Stat exists = curatorClient.checkExists().forPath(deliveryTagPath);
         if(exists==null) {
             logger.error("Cannot find delivery Tag in path:" + deliveryTagPath + " for this experiment");
             return -1;
         }
-        return bytesToLong(zk.getData(deliveryTagPath, false, exists));
+        return bytesToLong(curatorClient.getData().storingStatIn(exists).forPath(deliveryTagPath));
     }
     public static byte[] longToBytes(long x) {
         ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
@@ -222,15 +188,16 @@ public class AiravataZKUtils implements Watcher {
         return ByteBuffer.wrap(bytes).getDouble();
     }
 
-    public static long getCancelDeliveryTagIfExist(String experimentId, ZooKeeper zk, String experimentNode, String pickedChild) throws KeeperException, InterruptedException {
+    public static long getCancelDeliveryTagIfExist(String experimentId, CuratorFramework curatorClient,
+                                                   String experimentNode, String pickedChild) throws Exception {
 
         String cancelDeliveryTagPath = experimentNode + File.separator + pickedChild + File.separator + experimentId +
                 AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
-        Stat exists = zk.exists(cancelDeliveryTagPath, false);
+        Stat exists = curatorClient.checkExists().forPath(cancelDeliveryTagPath);
         if (exists == null) {
             return -1; // no cancel deliverytag found
         } else {
-            return bytesToLong(zk.getData(cancelDeliveryTagPath, false, exists));
+            return bytesToLong(curatorClient.getData().storingStatIn(exists).forPath(cancelDeliveryTagPath));
         }
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 9a8d2c7..6c2f213 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -605,6 +605,11 @@
 			<artifactId>amqp-client</artifactId>
 			<version>${amqp.client.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-framework</artifactId>
+			<version>${curator.version}</version>
+		</dependency>
 
 		<!-- ======================== Sample =================== -->
 		<dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/airavata-gfac-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/pom.xml b/modules/gfac/airavata-gfac-service/pom.xml
index 5a178bd..3e8b423 100644
--- a/modules/gfac/airavata-gfac-service/pom.xml
+++ b/modules/gfac/airavata-gfac-service/pom.xml
@@ -82,11 +82,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <version>${curator.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
             <artifactId>curator-framework</artifactId>
             <version>${curator.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 5d747aa..20926d7 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -22,7 +22,6 @@ package org.apache.airavata.gfac.server;
 
 import com.google.common.eventbus.EventBus;
 import org.airavata.appcatalog.cpi.AppCatalog;
-import org.airavata.appcatalog.cpi.AppCatalogException;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -45,71 +44,65 @@ import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
 import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
-import org.apache.zookeeper.*;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.locks.Lock;
 
 
-public class GfacServerHandler implements GfacService.Iface, Watcher {
+public class GfacServerHandler implements GfacService.Iface {
     private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
-
     private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
-
     private static int requestCount=0;
-
     private Registry registry;
     private AppCatalog appCatalog;
-
     private String gatewayName;
-
     private String airavataUserName;
-
-    private ZooKeeper zk;
-
+//    private ZooKeeper zk;
+    private CuratorFramework curatorClient;
     private static Integer mutex = -1;
-
     private static Lock lock;
-
     private MonitorPublisher publisher;
-
     private String gfacServer;
-
     private String gfacExperiments;
-
     private String airavataServerHostPort;
-
-
     private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
 
     public GfacServerHandler() throws Exception {
-        // registering with zk
         try {
+            // start curator client
             String zkhostPort = AiravataZKUtils.getZKhostPort();
-            airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
-                    + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
-            zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);   // no watcher is required, this will only use to store some data
+            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+            curatorClient = CuratorFrameworkFactory.newClient(zkhostPort, retryPolicy);
+            curatorClient.start();
             gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
             gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
-            logger.info("Waiting for zookeeper to connect to the server");
-            synchronized (mutex) {
-                mutex.wait(5000);  // waiting for the syncConnected event
-            }
+            airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
             storeServerConfig();
-            logger.info("Finished starting ZK: " + zk);
             publisher = new MonitorPublisher(new EventBus());
             BetterGfacImpl.setMonitorPublisher(publisher);
             registry = RegistryFactory.getDefaultRegistry();
@@ -121,24 +114,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                 rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
                 rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
             }
-            BetterGfacImpl.startStatusUpdators(registry, zk, publisher, rabbitMQTaskLaunchConsumer);
-        } catch (ApplicationSettingsException e) {
-            logger.error("Error initialising GFAC", e);
-            throw new Exception("Error initialising GFAC", e);
-        } catch (InterruptedException e) {
-            logger.error("Error initialising GFAC", e);
-            throw new Exception("Error initialising GFAC", e);
-        } catch (AppCatalogException e) {
-            logger.error("Error initialising GFAC", e);
-            throw new Exception("Error initialising GFAC", e);
-        } catch (RegistryException e) {
-            logger.error("Error initialising GFAC", e);
-            throw new Exception("Error initialising GFAC", e);
-        } catch (KeeperException e) {
-            logger.error("Error initialising GFAC", e);
-            throw new Exception("Error initialising GFAC", e);
-        } catch (IOException e) {
-            logger.error("Error initialising GFAC", e);
+            BetterGfacImpl.startStatusUpdators(registry, curatorClient, publisher, rabbitMQTaskLaunchConsumer);
+        } catch (Exception e) {
             throw new Exception("Error initialising GFAC", e);
         }
     }
@@ -152,60 +129,27 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
             logger.error(e.getMessage(), e);
         }
     }
-    private void storeServerConfig() throws KeeperException, InterruptedException, ApplicationSettingsException {
-        Stat zkStat = zk.exists(gfacServer, false);
-        if (zkStat == null) {
-            zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
+    private void storeServerConfig() throws Exception {
+        Stat stat = curatorClient.checkExists().forPath(gfacServer);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+                    .forPath(gfacServer, new byte[0]);
         }
         String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
         String instanceNode = gfacServer + File.separator + instanceId;
-        zkStat = zk.exists(instanceNode, true);
-        if (zkStat == null) {
-            zk.create(instanceNode,
-                    airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.EPHEMERAL);      // other component will watch these childeren creation deletion to monitor the status of the node
-            zk.getChildren(instanceNode, true);
+        stat = curatorClient.checkExists().forPath(instanceNode);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(instanceNode, airavataServerHostPort.getBytes());
+            curatorClient.getChildren().watched().forPath(instanceNode);
         }
-        zkStat = zk.exists(gfacExperiments, false);
-        if (zkStat == null) {
-            zk.create(gfacExperiments,
-                    airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
+        stat = curatorClient.checkExists().forPath(gfacExperiments);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(gfacExperiments, airavataServerHostPort.getBytes());
         }
-        zkStat = zk.exists(gfacExperiments + File.separator + instanceId, false);
-        if (zkStat == null) {
-            zk.create(gfacExperiments + File.separator + instanceId,
-                    airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-        } else {
-            logger.error(" Zookeeper is inconsistent state  !!!!!");
-        }
-    }
-
-    synchronized public void process(WatchedEvent watchedEvent) {
-        logger.info(watchedEvent.getPath());
-        logger.info(watchedEvent.getType().toString());
-        synchronized (mutex) {
-            Event.KeeperState state = watchedEvent.getState();
-            logger.info(state.name());
-            switch (state){
-                case SyncConnected:
-                    mutex.notify();
-                    break;
-                case Expired:case Disconnected:
-                   logger.info("ZK Connection is "+ state.toString());
-                    try {
-                        zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-                    } catch (IOException e) {
-                        logger.error(e.getMessage(), e);
-                    } catch (ApplicationSettingsException e) {
-                        logger.error(e.getMessage(), e);
-                    }
-//                    synchronized (mutex) {
-//                        mutex.wait(5000);  // waiting for the syncConnected event
-//                    }
-            }
+        stat = curatorClient.checkExists().forPath(gfacExperiments + File.separator + instanceId);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+                    .forPath(gfacExperiments + File.separator + instanceId, airavataServerHostPort.getBytes());
         }
     }
 
@@ -304,7 +248,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
 
     private GFac getGfac() throws TException {
         try {
-            return new BetterGfacImpl(registry, appCatalog,null , publisher);
+            return new BetterGfacImpl(registry, appCatalog, curatorClient, publisher);
 
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
@@ -379,16 +323,11 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                     registry.update(RegistryModelType.EXPERIMENT_STATUS, status, event.getExperimentId());
                     experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
                     try {
-                        GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
+                        GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), curatorClient,
+                                experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
                         AiravataZKUtils.getExpStatePath(event.getExperimentId());
                         submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
-                    } catch (KeeperException e) {
-                        logger.error(nodeName + " was interrupted.");
-                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
-                    } catch (InterruptedException e) {
-                        logger.error(e.getMessage(), e);
-                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
-                    } catch (ApplicationSettingsException e) {
+                    } catch (Exception e) {
                         logger.error(e.getMessage(), e);
                         rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                     }
@@ -404,7 +343,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                 try {
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), zk, message.getDeliveryTag());
+                    boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), curatorClient, message.getDeliveryTag());
                     if (saveDeliveryTagSuccess) {
                         cancelSuccess = cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
                         System.out.println(" Message Received with message id '" + message.getMessageId()
@@ -420,7 +359,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                         // if cancel success , AiravataExperimentStatusUpdator will send an ack to this message.
                     } else {
                         try {
-                            if (GFacUtils.ackCancelRequest(event.getExperimentId(), zk)) {
+                            if (GFacUtils.ackCancelRequest(event.getExperimentId(), curatorClient)) {
                                 if (!rabbitMQTaskLaunchConsumer.isOpen()) {
                                     rabbitMQTaskLaunchConsumer.reconnect();
                                 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 465f00e..7eede18 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -116,11 +116,10 @@
         <!-- this is the dependency for amqp implementation -->
         <!-- zookeeper dependencies -->
         <dependency>
-        	<groupId>org.apache.zookeeper</groupId>
-        	<artifactId>zookeeper</artifactId>
-        	<version>3.4.0</version>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
         </dependency>
-
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 6a8dd5f..0ca3828 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -42,6 +42,7 @@ import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.registry.cpi.Registry;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,29 +51,18 @@ public class JobExecutionContext extends AbstractContext implements Serializable
 
     private static final Logger log = LoggerFactory.getLogger(JobExecutionContext.class);
     private GFacConfiguration gfacConfiguration;
-
     private ApplicationContext applicationContext;
-
     private MessageContext inMessageContext;
-
     private MessageContext outMessageContext;
-
     private GFacNotifier notifier;
-
     //FIXME : not needed for gfac
     private Experiment experiment;
-
     private TaskDetails taskData;
-
     private JobDetails jobDetails;
-
     // FIXME : not needed for gfac
     private WorkflowNodeDetails workflowNodeDetails;
-
     private GFac gfac;
-
-    private ZooKeeper zk;
-
+    private CuratorFramework curatorClient;
     private String credentialStoreToken;
     /**
      * User defined scratch/temp directory
@@ -150,10 +140,12 @@ public class JobExecutionContext extends AbstractContext implements Serializable
     // which service description we should refer during the execution of the current job represented
     // by this context instance.
     private String applicationName;
-
     private String experimentID;
-
     private AppCatalog appCatalog;
+    private String gatewayID;
+    private String status;
+    private List<String> outputFileList;
+    private Registry registry;
 
     public String getGatewayID() {
         return gatewayID;
@@ -163,13 +155,6 @@ public class JobExecutionContext extends AbstractContext implements Serializable
         this.gatewayID = gatewayID;
     }
 
-    private String gatewayID;
-    
-    private String status;
-
-    private List<String> outputFileList;
-
-    private Registry registry;
 
     /**
      *  Security context is used to handle authentication for input handlers and providers.
@@ -377,15 +362,6 @@ public class JobExecutionContext extends AbstractContext implements Serializable
         this.gfac = gfac;
     }
 
-    public ZooKeeper getZk() {
-        return zk;
-    }
-
-    public void setZk(ZooKeeper zk) {
-        this.zk = zk;
-
-    }
-
     public String getCredentialStoreToken() {
         return credentialStoreToken;
     }
@@ -494,6 +470,14 @@ public class JobExecutionContext extends AbstractContext implements Serializable
         this.preferredDataMovementInterface = preferredDataMovementInterface;
     }
 
+    public CuratorFramework getCuratorClient() {
+        return curatorClient;
+    }
+
+    public void setCuratorClient(CuratorFramework curatorClient) {
+        this.curatorClient = curatorClient;
+    }
+
     public String getExecutablePath() {
         if (applicationContext == null || applicationContext.getApplicationDeploymentDescription() == null) {
             return null;
@@ -502,6 +486,8 @@ public class JobExecutionContext extends AbstractContext implements Serializable
         }
     }
 
+
+
     public String getLoginUserName() {
         return loginUserName;
     }