You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/28 17:37:38 UTC

[1/4] airavata git commit: Remove ZooKeeper code and dependency from Orchestrator.

Repository: airavata
Updated Branches:
  refs/heads/master 5002df55a -> 4a0617802


Remove ZooKeeper code and dependency from Orchestrator.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/915ed04d
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/915ed04d
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/915ed04d

Branch: refs/heads/master
Commit: 915ed04d54c61e083ef0f2b7dccb7083ef3fee9e
Parents: 5002df5
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Wed May 27 16:48:49 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Wed May 27 16:48:49 2015 -0400

----------------------------------------------------------------------
 .../server/OrchestratorServerHandler.java       | 149 +------------------
 modules/orchestrator/orchestrator-core/pom.xml  |   6 -
 .../core/context/OrchestratorContext.java       |  13 --
 .../core/impl/GFACPassiveJobSubmitter.java      |  25 ----
 4 files changed, 3 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/915ed04d/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 1b3e1f3..4ef9dbc 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -76,40 +76,21 @@ import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowN
 import org.apache.airavata.workflow.core.WorkflowEnactmentService;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-
-import java.io.File;
-import java.io.IOException;
+
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
-
-public class OrchestratorServerHandler implements OrchestratorService.Iface,
-		Watcher {
-	private static AiravataLogger log = AiravataLoggerFactory
-			.getLogger(OrchestratorServerHandler.class);
 
+public class OrchestratorServerHandler implements OrchestratorService.Iface {
+	private static AiravataLogger log = AiravataLoggerFactory .getLogger(OrchestratorServerHandler.class);
 	private SimpleOrchestratorImpl orchestrator = null;
-
 	private Registry registry;
-
-	private ZooKeeper zk;
-
 	private static Integer mutex = new Integer(-1);
-
 	private String airavataUserName;
 	private String gatewayName;
 	private Publisher publisher;
-
     private RabbitMQProcessConsumer rabbitMQProcessConsumer;
     private RabbitMQProcessPublisher rabbitMQProcessPublisher;
 
@@ -133,31 +114,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 			
 //            setGatewayName(ServerSettings.getDefaultUserGateway());
             setAiravataUserName(ServerSettings.getDefaultUser());
-			if(!ServerSettings.isGFacPassiveMode()) {
-				try {
-					zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is
-					// required, this
-					// will only use to
-					// store some data
-					log.info("Waiting for zookeeper to connect to the server");
-
-					String OrchServer = ServerSettings
-							.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
-					synchronized (mutex) {
-						mutex.wait(5000); // waiting for the syncConnected event
-					}
-					registerOrchestratorService(airavataServerHostPort, OrchServer);
-					// creating a watch in orchestrator to monitor the gfac
-					// instances
-					zk.getChildren(ServerSettings.getSetting(
-									Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"),
-							this);
-					log.info("Finished starting ZK: " + zk);
-				} catch (IOException|InterruptedException|KeeperException e) {
-					log.error(e.getMessage(), e);
-					throw new OrchestratorException("Error while initializing orchestrator service, Error in Zookeeper", e);
-				}
-			}
 		} catch (AiravataException e) {
             log.error(e.getMessage(), e);
             throw new OrchestratorException("Error while initializing orchestrator service", e);
@@ -169,7 +125,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 			orchestrator = new SimpleOrchestratorImpl();
 			registry = RegistryFactory.getDefaultRegistry();
 			orchestrator.initialize();
-			orchestrator.getOrchestratorContext().setZk(this.zk);
 			orchestrator.getOrchestratorContext().setPublisher(this.publisher);
             startProcessConsumer();
         } catch (OrchestratorException e) {
@@ -194,23 +149,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 
     }
 
-    private void registerOrchestratorService(String airavataServerHostPort, String orchServer) throws KeeperException, InterruptedException {
-        Stat zkStat = zk.exists(orchServer, false);
-        if (zkStat == null) {
-            zk.create(orchServer, new byte[0],
-                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        }
-        String instantNode = orchServer
-                + File.separator
-                + String.valueOf(new Random()
-                        .nextInt(Integer.MAX_VALUE));
-        zkStat = zk.exists(instantNode, false);
-        if (zkStat == null) {
-            zk.create(instantNode, airavataServerHostPort.getBytes(),
-                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
-        }
-    }
-
     /**
 	 * * After creating the experiment Data user have the * experimentID as the
 	 * handler to the experiment, during the launchExperiment * We just have to
@@ -327,84 +265,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
         return validateStatesAndCancel(experimentId, tokenId);
 	}
 
-	/**
-	 * This method gracefully handler gfac node failures
-	 */
-	synchronized public void process(WatchedEvent watchedEvent) {
-		log.info(watchedEvent.getPath());
-		synchronized (mutex) {
-			try {
-				Event.KeeperState state = watchedEvent.getState();
-				switch (state) {
-					case SyncConnected:
-						mutex.notify();
-						break;
-					case Expired:
-					case Disconnected:
-						log.info("ZK Connection is "+ state.toString());
-						break;
-				}
-				if (watchedEvent.getPath() != null
-						&& watchedEvent.getPath().startsWith(
-						ServerSettings.getSetting(
-								Constants.ZOOKEEPER_GFAC_SERVER_NODE,
-								"/gfac-server"))) {
-					List<String> children = zk.getChildren(ServerSettings
-							.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,
-									"/gfac-server"), true);
-					for (String gfacNodes : children) {
-						zk.exists(
-								ServerSettings.getSetting(
-										Constants.ZOOKEEPER_GFAC_SERVER_NODE,
-										"/gfac-server")
-										+ File.separator + gfacNodes, this);
-					}
-					switch (watchedEvent.getType()) {
-						case NodeCreated:
-							mutex.notify();
-							break;
-						case NodeDeleted:
-							// here we have to handle gfac node shutdown case
-							if (children.size() == 0) {
-								log.error("There are not gfac instances to route failed jobs");
-								return;
-							}
-							// we recover one gfac node at a time
-							final WatchedEvent event = watchedEvent;
-							final OrchestratorServerHandler handler = this;
-						/*(new Thread() {  // disabling ft implementation with zk
-							public void run() {
-								int retry = 0;
-								while (retry < 3) {
-									try {
-										(new OrchestratorRecoveryHandler(
-												handler, event.getPath()))
-												.recover();
-										break;
-									} catch (Exception e) {
-										e.printStackTrace();
-										log.error("error recovering the jobs for gfac-node: "
-												+ event.getPath());
-										log.error("Retrying again to recover jobs and retry attempt: "
-												+ ++retry);
-									}
-								}
-
-							}
-						}).start();*/
-							break;
-					}
-
-
-				}
-			} catch (KeeperException e) {
-				e.printStackTrace();
-			} catch (InterruptedException e) {
-				e.printStackTrace();
-			}
-		}
-	}
-
 	private String getAiravataUserName() {
 		return airavataUserName;
 	}
@@ -513,9 +373,6 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
         try {
             Experiment experiment = (Experiment) registry.get(
                     RegistryModelType.EXPERIMENT, experimentId);
-            if (zk == null || !zk.getState().isConnected()){
-                zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this);
-            }
 			log.info("Waiting for zookeeper to connect to the server");
 			synchronized (mutex){
 				mutex.wait(5000);

http://git-wip-us.apache.org/repos/asf/airavata/blob/915ed04d/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index 6cfc17f..a5c41ba 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -116,12 +116,6 @@ the License. -->
             <artifactId>airavata-server-configuration</artifactId>
             <scope>test</scope>
         </dependency>
-        <!-- zookeeper and curator dependencies -->
-        <dependency>
-        	<groupId>org.apache.zookeeper</groupId>
-        	<artifactId>zookeeper</artifactId>
-        	<version>${zk.version}</version>
-        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/915ed04d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
index b77087b..f7f5969 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
@@ -34,13 +34,8 @@ import org.apache.zookeeper.ZooKeeper;
  */
 public class OrchestratorContext {
     private List<GFACInstance> gfacInstanceList;
-
     private OrchestratorConfiguration orchestratorConfiguration;
-
     private Registry newRegistry;
-
-    private static ZooKeeper zk; // this instance can be accessed by the Validators and other components
-
     private Publisher publisher;
     
     public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
@@ -90,12 +85,4 @@ public class OrchestratorContext {
     public void setGfacInstanceList(List<GFACInstance> gfacInstanceList) {
         this.gfacInstanceList.addAll(gfacInstanceList);
     }
-
-    public  void setZk(ZooKeeper zk) {
-        this.zk = zk;
-    }
-
-    public static ZooKeeper getZk() {
-        return zk;
-    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/915ed04d/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 1faef9e..36282a0 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -21,17 +21,11 @@
 package org.apache.airavata.orchestrator.core.impl;
 
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.AiravataUtils;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.gfac.client.GFACInstance;
-import org.apache.airavata.gfac.client.GFacClientFactory;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.PublisherFactory;
@@ -41,17 +35,11 @@ import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
-import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Random;
 import java.util.UUID;
 
 /**
@@ -59,13 +47,9 @@ import java.util.UUID;
  */
 public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
     private final static Logger logger = LoggerFactory.getLogger(GFACPassiveJobSubmitter.class);
-
     public static final String IP = "ip";
-
     private OrchestratorContext orchestratorContext;
-
     private static Integer mutex = -1;
-
     private Publisher publisher;
 
     public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
@@ -160,15 +144,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
         }
     }
 
-    private void closeZK(OrchestratorContext orchestratorContext) {
-        try {
-            if(orchestratorContext!=null && orchestratorContext.getZk()!=null) {
-                orchestratorContext.getZk().close();
-            }
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
-        }
-    }
     synchronized public void process(WatchedEvent event) {
         logger.info(getClass().getName() + event.getPath());
         logger.info(getClass().getName()+event.getType());


[3/4] airavata git commit: Integrated Apache Curator with GFac. With this fix, every ZooKeeper call goes through the CuratorFramework client.

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 2cd9ecb..200ffbe 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -61,6 +61,8 @@ import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -80,57 +82,44 @@ import java.util.*;
  * This is the GFac CPI class for external usage, this simply have a single method to submit a job to
  * the resource, required data for the job has to be stored in registry prior to invoke this object.
  */
-public class BetterGfacImpl implements GFac,Watcher {
+public class BetterGfacImpl implements GFac {
     private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class);
     public static final String ERROR_SENT = "ErrorSent";
     private Registry registry;
-    // we are not storing zk instance in to jobExecution context
-    private ZooKeeper zk;
+    private CuratorFramework curatorClient;
     private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
     private static File gfacConfigFile;
     private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
     private static MonitorPublisher monitorPublisher;
-    private static Integer mutex = -1;
 
     /**
      * Constructor for GFac
      *
      * @param registry
-     * @param zooKeeper
+     * @param curatorClient
      */
-    public BetterGfacImpl(Registry registry,  AppCatalog appCatalog, ZooKeeper zooKeeper,
+    public BetterGfacImpl(Registry registry, AppCatalog appCatalog, CuratorFramework curatorClient,
                           MonitorPublisher publisher) throws ApplicationSettingsException, IOException, InterruptedException {
         this.registry = registry;
         monitorPublisher = publisher;     // This is a EventBus common for gfac
-        this.zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-        log.info("Waiting until zookeeper client connect to the server...");
-        synchronized (mutex) {
-            mutex.wait(5000);  // waiting for the syncConnected event
-        }
+        this.curatorClient = curatorClient;
     }
 
-    public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher,RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
+    public static void startStatusUpdators(Registry registry, CuratorFramework curatorClient, MonitorPublisher publisher,
+
+                                           RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
         try {
             String[] listenerClassList = ServerSettings.getActivityListeners();
             Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
-
             for (String listenerClass : listenerClassList) {
                 Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
                 AbstractActivityListener abstractActivityListener = aClass.newInstance();
                 activityListeners.add(abstractActivityListener);
-                abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher,rabbitMQTaskLaunchConsumer);
+                abstractActivityListener.setup(publisher, registry, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
                 log.info("Registering listener: " + listenerClass);
                 publisher.registerListener(abstractActivityListener);
             }
-        } catch (ClassNotFoundException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties", e);
-        } catch (InstantiationException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties", e);
-        } catch (IllegalAccessException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties", e);
-        } catch (ApplicationSettingsException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties", e);
-        } catch (AiravataException e) {
+        } catch (Exception e) {
             log.error("Error loading the listener classes configured in airavata-server.properties", e);
         }
     }
@@ -151,26 +140,9 @@ public class BetterGfacImpl implements GFac,Watcher {
                 threadedHandler.initProperties(handlerConfig.getProperties());
                 daemonHandlers.add(threadedHandler);
             }
-        } catch (ParserConfigurationException e) {
-            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
-        } catch (IOException e) {
-            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
-        } catch (SAXException e) {
-            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
-        } catch (XPathExpressionException e) {
+        } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException |
+                InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) {
             log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
-        } catch (ClassNotFoundException e) {
-            log.error("Error initializing the handler: " + className);
-            log.error(className + " class has to implement " + ThreadedHandler.class);
-        } catch (InstantiationException e) {
-            log.error("Error initializing the handler: " + className);
-            log.error(className + " class has to implement " + ThreadedHandler.class);
-        } catch (GFacHandlerException e) {
-            log.error("Error initializing the handler: " + className);
-            log.error(className + " class has to implement " + ThreadedHandler.class);
-        } catch (IllegalAccessException e) {
-            log.error("Error initializing the handler: " + className);
-            log.error(className + " class has to implement " + ThreadedHandler.class);
         }
         for (ThreadedHandler tHandler : daemonHandlers) {
             (new Thread(tHandler)).start();
@@ -184,13 +156,13 @@ public class BetterGfacImpl implements GFac,Watcher {
         daemonHandlers = new ArrayList<ThreadedHandler>();
         startDaemonHandlers();
     }
-    
+
     public BetterGfacImpl(Registry registry) {
-    	this();
-    	this.registry = registry;
+        this();
+        this.registry = registry;
     }
 
-    
+
     /**
      * This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
      * And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
@@ -206,13 +178,13 @@ public class BetterGfacImpl implements GFac,Watcher {
             jobExecutionContext.setCredentialStoreToken(tokenId);
             return submitJob(jobExecutionContext);
         } catch (Exception e) {
-            log.error("Error inovoking the job with experiment ID: " + experimentID + ":"+e.getMessage());
+            log.error("Error inovoking the job with experiment ID: " + experimentID + ":" + e.getMessage());
             StringWriter errors = new StringWriter();
             e.printStackTrace(new PrintWriter(errors));
             GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             // FIXME: Here we need to update Experiment status to Failed, as we used chained update approach updating
             // task status will cause to update Experiment status. Remove this chained update approach and fix this correctly (update experiment status)
-            if(jobExecutionContext!=null){
+            if (jobExecutionContext != null) {
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
                 TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
@@ -222,8 +194,6 @@ public class BetterGfacImpl implements GFac,Watcher {
                 monitorPublisher.publish(event);
             }
             throw new GFacException(e);
-        }finally {
-            closeZK(jobExecutionContext);
         }
     }
 
@@ -237,7 +207,7 @@ public class BetterGfacImpl implements GFac,Watcher {
          * 1. Get the Task from the task ID and construct the Job object and save it in to registry
          * 2. Add properties of description documents to jobExecutionContext which will be used inside the providers.
          */
-        
+
         //Fetch the Task details for the requested experimentID from the registry. Extract required pointers from the Task object.
         TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
 
@@ -289,11 +259,11 @@ public class BetterGfacImpl implements GFac,Watcher {
         jobExecutionContext.setTaskData(taskData);
         jobExecutionContext.setGatewayID(gatewayID);
         jobExecutionContext.setAppCatalog(appCatalog);
-        
-      
+
+
         List<JobDetails> jobDetailsList = taskData.getJobDetailsList();
         //FIXME: Following for loop only set last jobDetails element to the jobExecutionContext
-        for(JobDetails jDetails:jobDetailsList){
+        for (JobDetails jDetails : jobDetailsList) {
             jobExecutionContext.setJobDetails(jDetails);
         }
         // setting the registry
@@ -314,11 +284,11 @@ public class BetterGfacImpl implements GFac,Watcher {
 
         jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
         jobExecutionContext.setGfac(this);
-        jobExecutionContext.setZk(zk);
+        jobExecutionContext.setCuratorClient(curatorClient);
 
         // handle job submission protocol
         List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
-        if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){
+        if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
             Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
                 @Override
                 public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
@@ -327,7 +297,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             });
 
             jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces);
-        }else {
+        } else {
             throw new GFacException("Compute resource should have at least one job submission interface defined...");
         }
         // handle data movement protocol
@@ -346,7 +316,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         populateDefaultComputeResourceConfiguration(jobExecutionContext, applicationInterface, computeResource);
         populateResourceJobManager(jobExecutionContext);
         // if gateway resource preference is set
-        if (gatewayResourcePreferences != null ) {
+        if (gatewayResourcePreferences != null) {
             if (gatewayResourcePreferences.getScratchLocation() == null) {
                 gatewayResourcePreferences.setScratchLocation("/tmp");
             }
@@ -365,7 +335,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                 }
             }
 
-            if(gatewayResourcePreferences.getLoginUserName() != null){
+            if (gatewayResourcePreferences.getLoginUserName() != null) {
                 jobExecutionContext.setLoginUserName(gatewayResourcePreferences.getLoginUserName());
             }
 
@@ -375,45 +345,45 @@ public class BetterGfacImpl implements GFac,Watcher {
                 jobExecutionContext.setPreferredDataMovementInterface(jobExecutionContext.getHostPrioritizedDataMovementInterfaces().get(0));
                 jobExecutionContext.setPreferredDataMovementProtocol(jobExecutionContext.getPreferredDataMovementInterface().getDataMovementProtocol());
             } else {
-            	// this check is to avoid NPE when job submission endpoints do 
-            	// not contain any data movement interfaces. 
-            	if((dataMovementInterfaces != null) && (!dataMovementInterfaces.isEmpty())) {
-            		for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
-            			if (gatewayResourcePreferences.getPreferredDataMovementProtocol() == dataMovementInterface.getDataMovementProtocol()) {
-            				jobExecutionContext.setPreferredDataMovementInterface(dataMovementInterface);
-            				break;
-                    	}
-            		}
-            	}
+                // this check is to avoid NPE when job submission endpoints do
+                // not contain any data movement interfaces.
+                if ((dataMovementInterfaces != null) && (!dataMovementInterfaces.isEmpty())) {
+                    for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
+                        if (gatewayResourcePreferences.getPreferredDataMovementProtocol() == dataMovementInterface.getDataMovementProtocol()) {
+                            jobExecutionContext.setPreferredDataMovementInterface(dataMovementInterface);
+                            break;
+                        }
+                    }
+                }
             }
-        }  else {
+        } else {
             setUpWorkingLocation(jobExecutionContext, applicationInterface, "/tmp");
         }
         List<OutputDataObjectType> taskOutputs = taskData.getApplicationOutputs();
-        if (taskOutputs == null || taskOutputs.isEmpty() ){
+        if (taskOutputs == null || taskOutputs.isEmpty()) {
             taskOutputs = applicationInterface.getApplicationOutputs();
         }
 
-        for (OutputDataObjectType objectType : taskOutputs){
-            if (objectType.getType() == DataType.URI && objectType.getValue() != null){
+        for (OutputDataObjectType objectType : taskOutputs) {
+            if (objectType.getType() == DataType.URI && objectType.getValue() != null) {
                 String filePath = objectType.getValue();
                 // if output is not in working folder
                 if (objectType.getLocation() != null && !objectType.getLocation().isEmpty()) {
-                	if(objectType.getLocation().startsWith(File.separator)){
-                		filePath = objectType.getLocation() + File.separator + filePath;
-                    }else{
-                    	filePath = jobExecutionContext.getOutputDir() + File.separator + objectType.getLocation() + File.separator + filePath;
+                    if (objectType.getLocation().startsWith(File.separator)) {
+                        filePath = objectType.getLocation() + File.separator + filePath;
+                    } else {
+                        filePath = jobExecutionContext.getOutputDir() + File.separator + objectType.getLocation() + File.separator + filePath;
                     }
-                }else{
-                	filePath = jobExecutionContext.getOutputDir() + File.separator + filePath;
+                } else {
+                    filePath = jobExecutionContext.getOutputDir() + File.separator + filePath;
                 }
                 objectType.setValue(filePath);
-                
+
             }
-            if (objectType.getType() == DataType.STDOUT){
+            if (objectType.getType() == DataType.STDOUT) {
                 objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stdout");
             }
-            if (objectType.getType() == DataType.STDERR){
+            if (objectType.getType() == DataType.STDERR) {
                 objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stderr");
             }
         }
@@ -464,23 +434,23 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-    private void populateResourceJobManager (JobExecutionContext jobExecutionContext) {
+    private void populateResourceJobManager(JobExecutionContext jobExecutionContext) {
         try {
             JobSubmissionProtocol submissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
             JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
             if (submissionProtocol == JobSubmissionProtocol.SSH) {
                 SSHJobSubmission sshJobSubmission = GFacUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
-                if (sshJobSubmission != null){
+                if (sshJobSubmission != null) {
                     jobExecutionContext.setResourceJobManager(sshJobSubmission.getResourceJobManager());
                 }
-            } else if (submissionProtocol == JobSubmissionProtocol.LOCAL){
+            } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
                 LOCALSubmission localJobSubmission = GFacUtils.getLocalJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
-                if (localJobSubmission != null){
+                if (localJobSubmission != null) {
                     jobExecutionContext.setResourceJobManager(localJobSubmission.getResourceJobManager());
                 }
             }
         } catch (AppCatalogException e) {
-           log.error("Error occured while retrieving job submission interface", e);
+            log.error("Error occured while retrieving job submission interface", e);
         }
     }
 
@@ -488,7 +458,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         // We need to check whether this job is submitted as a part of a large workflow. If yes,
         // we need to setup workflow tracking listerner.
         try {
-            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
             // Register log event listener. This is required in all scenarios.
             if (isNewJob(gfacExpState)) {
                 // In this scenario We do everything from the beginning
@@ -497,8 +467,8 @@ public class BetterGfacImpl implements GFac,Watcher {
                 launch(jobExecutionContext);
             } else if (isCompletedJob(gfacExpState)) {
                 log.info("There is nothing to recover in this job so we do not re-submit");
-                ZKUtil.deleteRecursive(zk,
-                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
+                ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()), true);
             } else {
                 // Now we know this is an old Job, so we have to handle things gracefully
                 log.info("Re-launching the job in GFac because this is re-submitted to GFac");
@@ -547,7 +517,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
         try {
-            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
             String workflowInstanceID = null;
             if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
                 //todo implement WorkflowTrackingListener properly
@@ -575,27 +545,25 @@ public class BetterGfacImpl implements GFac,Watcher {
 //
 //            }
             return true;
-            }catch(Exception e){
-                log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
-                throw new GFacException(e.getMessage(), e);
-            }finally{
-                closeZK(jobExecutionContext);
-            }
+        } catch (Exception e) {
+            log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
+            throw new GFacException(e.getMessage(), e);
         }
+    }
 
-	private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
-		// Scheduler will decide the execution flow of handlers and provider
-		// which handles
-		// the job.
-		String experimentID = jobExecutionContext.getExperimentID();
-		try {
-			Scheduler.schedule(jobExecutionContext);
+    private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
+        // Scheduler will decide the execution flow of handlers and provider
+        // which handles
+        // the job.
+        String experimentID = jobExecutionContext.getExperimentID();
+        try {
+            Scheduler.schedule(jobExecutionContext);
 
-			// Executing in handlers in the order as they have configured in
-			// GFac configuration
-			// here we do not skip handler if some handler does not have to be
-			// run again during re-run it can implement
-			// that logic in to the handler
+            // Executing in handlers in the order as they have configured in
+            // GFac configuration
+            // here we do not skip handler if some handler does not have to be
+            // run again during re-run it can implement
+            // that logic in to the handler
 
             // After executing the in handlers provider instance should be set
             // to job execution context.
@@ -637,31 +605,31 @@ public class BetterGfacImpl implements GFac,Watcher {
                 default:
                     throw new GFacException("Un-handled GfacExperimentState : " + state.name());
             }
-		} catch (Exception e) {
-            log.error(e.getMessage(),e);
-			try {
-				// we make the experiment as failed due to exception scenario
-				monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            try {
+                // we make the experiment as failed due to exception scenario
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
                 JobIdentifier jobIdentity = new JobIdentifier(
                         jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
-				monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
-                GFacUtils.saveErrorDetails(jobExecutionContext,  e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
-			} catch (NullPointerException e1) {
-				log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
-						+ "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+                monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+                GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (NullPointerException e1) {
+                log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+                        + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
                 TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
-				monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
-                GFacUtils.saveErrorDetails(jobExecutionContext,  e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
+                monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
+                GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
 
-			}
-			jobExecutionContext.setProperty(ERROR_SENT, "true");
-			jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+            }
+            jobExecutionContext.setProperty(ERROR_SENT, "true");
+            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
             throw new GFacException(e.getMessage(), e);
         }
     }
@@ -678,34 +646,34 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
-		// Scheduler will decide the execution flow of handlers and provider
-		// which handles
-		// the job.
-		try {
-			Scheduler.schedule(jobExecutionContext);
-
-			// Executing in handlers in the order as they have configured in
-			// GFac configuration
-			// here we do not skip handler if some handler does not have to be
-			// run again during re-run it can implement
-			// that logic in to the handler
+        // Scheduler will decide the execution flow of handlers and provider
+        // which handles
+        // the job.
+        try {
+            Scheduler.schedule(jobExecutionContext);
+
+            // Executing in handlers in the order as they have configured in
+            // GFac configuration
+            // here we do not skip handler if some handler does not have to be
+            // run again during re-run it can implement
+            // that logic in to the handler
             if (!isCancelling(jobExecutionContext)) {
                 invokeInFlowHandlers(jobExecutionContext); // to keep the
                 // consistency we always
                 // try to re-run to
                 // avoid complexity
-            }else{
+            } else {
                 log.info("Experiment is cancelled, so launch operation is stopping immediately");
                 GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
                 return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned
             }
             // if (experimentID != null){
-			// registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
-			// }
+            // registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
+            // }
 
-			// After executing the in handlers provider instance should be set
-			// to job execution context.
-			// We get the provider instance and execute it.
+            // After executing the in handlers provider instance should be set
+            // to job execution context.
+            // We get the provider instance and execute it.
             if (!isCancelling(jobExecutionContext)) {
                 invokeProviderExecute(jobExecutionContext);
             } else {
@@ -713,53 +681,53 @@ public class BetterGfacImpl implements GFac,Watcher {
                 GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
                 return;
             }
-            } catch (Exception e) {
-			try {
-				// we make the experiment as failed due to exception scenario
-				monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
-				// monitorPublisher.publish(new
-				// ExperimentStatusChangedEvent(new
-				// ExperimentIdentity(jobExecutionContext.getExperimentID()),
-				// ExperimentState.FAILED));
-				// Updating the task status if there's any task associated
-				// monitorPublisher.publish(new TaskStatusChangeRequest(
-				// new TaskIdentity(jobExecutionContext.getExperimentID(),
-				// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-				// jobExecutionContext.getTaskData().getTaskID()),
-				// TaskState.FAILED
-				// ));
+        } catch (Exception e) {
+            try {
+                // we make the experiment as failed due to exception scenario
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+                // monitorPublisher.publish(new
+                // ExperimentStatusChangedEvent(new
+                // ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                // ExperimentState.FAILED));
+                // Updating the task status if there's any task associated
+                // monitorPublisher.publish(new TaskStatusChangeRequest(
+                // new TaskIdentity(jobExecutionContext.getExperimentID(),
+                // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                // jobExecutionContext.getTaskData().getTaskID()),
+                // TaskState.FAILED
+                // ));
                 JobIdentifier jobIdentity = new JobIdentifier(
-                        jobExecutionContext.getJobDetails().getJobID(),jobExecutionContext.getTaskData().getTaskID(),jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
-				monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
-			} catch (NullPointerException e1) {
-				log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
-						+ "NullPointerException occurred because at this point there might not have Job Created", e1, e);
-				//monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
-				// Updating the task status if there's any task associated
+                monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+            } catch (NullPointerException e1) {
+                log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+                        + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+                //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
+                // Updating the task status if there's any task associated
                 TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
                 monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
 
-			}
-			jobExecutionContext.setProperty(ERROR_SENT, "true");
-			jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+            }
+            jobExecutionContext.setProperty(ERROR_SENT, "true");
+            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
             throw new GFacException(e.getMessage(), e);
-		}
+        }
     }
 
-    private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException {
+    private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws Exception {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+            GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
             initProvider(provider, jobExecutionContext);
             executeProvider(provider, jobExecutionContext);
             disposeProvider(provider, jobExecutionContext);
-            GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+            GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
         if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
@@ -767,13 +735,13 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-    private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
+    private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws Exception {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             if (submit) {
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-                GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
-                GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+                GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
+                GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
                 if (plState != null && plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
                     initProvider(provider, jobExecutionContext);
                     executeProvider(provider, jobExecutionContext);
@@ -781,18 +749,16 @@ public class BetterGfacImpl implements GFac,Watcher {
                 } else {
                     provider.recover(jobExecutionContext);
                 }
-                GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+                GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
             } else {
                 disposeProvider(provider, jobExecutionContext);
-                GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+                GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
             }
         }
 
-        if (GFacUtils.isSynchronousMode(jobExecutionContext))
-
-        {
+        if (GFacUtils.isSynchronousMode(jobExecutionContext))  {
             invokeOutFlowHandlers(jobExecutionContext);
         }
 
@@ -812,12 +778,12 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     // TODO - Did refactoring, but need to recheck the logic again.
-    private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
+    private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws Exception {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
-            GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+            GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
+            GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
             if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
                 initProvider(provider, jobExecutionContext);
                 cancelProvider(provider, jobExecutionContext);
@@ -825,7 +791,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             } else {
                 provider.recover(jobExecutionContext);
             }
-            GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+            GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
 
@@ -885,11 +851,11 @@ public class BetterGfacImpl implements GFac,Watcher {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.INHANDLERSINVOKING));
             for (GFacHandlerConfig handlerClassName : handlers) {
-                if(!isCancelling(jobExecutionContext)) {
+                if (!isCancelling(jobExecutionContext)) {
                     Class<? extends GFacHandler> handlerClass;
                     GFacHandler handler;
                     try {
-                        GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+                        GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
                         handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                         handler = handlerClass.newInstance();
                         handler.initProperties(handlerClassName.getProperties());
@@ -902,12 +868,12 @@ public class BetterGfacImpl implements GFac,Watcher {
                     }
                     try {
                         handler.invoke(jobExecutionContext);
-                        GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+                        GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
                         // if exception thrown before that we do not make it finished
                     } catch (GFacHandlerException e) {
                         throw new GFacException("Error Executing a InFlow Handler", e.getCause());
                     }
-                }else{
+                } else {
                     log.info("Experiment execution is cancelled, so InHandler invocation is going to stop");
                     GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
                     break;
@@ -916,38 +882,18 @@ public class BetterGfacImpl implements GFac,Watcher {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.INHANDLERSINVOKED));
         } catch (Exception e) {
-            throw new GFacException("Error Invoking Handlers:"+e.getMessage(), e);
+            throw new GFacException("Error Invoking Handlers:" + e.getMessage(), e);
         }
     }
 
     public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
         String experimentPath = null;
         try {
-            try {
-                if(jobExecutionContext.getZk()!=null){
-                    closeZK(jobExecutionContext);
-                }
-                jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
-                zk = jobExecutionContext.getZk();
-                log.info("Waiting until zookeeper client connect to the server...");
-                synchronized (mutex) {
-                    mutex.wait(5000);  // waiting for the syncConnected event
-                }
-                experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
-                if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
-                    log.error("Experiment is already finalized so no output handlers will be invoked");
-                    return;
-                }
-            } catch (IOException e) {
-                log.error(e.getMessage(), e);
-            } catch (ApplicationSettingsException e) {
-                log.error(e.getMessage(), e);
-            } catch (InterruptedException e) {
-                log.error(e.getMessage(), e);
-            } catch (KeeperException e) {
-                log.error(e.getMessage(), e);
+            experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+            if (curatorClient.checkExists().forPath(experimentPath) == null) {
+                log.error("Experiment is already finalized so no output handlers will be invoked");
+                return;
             }
-
             GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
             List<GFacHandlerConfig> handlers = null;
             if (gFacConfiguration != null) {
@@ -968,23 +914,20 @@ public class BetterGfacImpl implements GFac,Watcher {
                         Class<? extends GFacHandler> handlerClass;
                         GFacHandler handler;
                         try {
-                            GFacUtils.createHandlerZnode(jobExecutionContext.getZk(), jobExecutionContext, handlerClassName.getClassName());
+                            GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
                             handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                             handler = handlerClass.newInstance();
                             handler.initProperties(handlerClassName.getProperties());
                         } catch (ClassNotFoundException e) {
                             log.error(e.getMessage());
                             throw new GFacException("Cannot load handler class " + handlerClassName, e);
-                        } catch (InstantiationException e) {
-                            log.error(e.getMessage());
-                            throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                        } catch (IllegalAccessException e) {
+                        } catch (InstantiationException | IllegalAccessException e) {
                             log.error(e.getMessage());
                             throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
                         }
                         try {
                             handler.invoke(jobExecutionContext);
-                            GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+                            GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
                         } catch (Exception e) {
                             GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
                             try {
@@ -1008,10 +951,8 @@ public class BetterGfacImpl implements GFac,Watcher {
             } catch (Exception e) {
                 throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
-        } finally{
-            closeZK(jobExecutionContext);
         }
 
         // At this point all the execution is finished so we update the task and experiment statuses.
@@ -1029,16 +970,6 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     }
 
-    private void closeZK(JobExecutionContext jobExecutionContext) {
-        try {
-            if(jobExecutionContext!=null && jobExecutionContext.getZk()!=null) {
-                jobExecutionContext.getZk().close();
-            }
-        } catch (InterruptedException e) {
-            log.error(e.getMessage(), e);
-        }
-    }
-
     /**
      * If handlers ran successfully we re-run only recoverable handlers
      * If handler never ran we run the normal invoke method
@@ -1058,8 +989,8 @@ public class BetterGfacImpl implements GFac,Watcher {
                 try {
                     handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                     handler = handlerClass.newInstance();
-                    GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, handlerClassName.getClassName());
-                    GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
+                    GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+                    GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
                     handler.initProperties(handlerClassName.getProperties());
                     if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
                         log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
@@ -1069,7 +1000,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                         log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
                         handler.recover(jobExecutionContext);
                     }
-                    GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+                    GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
                 } catch (GFacHandlerException e) {
                     throw new GFacException("Error Executing a InFlow Handler", e.getCause());
                 } catch (ClassNotFoundException e) {
@@ -1086,7 +1017,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             try {
                 StringWriter errors = new StringWriter();
                 e.printStackTrace(new PrintWriter(errors));
-                GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             } catch (GFacException e1) {
                 log.error(e1.getLocalizedMessage());
             }
@@ -1117,8 +1048,8 @@ public class BetterGfacImpl implements GFac,Watcher {
             try {
                 handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                 handler = handlerClass.newInstance();
-                GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, handlerClassName.getClassName());
-                GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
+                GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+                GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
                 if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
                     log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode");
                     handler.initProperties(handlerClassName.getProperties());
@@ -1127,12 +1058,12 @@ public class BetterGfacImpl implements GFac,Watcher {
                     // if these already ran we re-run only recoverable handlers
                     handler.recover(jobExecutionContext);
                 }
-                GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+                GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
             } catch (ClassNotFoundException e) {
                 try {
                     StringWriter errors = new StringWriter();
                     e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                 } catch (GFacException e1) {
                     log.error(e1.getLocalizedMessage());
                 }
@@ -1142,7 +1073,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                 try {
                     StringWriter errors = new StringWriter();
                     e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                 } catch (GFacException e1) {
                     log.error(e1.getLocalizedMessage());
                 }
@@ -1152,7 +1083,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                 try {
                     StringWriter errors = new StringWriter();
                     e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                 } catch (GFacException e1) {
                     log.error(e1.getLocalizedMessage());
                 }
@@ -1163,13 +1094,11 @@ public class BetterGfacImpl implements GFac,Watcher {
                 try {
                     StringWriter errors = new StringWriter();
                     e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                 } catch (GFacException e1) {
                     log.error(e1.getLocalizedMessage());
                 }
                 throw new GFacException("Error Executing a OutFlow Handler", e);
-            }finally {
-                closeZK(jobExecutionContext);
             }
         }
         monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
@@ -1214,23 +1143,14 @@ public class BetterGfacImpl implements GFac,Watcher {
         return registry;
     }
 
-    public ZooKeeper getZk() {
-        return zk;
-    }
-
-    public void setZk(ZooKeeper zk) {
-        this.zk = zk;
-    }
-
-
-    public boolean isCancelled(JobExecutionContext executionContext){
+    public boolean isCancelled(JobExecutionContext executionContext) {
         // we should check whether experiment is cancelled using registry
         try {
-            ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
-            if (status != null){
+            ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+            if (status != null) {
                 ExperimentState experimentState = status.getExperimentState();
-                if (experimentState != null){
-                    if(experimentState == ExperimentState.CANCELED){
+                if (experimentState != null) {
+                    if (experimentState == ExperimentState.CANCELED) {
                         return true;
                     }
                 }
@@ -1241,14 +1161,14 @@ public class BetterGfacImpl implements GFac,Watcher {
         return false;
     }
 
-    public boolean isCancelling(JobExecutionContext executionContext){
+    public boolean isCancelling(JobExecutionContext executionContext) {
         // check whether cancelling request came
         try {
-            ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
-            if (status != null){
+            ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+            if (status != null) {
                 ExperimentState experimentState = status.getExperimentState();
-                if (experimentState != null){
-                    if(experimentState == ExperimentState.CANCELING){
+                if (experimentState != null) {
+                    if (experimentState == ExperimentState.CANCELING) {
                         return true;
                     }
                 }
@@ -1276,43 +1196,4 @@ public class BetterGfacImpl implements GFac,Watcher {
         return false;
     }
 
-    public void process(WatchedEvent watchedEvent) {
-        log.info(watchedEvent.getPath());
-        if (Event.EventType.NodeDataChanged.equals(watchedEvent.getType())) {
-            // node data is changed, this means node is cancelled.
-            log.info("Experiment is cancelled with this path:" + watchedEvent.getPath());
-        }
-        synchronized (mutex) {
-            Event.KeeperState state = watchedEvent.getState();
-            log.info(state.name());
-            switch (state) {
-                case SyncConnected:
-                    mutex.notify();
-                    break;
-                case Expired:
-                    try {
-                        zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-                    } catch (IOException e) {
-                        log.error(e.getMessage(), e);
-                    } catch (ApplicationSettingsException e) {
-                        log.error(e.getMessage(), e);
-                    }
-//                    synchronized (mutex) {
-//                        mutex.wait(5000);  // waiting for the syncConnected event
-//                    }
-                case Disconnected:
-//                    try {
-//                        zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-//                    } catch (IOException e) {
-//                        log.error(e.getMessage(), e);
-//                    } catch (ApplicationSettingsException e) {
-//                        log.error(e.getMessage(), e);
-//                    }
-//                    synchronized (mutex) {
-//                        mutex.wait(5000);  // waiting for the syncConnected event
-//                    }
-                    log.info("ZK Connection is " + state.toString());
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index ee28c1d..eef0a33 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -48,7 +48,7 @@ public abstract class AbstractHandler implements GFacHandler {
 
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
         try {
-            GFacUtils.updateHandlerState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
+            GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
         } catch (Exception e) {
             logger.error("Error saving Recoverable provider state", e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
index 58ef855..c6ada52 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
@@ -34,7 +34,7 @@ public class AppDescriptorCheckHandler implements GFacHandler {
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
         logger.info("Invoking ApplicationDescriptorCheckHandler ...");
         try {
-            GFacUtils.updateHandlerState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
+            GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
         } catch (Exception e) {
             logger.info("Error saving plugin status to ZK");
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 836e2c6..84d72fa 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -20,31 +20,28 @@
 */
 package org.apache.airavata.gfac.core.monitor;
 
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
+import com.google.common.eventbus.Subscribe;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
 import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQProducer;
-import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
 
 public class GfacInternalStatusUpdator implements AbstractActivityListener, Watcher {
     private final static Logger logger = LoggerFactory.getLogger(GfacInternalStatusUpdator.class);
 
-    private ZooKeeper zk;
+    private CuratorFramework curatorClient;
 
     private static Integer mutex = -1;
 
@@ -57,39 +54,20 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
                 + File.separator + statusChangeRequest.getMonitorID().getExperimentID();
         Stat exists = null;
         if(!(GfacExperimentState.COMPLETED.equals(statusChangeRequest.getState()) || GfacExperimentState.FAILED.equals(statusChangeRequest.getState()))) {
-            try {
-                if (!zk.getState().isConnected()) {
-                    String zkhostPort = AiravataZKUtils.getZKhostPort();
-                    zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
-                    logger.info("Waiting for zookeeper to connect to the server");
-                    synchronized (mutex) {
-                        mutex.wait(5000);
-                    }
-                }
-                exists = zk.exists(experimentPath, false);
-                if (exists == null) {
-                    logger.error("ZK path: " + experimentPath + " does not exists !!");
-                    logger.error("Zookeeper is in an inconsistent state !!! ");
-                    return;
-                }
-            } catch (KeeperException e) {
-                logger.error("Error while updating zk", e);
-                throw new Exception(e.getMessage(), e);
-            } catch (InterruptedException e) {
-                logger.error("Error while updating zk", e);
-                throw new Exception(e.getMessage(), e);
-            } catch (IOException e) {
-                logger.error("Error while updating zk", e);
-                throw new Exception(e.getMessage(), e);
+            exists = curatorClient.checkExists().forPath(experimentPath);
+            if (exists == null) {
+                logger.error("ZK path: " + experimentPath + " does not exists !!");
+                return;
             }
-            Stat state = zk.exists(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+            Stat state = curatorClient.checkExists().forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
             if (state == null) {
                 // state znode has to be created
-                zk.create(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-                        String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).
+                        forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+                                String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
             } else {
-                zk.setData(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-                        String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), state.getVersion());
+                curatorClient.setData().withVersion(state.getVersion()).forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+                        String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
             }
         }
         switch (statusChangeRequest.getState()) {
@@ -107,8 +85,8 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
 
     public void setup(Object... configurations) {
         for (Object configuration : configurations) {
-            if (configuration instanceof ZooKeeper) {
-                this.zk = (ZooKeeper) configuration;
+            if (configuration instanceof CuratorFramework) {
+                this.curatorClient = (CuratorFramework) configuration;
             }
         }
     }


[2/4] airavata git commit: Integrated Apache Curator with GFac. With this change, every ZooKeeper call goes through the CuratorFramework client.

Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 0810bfd..f41f29c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -38,22 +38,37 @@ import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.states.GfacExperimentState;
 import org.apache.airavata.gfac.core.states.GfacHandlerState;
-import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
 import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
 import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.appcatalog.computeresource.*;
+import org.apache.airavata.model.appcatalog.computeresource.LOCALSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.SSHJobSubmission;
+import org.apache.airavata.model.appcatalog.computeresource.UnicoreJobSubmission;
 import org.apache.airavata.model.messaging.event.JobIdentifier;
 import org.apache.airavata.model.messaging.event.JobStatusChangeRequestEvent;
 import org.apache.airavata.model.messaging.event.TaskIdentifier;
 import org.apache.airavata.model.messaging.event.TaskStatusChangeRequestEvent;
-import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.model.workspace.experiment.ActionableGroup;
+import org.apache.airavata.model.workspace.experiment.CorrectiveAction;
+import org.apache.airavata.model.workspace.experiment.ErrorCategory;
+import org.apache.airavata.model.workspace.experiment.ErrorDetails;
+import org.apache.airavata.model.workspace.experiment.Experiment;
+import org.apache.airavata.model.workspace.experiment.ExperimentState;
+import org.apache.airavata.model.workspace.experiment.JobDetails;
+import org.apache.airavata.model.workspace.experiment.JobState;
+import org.apache.airavata.model.workspace.experiment.JobStatus;
+import org.apache.airavata.model.workspace.experiment.TaskState;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.ChildDataType;
 import org.apache.airavata.registry.cpi.CompositeIdentifier;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,19 +77,33 @@ import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
-import javax.xml.xpath.*;
-
-import java.io.*;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.InetAddress;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 //import org.apache.airavata.commons.gfac.type.ActualParameter;
 
 public class GFacUtils {
 	private final static Logger log = LoggerFactory.getLogger(GFacUtils.class);
+	public static final ArrayList<ACL> OPEN_ACL_UNSAFE = ZooDefs.Ids.OPEN_ACL_UNSAFE;
 
 	private GFacUtils() {
 	}
@@ -268,11 +297,10 @@ public class GFacUtils {
         return map;
     }
 
-	public static GfacExperimentState getZKExperimentState(ZooKeeper zk,
+	public static GfacExperimentState getZKExperimentState(CuratorFramework curatorClient,
 			JobExecutionContext jobExecutionContext)
-			throws ApplicationSettingsException, KeeperException,
-			InterruptedException {
-		String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext
+			throws Exception {
+		String expState = AiravataZKUtils.getExpState(curatorClient, jobExecutionContext
 				.getExperimentID());
         if (expState == null || expState.isEmpty()) {
             return GfacExperimentState.UNKNOWN;
@@ -280,136 +308,86 @@ public class GFacUtils {
         return GfacExperimentState.findByValue(Integer.valueOf(expState));
     }
 
-	public static int getZKExperimentStateValue(ZooKeeper zk,
-			JobExecutionContext jobExecutionContext)
-			throws ApplicationSettingsException, KeeperException,
-			InterruptedException {
-		String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext
-				.getExperimentID());
-		if (expState == null) {
-			return -1;
-		}
-		return Integer.parseInt(expState);
-	}
-
-    public static int getZKExperimentStateValue(ZooKeeper zk,String fullPath)throws ApplicationSettingsException,
-            KeeperException, InterruptedException {
-        Stat exists = zk.exists(fullPath+File.separator+"state", false);
-        if (exists != null) {
-            return Integer.parseInt(new String(zk.getData(fullPath+File.separator+"state", false, exists)));
-        }
-        return -1;
-    }
-
-	public static boolean createHandlerZnode(ZooKeeper zk,
+	public static boolean createHandlerZnode(CuratorFramework curatorClient,
                                              JobExecutionContext jobExecutionContext, String className)
-			throws ApplicationSettingsException, KeeperException,
-			InterruptedException {
+			throws Exception {
 		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
 				jobExecutionContext.getExperimentID(), className);
-		Stat exists = zk.exists(expState, false);
+		Stat exists = curatorClient.checkExists().forPath(expState);
 		if (exists == null) {
-			zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-					CreateMode.PERSISTENT);
-
-			zk.create(expState + File.separator
-					+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0],
-					ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(expState, new byte[0]);
+			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
 		} else {
-			exists = zk.exists(expState + File.separator
-					+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+			exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
 			if (exists == null) {
-				zk.create(expState + File.separator
-						+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-						new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-						CreateMode.PERSISTENT);
+				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+						.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
 			}
 		}
 
-		exists = zk.exists(expState + File.separator
-				+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+		exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
 		if (exists != null) {
-			zk.setData(expState + File.separator
-					+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-					String.valueOf(GfacHandlerState.INVOKING.getValue())
-							.getBytes(), exists.getVersion());
+			curatorClient.setData().withVersion(exists.getVersion())
+					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+							String.valueOf(GfacHandlerState.INVOKING.getValue()).getBytes());
 		}
 		return true;
 	}
 
-	public static boolean createHandlerZnode(ZooKeeper zk,
+	public static boolean createHandlerZnode(CuratorFramework curatorClient,
                                              JobExecutionContext jobExecutionContext, String className,
-                                             GfacHandlerState state) throws ApplicationSettingsException,
-			KeeperException, InterruptedException {
+                                             GfacHandlerState state) throws Exception {
 		String expState = AiravataZKUtils.getExpZnodeHandlerPath(
 				jobExecutionContext.getExperimentID(), className);
-		Stat exists = zk.exists(expState, false);
+		Stat exists = curatorClient.checkExists().forPath(expState);
 		if (exists == null) {
-			zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-					CreateMode.PERSISTENT);
-
-			zk.create(expState + File.separator
-					+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0],
-					ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+					.forPath(expState, new byte[0]);
+			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
 		} else {
-			exists = zk.exists(expState + File.separator
-					+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+			exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
 			if (exists == null) {
-				zk.create(expState + File.separator
-						+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-						new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-						CreateMode.PERSISTENT);
+				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+						.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, new byte[0]);
 			}
 		}
 
-		exists = zk.exists(expState + File.separator
-				+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+		exists = curatorClient.checkExists().forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
 		if (exists != null) {
-			zk.setData(expState + File.separator
-					+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-					String.valueOf(state.getValue()).getBytes(),
-					exists.getVersion());
+			curatorClient.setData().withVersion(exists.getVersion())
+					.forPath(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+							String.valueOf(state.getValue()).getBytes());
 		}
 		return true;
 	}
 
-	public static boolean updateHandlerState(ZooKeeper zk,
+	public static boolean updateHandlerState(CuratorFramework curatorClient,
                                              JobExecutionContext jobExecutionContext, String className,
-                                             GfacHandlerState state) throws ApplicationSettingsException,
-			KeeperException, InterruptedException {
-		if(zk.getState().isConnected()) {
-			String expState = AiravataZKUtils.getExpZnodeHandlerPath(
-					jobExecutionContext.getExperimentID(), className);
-
-			Stat exists = zk.exists(expState + File.separator
-					+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
-			if (exists != null) {
-				zk.setData(expState + File.separator
-								+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-						String.valueOf(state.getValue()).getBytes(),
-						exists.getVersion());
-			} else {
-				createHandlerZnode(zk, jobExecutionContext, className, state);
-			}
-			return true;
+                                             GfacHandlerState state) throws Exception {
+		String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath(
+				jobExecutionContext.getExperimentID(), className);
+		Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
+		if (exists != null) {
+			curatorClient.setData().withVersion(exists.getVersion())
+					.forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes());
+		} else {
+			createHandlerZnode(curatorClient, jobExecutionContext, className, state);
 		}
 		return false;
 	}
 
-	public static GfacHandlerState getHandlerState(ZooKeeper zk,
+	public static GfacHandlerState getHandlerState(CuratorFramework curatorClient,
                                                   JobExecutionContext jobExecutionContext, String className) {
 		try {
-			String expState = AiravataZKUtils.getExpZnodeHandlerPath(
-					jobExecutionContext.getExperimentID(), className);
-
-			Stat exists = zk.exists(expState + File.separator
-					+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+			String handlerPath = AiravataZKUtils.getExpZnodeHandlerPath( jobExecutionContext.getExperimentID(), className);
+			Stat exists = curatorClient.checkExists().forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
 			if (exists != null) {
-                String stateVal = new String(zk.getData(expState + File.separator
-                                + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false,
-                        exists));
-                return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
-            }
+				String stateVal = new String(curatorClient.getData().storingStatIn(exists)
+						.forPath(handlerPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE));
+				return GfacHandlerState.findByValue(Integer.valueOf(stateVal));
+			}
 			return GfacHandlerState.UNKNOWN; // if the node doesn't exist or any other error we
 							// return false
 		} catch (Exception e) {
@@ -420,136 +398,104 @@ public class GFacUtils {
 
 	// This method is dangerous because of moving the experiment data
 	public static boolean createExperimentEntryForPassive(String experimentID,
-														  String taskID, ZooKeeper zk, String experimentNode,
-														  String pickedChild, String tokenId, long deliveryTag) throws KeeperException,
-			InterruptedException, ApplicationSettingsException {
+														  String taskID, CuratorFramework curatorClient, String experimentNode,
+														  String pickedChild, String tokenId, long deliveryTag) throws Exception {
 		String experimentPath = experimentNode + File.separator + pickedChild;
 		String newExperimentPath = experimentPath + File.separator + experimentID;
-		Stat exists1 = zk.exists(newExperimentPath, false);
-		String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, zk);
+		Stat exists1 = curatorClient.checkExists().forPath(newExperimentPath);
+		String oldExperimentPath = GFacUtils.findExperimentEntry(experimentID, curatorClient);
 		if (oldExperimentPath == null) {  // this means this is a very new experiment
 			// are going to create a new node
 			log.info("This is a new Job, so creating all the experiment docs from the scratch");
-
-			zk.create(newExperimentPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-					CreateMode.PERSISTENT);
-
-            String s = zk.create(newExperimentPath + File.separator + "state", String
-							.valueOf(GfacExperimentState.LAUNCHED.getValue())
-							.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-					CreateMode.PERSISTENT);
-
-			if(zk.exists(s,false)!=null){
-				log.info("Created the node: "+s+" successfully !");
-			}else{
-				log.error("Error creating node: "+s+" successfully !");
+			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath, new byte[0]);
+            String stateNodePath = curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+					.forPath(newExperimentPath + File.separator + "state",
+							String .valueOf(GfacExperimentState.LAUNCHED.getValue()) .getBytes());
+
+			if(curatorClient.checkExists().forPath(stateNodePath)!=null) {
+				log.info("Created the node: " + stateNodePath + " successfully !");
+			}else {
+				log.error("Error creating node: " + stateNodePath + " successfully !");
 			}
-			zk.create(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag), ZooDefs.Ids.OPEN_ACL_UNSAFE,  // here we store the value of delivery message
-					CreateMode.PERSISTENT);
+			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+					.forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag));
 		} else {
 			log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " was running by some Gfac instance,but it failed");
-            removeCancelDeliveryTagNode(oldExperimentPath, zk); // remove previous cancel deliveryTagNode
+            removeCancelDeliveryTagNode(oldExperimentPath, curatorClient); // remove previous cancel deliveryTagNode
             if(newExperimentPath.equals(oldExperimentPath)){
                 log.info("Re-launch experiment came to the same GFac instance");
             }else {
 				log.info("Re-launch experiment came to a new GFac instance so we are moving data to new gfac node");
-				zk.create(newExperimentPath, zk.getData(oldExperimentPath, false, exists1),
-						ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // recursively copy children
-                copyChildren(zk, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
+				curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE).forPath(newExperimentPath,
+						curatorClient.getData().storingStatIn(exists1).forPath(oldExperimentPath)); // recursively copy children
+                copyChildren(curatorClient, oldExperimentPath, newExperimentPath, 2); // we need to copy children up to depth 2
 				String oldDeliveryTag = oldExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX;
-				Stat exists = zk.exists(oldDeliveryTag, false);
+				Stat exists = curatorClient.checkExists().forPath(oldDeliveryTag);
 				if(exists!=null) {
-					zk.create(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
-							zk.getData(oldDeliveryTag,null,exists),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-					ZKUtil.deleteRecursive(zk,oldDeliveryTag);
+					curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+							.forPath(newExperimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX,
+									curatorClient.getData().storingStatIn(exists).forPath(oldDeliveryTag));
+					ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldDeliveryTag, true);
 				}
 				// After all the files are successfully transfered we delete the // old experiment,otherwise we do
 				// not delete a single file
 				log.info("After a successful copying of experiment data for an old experiment we delete the old data");
 				log.info("Deleting experiment data: " + oldExperimentPath);
-				ZKUtil.deleteRecursive(zk, oldExperimentPath);
+				ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), oldExperimentPath, true);
 			}
 		}
 		return true;
 	}
 
-    private static void removeCancelDeliveryTagNode(String experimentPath, ZooKeeper zk) throws KeeperException, InterruptedException {
-        Stat exists = zk.exists(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, false);
+    private static void removeCancelDeliveryTagNode(String experimentPath, CuratorFramework curatorClient) throws Exception {
+        Stat exists = curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
         if (exists != null) {
-            ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
-        }
-    }
+			ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
+		}
+	}
 
-    private static void copyChildren(ZooKeeper zk, String oldPath, String newPath, int depth) throws KeeperException, InterruptedException {
-        for (String childNode : zk.getChildren(oldPath, false)) {
+    private static void copyChildren(CuratorFramework curatorClient, String oldPath, String newPath, int depth) throws Exception {
+        for (String childNode : curatorClient.getChildren().forPath(oldPath)) {
             String oldChildPath = oldPath + File.separator + childNode;
-            Stat stat = zk.exists(oldChildPath, false); // no need to check exists
+            Stat stat = curatorClient.checkExists().forPath(oldChildPath); // no need to check exists
             String newChildPath = newPath + File.separator + childNode;
             log.info("Creating new znode: " + newChildPath);
-            zk.create(newChildPath, zk.getData(oldChildPath, false, stat), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            if (--depth > 0) {
-                copyChildren(zk , oldChildPath, newChildPath, depth );
-            }
-        }
-    }
-
-    /**
-	 * This will return a value if the server is down because we iterate through exisiting experiment nodes, not
-	 * through gfac-server nodes
-	 * @param experimentID
-	 * @param zk
-	 * @return
-	 * @throws KeeperException
-	 * @throws InterruptedException
-	 */
-    public static String findExperimentEntry(String experimentID, ZooKeeper zk
-                                                ) throws KeeperException,
-            InterruptedException {
-        String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
-        List<String> children = zk.getChildren(experimentNode, false);
-        for(String pickedChild:children) {
-            String experimentPath = experimentNode + File.separator + pickedChild;
-            String newExpNode = experimentPath + File.separator + experimentID;
-            Stat exists = zk.exists(newExpNode, false);
-            if(exists == null){
-                continue;
-            }else{
-                return newExpNode;
+			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+					.forPath(newChildPath, curatorClient.getData().storingStatIn(stat).forPath(oldChildPath));
+			if (--depth > 0) {
+                copyChildren(curatorClient , oldChildPath, newChildPath, depth );
             }
         }
-        return null;
     }
 
 	/**
 	 * This will return a value if the server is down because we iterate through exisiting experiment nodes, not
 	 * through gfac-server nodes
+	 *
 	 * @param experimentID
-	 * @param zk
+	 * @param curatorClient
 	 * @return
 	 * @throws KeeperException
 	 * @throws InterruptedException
 	 */
-	public static String findExperimentEntryPassive(String experimentID, ZooKeeper zk
-	) throws KeeperException,
-			InterruptedException {
+	public static String findExperimentEntry(String experimentID, CuratorFramework curatorClient) throws Exception {
 		String experimentNode = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
-		List<String> children = zk.getChildren(experimentNode, false);
-		for(String pickedChild:children) {
+		List<String> children = curatorClient.getChildren().forPath(experimentNode);
+		for (String pickedChild : children) {
 			String experimentPath = experimentNode + File.separator + pickedChild;
 			String newExpNode = experimentPath + File.separator + experimentID;
-			Stat exists = zk.exists(newExpNode, false);
-			if(exists == null){
+			Stat exists = curatorClient.checkExists().forPath(newExpNode);
+			if (exists == null) {
 				continue;
-			}else{
+			} else {
 				return newExpNode;
 			}
 		}
 		return null;
 	}
 
-    public static boolean setExperimentCancel(String experimentId, ZooKeeper zk, long deliveryTag) throws KeeperException,
-            InterruptedException {
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
+    public static boolean setExperimentCancel(String experimentId, CuratorFramework curatorClient, long deliveryTag) throws Exception {
+        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
         if (experimentEntry == null) {
             // This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
             log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
@@ -557,51 +503,47 @@ public class GFacUtils {
             return false;
         } else {
             // check cancel operation is being processed for the same experiment.
-            Stat cancelState = zk.exists(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, false);
+            Stat cancelState = curatorClient.checkExists().forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
             if (cancelState != null) {
                 // another cancel operation is being processed. only one cancel operation can exist for a given experiment.
                 return false;
             }
 
-            zk.create(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag),
-                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // save cancel delivery tag to be acknowledge at the end.
-            return true;
+			curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(OPEN_ACL_UNSAFE)
+					.forPath(experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, longToBytes(deliveryTag)); // save cancel delivery tag to be acknowledge at the end.
+			return true;
         }
 
     }
-    public static boolean isCancelled(String experimentID, ZooKeeper zk
-    ) throws KeeperException,
-            InterruptedException {
-		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, zk);
-
+    public static boolean isCancelled(String experimentID, CuratorFramework curatorClient ) throws Exception {
+		String experimentEntry = GFacUtils.findExperimentEntry(experimentID, curatorClient);
         if(experimentEntry == null){
             return false;
         }else {
-            Stat exists = zk.exists(experimentEntry, false);
+            Stat exists = curatorClient.checkExists().forPath(experimentEntry);
             if (exists != null) {
-                String operation = new String(zk.getData(experimentEntry+File.separator+"operation", false, exists));
-                if ("cancel".equals(operation)) {
-                    return true;
-                }
-            }
-        }
+				String operation = new String(curatorClient.getData().storingStatIn(exists).forPath(experimentEntry + File.separator + "operation"));
+				if ("cancel".equals(operation)) {
+					return true;
+				}
+			}
+		}
         return false;
     }
 
     public static void saveHandlerData(JobExecutionContext jobExecutionContext,
                                        StringBuffer data, String className) throws GFacHandlerException {
 		try {
-			ZooKeeper zk = jobExecutionContext.getZk();
-			if (zk != null) {
+			CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+			if (curatorClient != null) {
 				String expZnodeHandlerPath = AiravataZKUtils
 						.getExpZnodeHandlerPath(
 								jobExecutionContext.getExperimentID(),
 								className);
-				Stat exists = zk.exists(expZnodeHandlerPath, false);
+				Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
                 if (exists != null) {
-                    zk.setData(expZnodeHandlerPath, data.toString().getBytes(),
-                            exists.getVersion());
-                } else {
+					curatorClient.setData().withVersion(exists.getVersion()).forPath(expZnodeHandlerPath, data.toString().getBytes());
+				} else {
                     log.error("Saving Handler data failed, Stat is null");
                 }
             }
@@ -610,18 +552,15 @@ public class GFacUtils {
 		}
 	}
 
-	public static String getHandlerData(JobExecutionContext jobExecutionContext,
-                                        String className) throws ApplicationSettingsException,
-			KeeperException, InterruptedException {
-		ZooKeeper zk = jobExecutionContext.getZk();
-		if (zk != null) {
+	public static String getHandlerData(JobExecutionContext jobExecutionContext, String className) throws Exception {
+		CuratorFramework curatorClient = jobExecutionContext.getCuratorClient();
+		if (curatorClient != null) {
 			String expZnodeHandlerPath = AiravataZKUtils
 					.getExpZnodeHandlerPath(
 							jobExecutionContext.getExperimentID(),
 							className);
-			Stat exists = zk.exists(expZnodeHandlerPath, false);
-			return new String(jobExecutionContext.getZk().getData(
-					expZnodeHandlerPath, false, exists));
+			Stat exists = curatorClient.checkExists().forPath(expZnodeHandlerPath);
+			return new String(jobExecutionContext.getCuratorClient().getData().storingStatIn(exists).forPath(expZnodeHandlerPath));
 		}
 		return null;
 	}
@@ -664,18 +603,6 @@ public class GFacUtils {
         }
     }
 
-    public static GlobusJobSubmission getGlobusJobSubmission (String submissionId) throws AppCatalogException{
-        return null;
-//        try {
-//            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
-//            return appCatalog.getComputeResource().getGlobus(submissionId);
-//        }catch (Exception e){
-//            String errorMsg = "Error while retrieving local job submission with submission id : " + submissionId;
-//            log.error(errorMsg, e);
-//            throw new AppCatalogException(errorMsg, e);
-//        }
-    }
-
     public static SSHJobSubmission getSSHJobSubmission (String submissionId) throws AppCatalogException{
         try {
             AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
@@ -687,17 +614,6 @@ public class GFacUtils {
         }
     }
 
-    public static CloudJobSubmission getCloudJobSubmission (String submissionId) throws AppCatalogException{
-        try {
-            AppCatalog appCatalog = AppCatalogFactory.getAppCatalog();
-            return appCatalog.getComputeResource().getCloudJobSubmission(submissionId);
-        }catch (Exception e){
-            String errorMsg = "Error while retrieving SSH job submission with submission id : " + submissionId;
-            log.error(errorMsg, e);
-            throw new AppCatalogException(errorMsg, e);
-        }
-    }
-
     /**
      * To convert list to separated value
      * @param listOfStrings
@@ -764,21 +680,21 @@ public class GFacUtils {
         return false;
     }
 
-    public static boolean ackCancelRequest(String experimentId, ZooKeeper zk) throws KeeperException, InterruptedException {
-        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, zk);
+    public static boolean ackCancelRequest(String experimentId, CuratorFramework curatorClient) throws Exception {
+        String experimentEntry = GFacUtils.findExperimentEntry(experimentId, curatorClient);
         String cancelNodePath = experimentEntry + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
         if (experimentEntry == null) {
             // This should be handle in validation request. Gfac shouldn't get any invalidate experiment.
             log.error("Cannot find the experiment Entry, so cancel operation cannot be performed. " +
-                    "This happen when experiment completed and already removed from the zookeeper");
+                    "This happen when experiment completed and already removed from the CuratorFramework");
         } else {
             // check cancel operation is being processed for the same experiment.
-            Stat cancelState = zk.exists(cancelNodePath, false);
+            Stat cancelState = curatorClient.checkExists().forPath(cancelNodePath);
             if (cancelState != null) {
-                ZKUtil.deleteRecursive(zk,cancelNodePath);
-                return true;
-            }
-        }
+				ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), cancelNodePath, true);
+				return true;
+			}
+		}
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index c0784fb..540eaac 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -264,11 +264,7 @@ public class GSISSHProvider extends AbstractProvider {
                 }
                 return;
             }
-        } catch (ApplicationSettingsException e) {
-            log.error("Error while  recovering provider", e);
-        } catch (KeeperException e) {
-            log.error("Error while  recovering provider", e);
-        } catch (InterruptedException e) {
+        } catch (Exception e) {
             log.error("Error while  recovering provider", e);
         }
         try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
index 15b7241..e53fe09 100644
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/util/CommonUtils.java
@@ -20,10 +20,8 @@
 */
 package org.apache.airavata.gfac.monitor.util;
 
-import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.logger.AiravataLogger;
 import org.apache.airavata.common.logger.AiravataLoggerFactory;
-import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -34,15 +32,16 @@ import org.apache.airavata.gfac.monitor.HostMonitorData;
 import org.apache.airavata.gfac.monitor.UserMonitorData;
 import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
 import org.apache.airavata.model.appcatalog.computeresource.ComputeResourceDescription;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CountDownLatch;
 
 public class CommonUtils {
     private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(CommonUtils.class);
@@ -189,53 +188,38 @@ public class CommonUtils {
 
         /**
          *  Update job count for a given set of paths.
-         * @param zk - zookeeper instance
+         * @param curatorClient - CuratorFramework instance
          * @param changeCountMap - map of change job count with relevant path
          * @param isAdd - Should add or reduce existing job count by the given job count.
          */
-    public static void updateZkWithJobCount(ZooKeeper zk, final Map<String, Integer> changeCountMap, boolean isAdd) {
+    public static void updateZkWithJobCount(CuratorFramework curatorClient, final Map<String, Integer> changeCountMap, boolean isAdd) {
         StringBuilder changeZNodePaths = new StringBuilder();
         try {
-            if (zk == null || !zk.getState().isConnected()) {
-                try {
-                    final CountDownLatch countDownLatch = new CountDownLatch(1);
-                    zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), new Watcher() {
-                        @Override
-                        public void process(WatchedEvent event) {
-                            countDownLatch.countDown();
-                        }
-                    });
-                    countDownLatch.await();
-                } catch (ApplicationSettingsException e) {
-                    logger.error("Error while reading zookeeper hostport string");
-                } catch (IOException e) {
-                    logger.error("Error while reconnect attempt to zookeeper where zookeeper connection loss state");
-                }
-            }
-
             for (String path : changeCountMap.keySet()) {
                 if (isAdd) {
-                    CommonUtils.checkAndCreateZNode(zk, path);
+                    CommonUtils.checkAndCreateZNode(curatorClient, path);
                 }
-                byte[] byteData = zk.getData(path, null, null);
+                byte[] byteData = curatorClient.getData().forPath(path);
                 String nodeData;
                 if (byteData == null) {
                     if (isAdd) {
-                        zk.setData(path, String.valueOf(changeCountMap.get(path)).getBytes(), -1);
+                        curatorClient.setData().withVersion(-1).forPath(path, String.valueOf(changeCountMap.get(path)).getBytes());
                     } else {
                         // This is not possible, but we handle in case there any data zookeeper communication failure
                         logger.warn("Couldn't reduce job count in " + path + " as it returns null data. Hence reset the job count to 0");
-                        zk.setData(path, "0".getBytes(), -1);
+                        curatorClient.setData().withVersion(-1).forPath(path, "0".getBytes());
                     }
                 } else {
                     nodeData = new String(byteData);
                     if (isAdd) {
-                        zk.setData(path, String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes(), -1);
+                        curatorClient.setData().withVersion(-1).forPath(path,
+                                String.valueOf(changeCountMap.get(path) + Integer.parseInt(nodeData)).getBytes());
                     } else {
                         int previousCount = Integer.parseInt(nodeData);
                         int removeCount = changeCountMap.get(path);
                         if (previousCount >= removeCount) {
-                            zk.setData(path, String.valueOf(previousCount - removeCount).getBytes(), -1);
+                            curatorClient.setData().withVersion(-1).forPath(path,
+                                    String.valueOf(previousCount - removeCount).getBytes());
                         } else {
                             // This is not possible, do we need to reset the job count to 0 ?
                             logger.error("Requested remove job count is " + removeCount +
@@ -250,11 +234,9 @@ public class CommonUtils {
             // update stat node to trigger orchestrator watchers
             if (changeCountMap.size() > 0) {
                 changeZNodePaths.deleteCharAt(changeZNodePaths.length() - 1);
-                zk.setData("/" + Constants.STAT, changeZNodePaths.toString().getBytes(), -1);
+                curatorClient.setData().withVersion(-1).forPath("/" + Constants.STAT, changeZNodePaths.toString().getBytes());
             }
-        } catch (KeeperException e) {
-            logger.error("Error while writing job count to zookeeper", e);
-        } catch (InterruptedException e) {
+        } catch (Exception e) {
             logger.error("Error while writing job count to zookeeper", e);
         }
 
@@ -267,7 +249,7 @@ public class CommonUtils {
     public static void increaseZkJobCount(MonitorID monitorID) {
         Map<String, Integer> addMap = new HashMap<String, Integer>();
         addMap.put(CommonUtils.getJobCountUpdatePath(monitorID), 1);
-        updateZkWithJobCount(monitorID.getJobExecutionContext().getZk(), addMap, true);
+        updateZkWithJobCount(monitorID.getJobExecutionContext().getCuratorClient(), addMap, true);
     }
 
     /**
@@ -282,17 +264,17 @@ public class CommonUtils {
 
     /**
      * Check whether znode is exist in given path if not create a new znode
-     * @param zk - zookeeper instance
+     * @param curatorClient - CuratorFramework instance
      * @param path - path to check znode
      * @throws KeeperException
      * @throws InterruptedException
      */
-    private static void checkAndCreateZNode(ZooKeeper zk , String path) throws KeeperException, InterruptedException {
-        if (zk.exists(path, null) == null) { // if znode doesn't exist
+    private static void checkAndCreateZNode(CuratorFramework curatorClient , String path) throws Exception {
+        if (curatorClient.checkExists().forPath(path) == null) { // if znode doesn't exist
             if (path.lastIndexOf("/") > 1) {  // recursively traverse to parent znode and check parent exist
-                checkAndCreateZNode(zk, (path.substring(0, path.lastIndexOf("/"))));
+                checkAndCreateZNode(curatorClient, (path.substring(0, path.lastIndexOf("/"))));
             }
-            zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);// create a znode
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 19ea3ac..f967dbf 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -410,11 +410,7 @@ public class SSHProvider extends AbstractProvider {
                     this.execute(jobExecutionContext);
                     return;
                 }
-            } catch (ApplicationSettingsException e) {
-                log.error("Error while  recovering provider", e);
-            } catch (KeeperException e) {
-                log.error("Error while  recovering provider", e);
-            } catch (InterruptedException e) {
+            } catch (Exception e) {
                 log.error("Error while  recovering provider", e);
             }
             try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
deleted file mode 100644
index 9d40557..0000000
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.orchestrator.util;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.AiravataZKUtils;
-import org.apache.airavata.common.utils.Constants;
-import org.apache.airavata.common.utils.ServerSettings;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
-import org.apache.airavata.orchestrator.server.OrchestratorServerHandler;
-import org.apache.thrift.TException;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-
-public class OrchestratorRecoveryHandler implements Watcher {
-    private static Logger log = LoggerFactory.getLogger(OrchestratorRecoveryHandler.class);
-
-    private ZooKeeper zk;
-
-    private String gfacId;
-
-    private static Integer mutex = -1;
-
-    private OrchestratorServerHandler serverHandler;
-
-    public OrchestratorRecoveryHandler(OrchestratorServerHandler handler, String zkExpPath) {
-        this.zk = zk;
-        int index = zkExpPath.split(File.separator).length - 1;
-        this.gfacId = zkExpPath.split(File.separator)[index];
-        this.serverHandler = handler;
-    }
-
-    /**
-     * This method return the list of experimentId
-     *
-     * @return
-     * @throws OrchestratorException
-     * @throws ApplicationSettingsException
-     * @throws IOException
-     * @throws KeeperException
-     * @throws InterruptedException
-     */
-    public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException {
-        String zkhostPort = AiravataZKUtils.getZKhostPort();
-        zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
-        log.info("Waiting for zookeeper to connect to the server");
-        synchronized (mutex) {
-            mutex.wait(5000);
-        }
-        List<String> children = zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE)
-                + File.separator + gfacId, false);
-            log.info("------------------ Recovering Experiments started ----------------------------------");
-        for (String expId : children) {
-            log.info("Recovering Experiment: " + expId.split("\\+")[0]);
-            log.info("------------------------------------------------------------------------------------");
-            try {
-                if(GFacUtils.isCancelled(expId.split("\\+")[0], zk)) {// during relaunching we check the operation and then launch
-                    serverHandler.terminateExperiment(expId.split("\\+")[0], null);
-                }else {
-                    serverHandler.launchExperiment(expId.split("\\+")[0], null);
-                }
-                // we do not move the old experiment in to new gfac node, gfac will do it
-            } catch (Exception e) {       // we attempt all the experiments
-                log.error(e.getMessage(), e);
-            }
-            log.info("------------------------------------------------------------------------------------");
-        }
-    }
-
-    synchronized public void process(WatchedEvent watchedEvent) {
-        log.info(watchedEvent.getPath());
-        synchronized (mutex) {
-            Event.KeeperState state = watchedEvent.getState();
-            switch (state) {
-                case SyncConnected:
-                    mutex.notify();
-                    break;
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 76b2a75..d57d85a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
 		<axiom.version>1.2.8</axiom.version>
 		<surefire.version>2.18.1</surefire.version>
 		<junit.version>4.12</junit.version>
-		<curator.version>2.7.1</curator.version>
+		<curator.version>2.8.0</curator.version>
 		<xmlbeans.version>2.5.0</xmlbeans.version>
 		<xpp3.version>1.1.6</xpp3.version>
 		<xpp5.version>1.2.8</xpp5.version>
@@ -426,6 +426,11 @@
 				<artifactId>zookeeper</artifactId>
 				<version>${zk.version}</version>
 			</dependency>
+			<dependency>
+				<groupId>org.apache.curator</groupId>
+				<artifactId>curator-framework</artifactId>
+				<version>${curator.version}</version>
+			</dependency>
 		</dependencies>
 	</dependencyManagement>
 


[4/4] airavata git commit: Integrated Apache curator with Gfac. With this fix every zookeeper call goes through CuratorFramework client.

Posted by sh...@apache.org.
Integrated Apache Curator with Gfac. With this fix, every ZooKeeper call goes through the CuratorFramework client.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4a061780
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4a061780
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4a061780

Branch: refs/heads/master
Commit: 4a06178022c0cd1a215c47e3c65a7b6ffd342bbb
Parents: 915ed04
Author: Shameera Rathanyaka <sh...@gmail.com>
Authored: Thu May 28 11:26:36 2015 -0400
Committer: Shameera Rathanyaka <sh...@gmail.com>
Committed: Thu May 28 11:26:36 2015 -0400

----------------------------------------------------------------------
 airavata-api/airavata-api-server/pom.xml        |   6 +-
 .../AiravataExperimentStatusUpdator.java        |  44 +-
 modules/commons/utils/pom.xml                   |   6 +-
 .../airavata/common/utils/AiravataZKUtils.java  |  61 +--
 modules/distribution/server/pom.xml             |   5 +
 modules/gfac/airavata-gfac-service/pom.xml      |   5 -
 .../airavata/gfac/server/GfacServerHandler.java | 153 ++----
 modules/gfac/gfac-core/pom.xml                  |   7 +-
 .../gfac/core/context/JobExecutionContext.java  |  46 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 483 +++++++------------
 .../gfac/core/handler/AbstractHandler.java      |   2 +-
 .../core/handler/AppDescriptorCheckHandler.java |   2 +-
 .../core/monitor/GfacInternalStatusUpdator.java |  62 +--
 .../airavata/gfac/core/utils/GFacUtils.java     | 408 +++++++---------
 .../gsissh/provider/impl/GSISSHProvider.java    |   6 +-
 .../airavata/gfac/monitor/util/CommonUtils.java |  62 +--
 .../gfac/ssh/provider/impl/SSHProvider.java     |   6 +-
 .../util/OrchestratorRecoveryHandler.java       | 109 -----
 pom.xml                                         |   7 +-
 19 files changed, 509 insertions(+), 971 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/airavata-api/airavata-api-server/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/pom.xml b/airavata-api/airavata-api-server/pom.xml
index df87344..28b9ef9 100644
--- a/airavata-api/airavata-api-server/pom.xml
+++ b/airavata-api/airavata-api-server/pom.xml
@@ -81,7 +81,11 @@
             <artifactId>slf4j-log4j12</artifactId>
             <version>${org.slf4j.version}</version>
         </dependency>
-        
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index dd7bcd0..f59179e 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -23,11 +23,13 @@ package org.apache.airavata.api.server.listener;
 import com.google.common.eventbus.Subscribe;
 import org.apache.airavata.api.server.util.DataModelUtils;
 import org.apache.airavata.common.exception.AiravataException;
-import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.*;
+import org.apache.airavata.common.utils.AiravataUtils;
+import org.apache.airavata.common.utils.AiravataZKUtils;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.MonitorPublisher;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.messaging.core.MessageContext;
-import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.Publisher;
 import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
 import org.apache.airavata.model.messaging.event.ExperimentStatusChangeEvent;
@@ -38,9 +40,8 @@ import org.apache.airavata.model.workspace.experiment.Experiment;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryModelType;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.ZKUtil;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,18 +50,12 @@ import java.util.Calendar;
 
 public class AiravataExperimentStatusUpdator implements AbstractActivityListener {
     private final static Logger logger = LoggerFactory.getLogger(AiravataExperimentStatusUpdator.class);
-
     private Registry airavataRegistry;
-
     private MonitorPublisher monitorPublisher;
-
     private Publisher publisher;
-
-    private ZooKeeper zk;
-
+    private CuratorFramework curatorClient;
     private RabbitMQTaskLaunchConsumer consumer;
 
-
     public Registry getAiravataRegistry() {
         return airavataRegistry;
     }
@@ -130,9 +125,9 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 		}
     }
 
-    private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode, String experimentPath) throws KeeperException, InterruptedException, AiravataException {
+    private void cleanup(WorkflowNodeStatusChangeEvent nodeStatus, String experimentNode, String experimentPath) throws Exception {
         int count = 0;
-        long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), zk,
+        long deliveryTag = AiravataZKUtils.getDeliveryTag(nodeStatus.getWorkflowNodeIdentity().getExperimentId(), curatorClient,
                 experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
         if(deliveryTag>0) {
             if (ServerSettings.isGFacPassiveMode()) {
@@ -152,16 +147,18 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
                 }
             }
         }
-        if (zk.exists(experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, false) != null) {
-            ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX);
+        if (curatorClient.checkExists().forPath(experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX) != null) {
+            ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+                    experimentPath + AiravataZKUtils.DELIVERY_TAG_POSTFIX, true);
         }
-        if (zk.exists(experimentPath, false) != null) {
-            ZKUtil.deleteRecursive(zk, experimentPath);
+
+        if (curatorClient.checkExists().forPath(experimentPath) != null) {
+            ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(), experimentPath, true);
         }
 
         // ack cancel operation if exist
         long cancelDT = AiravataZKUtils.getCancelDeliveryTagIfExist(nodeStatus.getWorkflowNodeIdentity().getExperimentId(),
-                zk, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
+                curatorClient, experimentNode, ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME));
         count  = 0;
         if (cancelDT > 0) {
             while (!consumer.isOpen() && count < 3) {
@@ -180,7 +177,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
             }
         }
         if (cancelDT > 0) {
-            ZKUtil.deleteRecursive(zk, experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX);
+            ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+                    experimentPath + AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX, true);
         }
     }
 
@@ -211,8 +209,8 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
                 this.publisher=(Publisher) configuration;
             }else if (configuration instanceof RabbitMQTaskLaunchConsumer) {
                 this.consumer = (RabbitMQTaskLaunchConsumer) configuration;
-            }else if (configuration instanceof ZooKeeper) {
-                this.zk = (ZooKeeper) configuration;
+            }else if (configuration instanceof CuratorFramework) {
+                this.curatorClient = (CuratorFramework) configuration;
             }
 
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/commons/utils/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/utils/pom.xml b/modules/commons/utils/pom.xml
index 44d7465..09766af 100644
--- a/modules/commons/utils/pom.xml
+++ b/modules/commons/utils/pom.xml
@@ -130,9 +130,9 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-            <version>3.4.0</version>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.thrift</groupId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
index ac617d9..f753bc1 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -20,9 +20,10 @@
 */
 package org.apache.airavata.common.utils;
 
-import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerConfig;
@@ -36,15 +37,12 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.ByteBuffer;
-import java.util.List;
 
 public class AiravataZKUtils implements Watcher {
     private final static Logger logger = LoggerFactory.getLogger(AiravataZKUtils.class);
 
     public static final String ZK_EXPERIMENT_STATE_NODE = "state";
-
     public static final String DELIVERY_TAG_POSTFIX = "-deliveryTag";
-
     public static final String CANCEL_DELIVERY_TAG_POSTFIX = "-cancel-deliveryTag";
 
     @Override
@@ -81,35 +79,14 @@ public class AiravataZKUtils implements Watcher {
                 "state";
     }
 
-    public static String getExpState(ZooKeeper zk, String expId) throws ApplicationSettingsException,
-            KeeperException, InterruptedException {
-        Stat exists = zk.exists(getExpStatePath(expId), false);
+    public static String getExpState(CuratorFramework curatorClient, String expId) throws Exception {
+        Stat exists = curatorClient.checkExists().forPath(getExpStatePath(expId));
         if (exists != null) {
-            return new String(zk.getData(getExpStatePath(expId), false, exists));
+            return new String(curatorClient.getData().storingStatIn(exists).forPath(getExpStatePath(expId)));
         }
         return null;
     }
 
-
-    public static int getExpStateValueWithGivenPath(ZooKeeper zk,String fullPath)throws ApplicationSettingsException,
-            KeeperException, InterruptedException {
-        Stat exists = zk.exists(fullPath, false);
-        if (exists != null) {
-            return Integer.parseInt(new String(zk.getData(fullPath, false, exists)));
-        }
-        return -1;
-    }
-    public static List<String> getRunningGfacNodeNames(ZooKeeper zk) throws KeeperException, InterruptedException {
-        String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_API_SERVER_NODE, "/gfac-server");
-        return zk.getChildren(gfacServer, null);
-    }
-
-
-    public static List<String> getAllGfacNodeNames(ZooKeeper zk) throws KeeperException, InterruptedException {
-        String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
-        return zk.getChildren(gfacServer, null);
-    }
-
     public static void runZKFromConfig(ServerConfig config,ServerCnxnFactory cnxnFactory) throws IOException {
         AiravataZKUtils.logger.info("Starting Zookeeper server...");
         FileTxnSnapLog txnLog = null;
@@ -177,33 +154,22 @@ public class AiravataZKUtils implements Watcher {
         }
     }
 
-    public static void storeDeliveryTag(ZooKeeper zk,String newExpNode,Double deliveryTag) throws KeeperException, InterruptedException {
-        String s = zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT);
-
-        Stat expParent = zk.exists(newExpNode, false);
-        if (expParent != null) {
-            zk.setData(newExpNode, toByteArray(deliveryTag),
-                    expParent.getVersion());
-        }
-    }
-
     public static byte[] toByteArray(double value) {
         byte[] bytes = new byte[8];
         ByteBuffer.wrap(bytes).putDouble(value);
         return bytes;
     }
 
-    public static long getDeliveryTag(String experimentID, ZooKeeper zk, String experimentNode,
-                                      String pickedChild) throws KeeperException, InterruptedException,AiravataException {
+    public static long getDeliveryTag(String experimentID, CuratorFramework curatorClient, String experimentNode,
+                                      String pickedChild) throws Exception {
         String deliveryTagPath = experimentNode + File.separator + pickedChild + File.separator + experimentID
                 + DELIVERY_TAG_POSTFIX;
-        Stat exists = zk.exists(deliveryTagPath, false);
+        Stat exists = curatorClient.checkExists().forPath(deliveryTagPath);
         if(exists==null) {
             logger.error("Cannot find delivery Tag in path:" + deliveryTagPath + " for this experiment");
             return -1;
         }
-        return bytesToLong(zk.getData(deliveryTagPath, false, exists));
+        return bytesToLong(curatorClient.getData().storingStatIn(exists).forPath(deliveryTagPath));
     }
     public static byte[] longToBytes(long x) {
         ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
@@ -222,15 +188,16 @@ public class AiravataZKUtils implements Watcher {
         return ByteBuffer.wrap(bytes).getDouble();
     }
 
-    public static long getCancelDeliveryTagIfExist(String experimentId, ZooKeeper zk, String experimentNode, String pickedChild) throws KeeperException, InterruptedException {
+    public static long getCancelDeliveryTagIfExist(String experimentId, CuratorFramework curatorClient,
+                                                   String experimentNode, String pickedChild) throws Exception {
 
         String cancelDeliveryTagPath = experimentNode + File.separator + pickedChild + File.separator + experimentId +
                 AiravataZKUtils.CANCEL_DELIVERY_TAG_POSTFIX;
-        Stat exists = zk.exists(cancelDeliveryTagPath, false);
+        Stat exists = curatorClient.checkExists().forPath(cancelDeliveryTagPath);
         if (exists == null) {
             return -1; // no cancel deliverytag found
         } else {
-            return bytesToLong(zk.getData(cancelDeliveryTagPath, false, exists));
+            return bytesToLong(curatorClient.getData().storingStatIn(exists).forPath(cancelDeliveryTagPath));
         }
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 9a8d2c7..6c2f213 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -605,6 +605,11 @@
 			<artifactId>amqp-client</artifactId>
 			<version>${amqp.client.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-framework</artifactId>
+			<version>${curator.version}</version>
+		</dependency>
 
 		<!-- ======================== Sample =================== -->
 		<dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/airavata-gfac-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/pom.xml b/modules/gfac/airavata-gfac-service/pom.xml
index 5a178bd..3e8b423 100644
--- a/modules/gfac/airavata-gfac-service/pom.xml
+++ b/modules/gfac/airavata-gfac-service/pom.xml
@@ -82,11 +82,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.curator</groupId>
-            <artifactId>curator-recipes</artifactId>
-            <version>${curator.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.curator</groupId>
             <artifactId>curator-framework</artifactId>
             <version>${curator.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 5d747aa..20926d7 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -22,7 +22,6 @@ package org.apache.airavata.gfac.server;
 
 import com.google.common.eventbus.EventBus;
 import org.airavata.appcatalog.cpi.AppCatalog;
-import org.airavata.appcatalog.cpi.AppCatalogException;
 import org.apache.aiaravata.application.catalog.data.impl.AppCatalogFactory;
 import org.apache.airavata.common.exception.AiravataException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -45,71 +44,65 @@ import org.apache.airavata.messaging.core.MessageContext;
 import org.apache.airavata.messaging.core.MessageHandler;
 import org.apache.airavata.messaging.core.MessagingConstants;
 import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.airavata.model.messaging.event.*;
+import org.apache.airavata.model.messaging.event.MessageType;
+import org.apache.airavata.model.messaging.event.TaskSubmitEvent;
+import org.apache.airavata.model.messaging.event.TaskTerminateEvent;
 import org.apache.airavata.model.workspace.experiment.ExperimentState;
 import org.apache.airavata.model.workspace.experiment.ExperimentStatus;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
-import org.apache.zookeeper.*;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.locks.Lock;
 
 
-public class GfacServerHandler implements GfacService.Iface, Watcher {
+public class GfacServerHandler implements GfacService.Iface {
     private final static AiravataLogger logger = AiravataLoggerFactory.getLogger(GfacServerHandler.class);
-
     private static RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer;
-
     private static int requestCount=0;
-
     private Registry registry;
     private AppCatalog appCatalog;
-
     private String gatewayName;
-
     private String airavataUserName;
-
-    private ZooKeeper zk;
-
+//    private ZooKeeper zk;
+    private CuratorFramework curatorClient;
     private static Integer mutex = -1;
-
     private static Lock lock;
-
     private MonitorPublisher publisher;
-
     private String gfacServer;
-
     private String gfacExperiments;
-
     private String airavataServerHostPort;
-
-
     private BlockingQueue<TaskSubmitEvent> taskSubmitEvents;
 
     public GfacServerHandler() throws Exception {
-        // registering with zk
         try {
+            // start curator client
             String zkhostPort = AiravataZKUtils.getZKhostPort();
-            airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
-                    + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
-            zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);   // no watcher is required, this will only use to store some data
+            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
+            curatorClient = CuratorFrameworkFactory.newClient(zkhostPort, retryPolicy);
+            curatorClient.start();
             gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
             gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
-            logger.info("Waiting for zookeeper to connect to the server");
-            synchronized (mutex) {
-                mutex.wait(5000);  // waiting for the syncConnected event
-            }
+            airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
             storeServerConfig();
-            logger.info("Finished starting ZK: " + zk);
             publisher = new MonitorPublisher(new EventBus());
             BetterGfacImpl.setMonitorPublisher(publisher);
             registry = RegistryFactory.getDefaultRegistry();
@@ -121,24 +114,8 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                 rabbitMQTaskLaunchConsumer = new RabbitMQTaskLaunchConsumer();
                 rabbitMQTaskLaunchConsumer.listen(new TaskLaunchMessageHandler());
             }
-            BetterGfacImpl.startStatusUpdators(registry, zk, publisher, rabbitMQTaskLaunchConsumer);
-        } catch (ApplicationSettingsException e) {
-            logger.error("Error initialising GFAC", e);
-            throw new Exception("Error initialising GFAC", e);
-        } catch (InterruptedException e) {
-            logger.error("Error initialising GFAC", e);
-            throw new Exception("Error initialising GFAC", e);
-        } catch (AppCatalogException e) {
-            logger.error("Error initialising GFAC", e);
-            throw new Exception("Error initialising GFAC", e);
-        } catch (RegistryException e) {
-            logger.error("Error initialising GFAC", e);
-            throw new Exception("Error initialising GFAC", e);
-        } catch (KeeperException e) {
-            logger.error("Error initialising GFAC", e);
-            throw new Exception("Error initialising GFAC", e);
-        } catch (IOException e) {
-            logger.error("Error initialising GFAC", e);
+            BetterGfacImpl.startStatusUpdators(registry, curatorClient, publisher, rabbitMQTaskLaunchConsumer);
+        } catch (Exception e) {
             throw new Exception("Error initialising GFAC", e);
         }
     }
@@ -152,60 +129,27 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
             logger.error(e.getMessage(), e);
         }
     }
-    private void storeServerConfig() throws KeeperException, InterruptedException, ApplicationSettingsException {
-        Stat zkStat = zk.exists(gfacServer, false);
-        if (zkStat == null) {
-            zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
+    private void storeServerConfig() throws Exception {
+        Stat stat = curatorClient.checkExists().forPath(gfacServer);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+                    .forPath(gfacServer, new byte[0]);
         }
         String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
         String instanceNode = gfacServer + File.separator + instanceId;
-        zkStat = zk.exists(instanceNode, true);
-        if (zkStat == null) {
-            zk.create(instanceNode,
-                    airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.EPHEMERAL);      // other component will watch these childeren creation deletion to monitor the status of the node
-            zk.getChildren(instanceNode, true);
+        stat = curatorClient.checkExists().forPath(instanceNode);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.EPHEMERAL).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(instanceNode, airavataServerHostPort.getBytes());
+            curatorClient.getChildren().watched().forPath(instanceNode);
         }
-        zkStat = zk.exists(gfacExperiments, false);
-        if (zkStat == null) {
-            zk.create(gfacExperiments,
-                    airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
+        stat = curatorClient.checkExists().forPath(gfacExperiments);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(gfacExperiments, airavataServerHostPort.getBytes());
         }
-        zkStat = zk.exists(gfacExperiments + File.separator + instanceId, false);
-        if (zkStat == null) {
-            zk.create(gfacExperiments + File.separator + instanceId,
-                    airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-        } else {
-            logger.error(" Zookeeper is inconsistent state  !!!!!");
-        }
-    }
-
-    synchronized public void process(WatchedEvent watchedEvent) {
-        logger.info(watchedEvent.getPath());
-        logger.info(watchedEvent.getType().toString());
-        synchronized (mutex) {
-            Event.KeeperState state = watchedEvent.getState();
-            logger.info(state.name());
-            switch (state){
-                case SyncConnected:
-                    mutex.notify();
-                    break;
-                case Expired:case Disconnected:
-                   logger.info("ZK Connection is "+ state.toString());
-                    try {
-                        zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-                    } catch (IOException e) {
-                        logger.error(e.getMessage(), e);
-                    } catch (ApplicationSettingsException e) {
-                        logger.error(e.getMessage(), e);
-                    }
-//                    synchronized (mutex) {
-//                        mutex.wait(5000);  // waiting for the syncConnected event
-//                    }
-            }
+        stat = curatorClient.checkExists().forPath(gfacExperiments + File.separator + instanceId);
+        if (stat == null) {
+            curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+                    .forPath(gfacExperiments + File.separator + instanceId, airavataServerHostPort.getBytes());
         }
     }
 
@@ -304,7 +248,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
 
     private GFac getGfac() throws TException {
         try {
-            return new BetterGfacImpl(registry, appCatalog,null , publisher);
+            return new BetterGfacImpl(registry, appCatalog, curatorClient, publisher);
 
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
@@ -379,16 +323,11 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                     registry.update(RegistryModelType.EXPERIMENT_STATUS, status, event.getExperimentId());
                     experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
                     try {
-                        GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), zk, experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
+                        GFacUtils.createExperimentEntryForPassive(event.getExperimentId(), event.getTaskId(), curatorClient,
+                                experimentNode, nodeName, event.getTokenId(), message.getDeliveryTag());
                         AiravataZKUtils.getExpStatePath(event.getExperimentId());
                         submitJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
-                    } catch (KeeperException e) {
-                        logger.error(nodeName + " was interrupted.");
-                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
-                    } catch (InterruptedException e) {
-                        logger.error(e.getMessage(), e);
-                        rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
-                    } catch (ApplicationSettingsException e) {
+                    } catch (Exception e) {
                         logger.error(e.getMessage(), e);
                         rabbitMQTaskLaunchConsumer.sendAck(message.getDeliveryTag());
                     }
@@ -404,7 +343,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                 try {
                     byte[] bytes = ThriftUtils.serializeThriftObject(messageEvent);
                     ThriftUtils.createThriftFromBytes(bytes, event);
-                    boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), zk, message.getDeliveryTag());
+                    boolean saveDeliveryTagSuccess = GFacUtils.setExperimentCancel(event.getExperimentId(), curatorClient, message.getDeliveryTag());
                     if (saveDeliveryTagSuccess) {
                         cancelSuccess = cancelJob(event.getExperimentId(), event.getTaskId(), event.getGatewayId(), event.getTokenId());
                         System.out.println(" Message Received with message id '" + message.getMessageId()
@@ -420,7 +359,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                         // if cancel success , AiravataExperimentStatusUpdator will send an ack to this message.
                     } else {
                         try {
-                            if (GFacUtils.ackCancelRequest(event.getExperimentId(), zk)) {
+                            if (GFacUtils.ackCancelRequest(event.getExperimentId(), curatorClient)) {
                                 if (!rabbitMQTaskLaunchConsumer.isOpen()) {
                                     rabbitMQTaskLaunchConsumer.reconnect();
                                 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index 465f00e..7eede18 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -116,11 +116,10 @@
         <!-- this is the dependency for amqp implementation -->
         <!-- zookeeper dependencies -->
         <dependency>
-        	<groupId>org.apache.zookeeper</groupId>
-        	<artifactId>zookeeper</artifactId>
-        	<version>3.4.0</version>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-framework</artifactId>
+            <version>${curator.version}</version>
         </dependency>
-
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 6a8dd5f..0ca3828 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -42,6 +42,7 @@ import org.apache.airavata.model.workspace.experiment.JobDetails;
 import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.registry.cpi.Registry;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,29 +51,18 @@ public class JobExecutionContext extends AbstractContext implements Serializable
 
     private static final Logger log = LoggerFactory.getLogger(JobExecutionContext.class);
     private GFacConfiguration gfacConfiguration;
-
     private ApplicationContext applicationContext;
-
     private MessageContext inMessageContext;
-
     private MessageContext outMessageContext;
-
     private GFacNotifier notifier;
-
     //FIXME : not needed for gfac
     private Experiment experiment;
-
     private TaskDetails taskData;
-
     private JobDetails jobDetails;
-
     // FIXME : not needed for gfac
     private WorkflowNodeDetails workflowNodeDetails;
-
     private GFac gfac;
-
-    private ZooKeeper zk;
-
+    private CuratorFramework curatorClient;
     private String credentialStoreToken;
     /**
      * User defined scratch/temp directory
@@ -150,10 +140,12 @@ public class JobExecutionContext extends AbstractContext implements Serializable
     // which service description we should refer during the execution of the current job represented
     // by this context instance.
     private String applicationName;
-
     private String experimentID;
-
     private AppCatalog appCatalog;
+    private String gatewayID;
+    private String status;
+    private List<String> outputFileList;
+    private Registry registry;
 
     public String getGatewayID() {
         return gatewayID;
@@ -163,13 +155,6 @@ public class JobExecutionContext extends AbstractContext implements Serializable
         this.gatewayID = gatewayID;
     }
 
-    private String gatewayID;
-    
-    private String status;
-
-    private List<String> outputFileList;
-
-    private Registry registry;
 
     /**
      *  Security context is used to handle authentication for input handlers and providers.
@@ -377,15 +362,6 @@ public class JobExecutionContext extends AbstractContext implements Serializable
         this.gfac = gfac;
     }
 
-    public ZooKeeper getZk() {
-        return zk;
-    }
-
-    public void setZk(ZooKeeper zk) {
-        this.zk = zk;
-
-    }
-
     public String getCredentialStoreToken() {
         return credentialStoreToken;
     }
@@ -494,6 +470,14 @@ public class JobExecutionContext extends AbstractContext implements Serializable
         this.preferredDataMovementInterface = preferredDataMovementInterface;
     }
 
+    public CuratorFramework getCuratorClient() {
+        return curatorClient;
+    }
+
+    public void setCuratorClient(CuratorFramework curatorClient) {
+        this.curatorClient = curatorClient;
+    }
+
     public String getExecutablePath() {
         if (applicationContext == null || applicationContext.getApplicationDeploymentDescription() == null) {
             return null;
@@ -502,6 +486,8 @@ public class JobExecutionContext extends AbstractContext implements Serializable
         }
     }
 
+
+
     public String getLoginUserName() {
         return loginUserName;
     }