You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/28 17:37:40 UTC

[3/4] airavata git commit: Integrated Apache curator with Gfac. With this fix every zookeeper call goes through CuratorFramework client.

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 2cd9ecb..200ffbe 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -61,6 +61,8 @@ import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.RegistryException;
 import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -80,57 +82,44 @@ import java.util.*;
  * This is the GFac CPI class for external usage, this simply have a single method to submit a job to
  * the resource, required data for the job has to be stored in registry prior to invoke this object.
  */
-public class BetterGfacImpl implements GFac,Watcher {
+public class BetterGfacImpl implements GFac {
     private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class);
     public static final String ERROR_SENT = "ErrorSent";
     private Registry registry;
-    // we are not storing zk instance in to jobExecution context
-    private ZooKeeper zk;
+    private CuratorFramework curatorClient;
     private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
     private static File gfacConfigFile;
     private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
     private static MonitorPublisher monitorPublisher;
-    private static Integer mutex = -1;
 
     /**
      * Constructor for GFac
      *
      * @param registry
-     * @param zooKeeper
+     * @param curatorClient
      */
-    public BetterGfacImpl(Registry registry,  AppCatalog appCatalog, ZooKeeper zooKeeper,
+    public BetterGfacImpl(Registry registry, AppCatalog appCatalog, CuratorFramework curatorClient,
                           MonitorPublisher publisher) throws ApplicationSettingsException, IOException, InterruptedException {
         this.registry = registry;
         monitorPublisher = publisher;     // This is a EventBus common for gfac
-        this.zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-        log.info("Waiting until zookeeper client connect to the server...");
-        synchronized (mutex) {
-            mutex.wait(5000);  // waiting for the syncConnected event
-        }
+        this.curatorClient = curatorClient;
     }
 
-    public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher,RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
+    public static void startStatusUpdators(Registry registry, CuratorFramework curatorClient, MonitorPublisher publisher,
+
+                                           RabbitMQTaskLaunchConsumer rabbitMQTaskLaunchConsumer) {
         try {
             String[] listenerClassList = ServerSettings.getActivityListeners();
             Publisher rabbitMQPublisher = PublisherFactory.createActivityPublisher();
-
             for (String listenerClass : listenerClassList) {
                 Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
                 AbstractActivityListener abstractActivityListener = aClass.newInstance();
                 activityListeners.add(abstractActivityListener);
-                abstractActivityListener.setup(publisher, registry, zk, rabbitMQPublisher,rabbitMQTaskLaunchConsumer);
+                abstractActivityListener.setup(publisher, registry, curatorClient, rabbitMQPublisher, rabbitMQTaskLaunchConsumer);
                 log.info("Registering listener: " + listenerClass);
                 publisher.registerListener(abstractActivityListener);
             }
-        } catch (ClassNotFoundException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties", e);
-        } catch (InstantiationException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties", e);
-        } catch (IllegalAccessException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties", e);
-        } catch (ApplicationSettingsException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties", e);
-        } catch (AiravataException e) {
+        } catch (Exception e) {
             log.error("Error loading the listener classes configured in airavata-server.properties", e);
         }
     }
@@ -151,26 +140,9 @@ public class BetterGfacImpl implements GFac,Watcher {
                 threadedHandler.initProperties(handlerConfig.getProperties());
                 daemonHandlers.add(threadedHandler);
             }
-        } catch (ParserConfigurationException e) {
-            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
-        } catch (IOException e) {
-            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
-        } catch (SAXException e) {
-            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
-        } catch (XPathExpressionException e) {
+        } catch (ParserConfigurationException | IOException | XPathExpressionException | ClassNotFoundException |
+                InstantiationException | IllegalAccessException | GFacHandlerException | SAXException e) {
             log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
-        } catch (ClassNotFoundException e) {
-            log.error("Error initializing the handler: " + className);
-            log.error(className + " class has to implement " + ThreadedHandler.class);
-        } catch (InstantiationException e) {
-            log.error("Error initializing the handler: " + className);
-            log.error(className + " class has to implement " + ThreadedHandler.class);
-        } catch (GFacHandlerException e) {
-            log.error("Error initializing the handler: " + className);
-            log.error(className + " class has to implement " + ThreadedHandler.class);
-        } catch (IllegalAccessException e) {
-            log.error("Error initializing the handler: " + className);
-            log.error(className + " class has to implement " + ThreadedHandler.class);
         }
         for (ThreadedHandler tHandler : daemonHandlers) {
             (new Thread(tHandler)).start();
@@ -184,13 +156,13 @@ public class BetterGfacImpl implements GFac,Watcher {
         daemonHandlers = new ArrayList<ThreadedHandler>();
         startDaemonHandlers();
     }
-    
+
     public BetterGfacImpl(Registry registry) {
-    	this();
-    	this.registry = registry;
+        this();
+        this.registry = registry;
     }
 
-    
+
     /**
      * This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
      * And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
@@ -206,13 +178,13 @@ public class BetterGfacImpl implements GFac,Watcher {
             jobExecutionContext.setCredentialStoreToken(tokenId);
             return submitJob(jobExecutionContext);
         } catch (Exception e) {
-            log.error("Error inovoking the job with experiment ID: " + experimentID + ":"+e.getMessage());
+            log.error("Error inovoking the job with experiment ID: " + experimentID + ":" + e.getMessage());
             StringWriter errors = new StringWriter();
             e.printStackTrace(new PrintWriter(errors));
             GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             // FIXME: Here we need to update Experiment status to Failed, as we used chained update approach updating
             // task status will cause to update Experiment status. Remove this chained update approach and fix this correctly (update experiment status)
-            if(jobExecutionContext!=null){
+            if (jobExecutionContext != null) {
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
                 TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
@@ -222,8 +194,6 @@ public class BetterGfacImpl implements GFac,Watcher {
                 monitorPublisher.publish(event);
             }
             throw new GFacException(e);
-        }finally {
-            closeZK(jobExecutionContext);
         }
     }
 
@@ -237,7 +207,7 @@ public class BetterGfacImpl implements GFac,Watcher {
          * 1. Get the Task from the task ID and construct the Job object and save it in to registry
          * 2. Add properties of description documents to jobExecutionContext which will be used inside the providers.
          */
-        
+
         //Fetch the Task details for the requested experimentID from the registry. Extract required pointers from the Task object.
         TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
 
@@ -289,11 +259,11 @@ public class BetterGfacImpl implements GFac,Watcher {
         jobExecutionContext.setTaskData(taskData);
         jobExecutionContext.setGatewayID(gatewayID);
         jobExecutionContext.setAppCatalog(appCatalog);
-        
-      
+
+
         List<JobDetails> jobDetailsList = taskData.getJobDetailsList();
         //FIXME: Following for loop only set last jobDetails element to the jobExecutionContext
-        for(JobDetails jDetails:jobDetailsList){
+        for (JobDetails jDetails : jobDetailsList) {
             jobExecutionContext.setJobDetails(jDetails);
         }
         // setting the registry
@@ -314,11 +284,11 @@ public class BetterGfacImpl implements GFac,Watcher {
 
         jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
         jobExecutionContext.setGfac(this);
-        jobExecutionContext.setZk(zk);
+        jobExecutionContext.setCuratorClient(curatorClient);
 
         // handle job submission protocol
         List<JobSubmissionInterface> jobSubmissionInterfaces = computeResource.getJobSubmissionInterfaces();
-        if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()){
+        if (jobSubmissionInterfaces != null && !jobSubmissionInterfaces.isEmpty()) {
             Collections.sort(jobSubmissionInterfaces, new Comparator<JobSubmissionInterface>() {
                 @Override
                 public int compare(JobSubmissionInterface jobSubmissionInterface, JobSubmissionInterface jobSubmissionInterface2) {
@@ -327,7 +297,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             });
 
             jobExecutionContext.setHostPrioritizedJobSubmissionInterfaces(jobSubmissionInterfaces);
-        }else {
+        } else {
             throw new GFacException("Compute resource should have at least one job submission interface defined...");
         }
         // handle data movement protocol
@@ -346,7 +316,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         populateDefaultComputeResourceConfiguration(jobExecutionContext, applicationInterface, computeResource);
         populateResourceJobManager(jobExecutionContext);
         // if gateway resource preference is set
-        if (gatewayResourcePreferences != null ) {
+        if (gatewayResourcePreferences != null) {
             if (gatewayResourcePreferences.getScratchLocation() == null) {
                 gatewayResourcePreferences.setScratchLocation("/tmp");
             }
@@ -365,7 +335,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                 }
             }
 
-            if(gatewayResourcePreferences.getLoginUserName() != null){
+            if (gatewayResourcePreferences.getLoginUserName() != null) {
                 jobExecutionContext.setLoginUserName(gatewayResourcePreferences.getLoginUserName());
             }
 
@@ -375,45 +345,45 @@ public class BetterGfacImpl implements GFac,Watcher {
                 jobExecutionContext.setPreferredDataMovementInterface(jobExecutionContext.getHostPrioritizedDataMovementInterfaces().get(0));
                 jobExecutionContext.setPreferredDataMovementProtocol(jobExecutionContext.getPreferredDataMovementInterface().getDataMovementProtocol());
             } else {
-            	// this check is to avoid NPE when job submission endpoints do 
-            	// not contain any data movement interfaces. 
-            	if((dataMovementInterfaces != null) && (!dataMovementInterfaces.isEmpty())) {
-            		for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
-            			if (gatewayResourcePreferences.getPreferredDataMovementProtocol() == dataMovementInterface.getDataMovementProtocol()) {
-            				jobExecutionContext.setPreferredDataMovementInterface(dataMovementInterface);
-            				break;
-                    	}
-            		}
-            	}
+                // this check is to avoid NPE when job submission endpoints do
+                // not contain any data movement interfaces.
+                if ((dataMovementInterfaces != null) && (!dataMovementInterfaces.isEmpty())) {
+                    for (DataMovementInterface dataMovementInterface : dataMovementInterfaces) {
+                        if (gatewayResourcePreferences.getPreferredDataMovementProtocol() == dataMovementInterface.getDataMovementProtocol()) {
+                            jobExecutionContext.setPreferredDataMovementInterface(dataMovementInterface);
+                            break;
+                        }
+                    }
+                }
             }
-        }  else {
+        } else {
             setUpWorkingLocation(jobExecutionContext, applicationInterface, "/tmp");
         }
         List<OutputDataObjectType> taskOutputs = taskData.getApplicationOutputs();
-        if (taskOutputs == null || taskOutputs.isEmpty() ){
+        if (taskOutputs == null || taskOutputs.isEmpty()) {
             taskOutputs = applicationInterface.getApplicationOutputs();
         }
 
-        for (OutputDataObjectType objectType : taskOutputs){
-            if (objectType.getType() == DataType.URI && objectType.getValue() != null){
+        for (OutputDataObjectType objectType : taskOutputs) {
+            if (objectType.getType() == DataType.URI && objectType.getValue() != null) {
                 String filePath = objectType.getValue();
                 // if output is not in working folder
                 if (objectType.getLocation() != null && !objectType.getLocation().isEmpty()) {
-                	if(objectType.getLocation().startsWith(File.separator)){
-                		filePath = objectType.getLocation() + File.separator + filePath;
-                    }else{
-                    	filePath = jobExecutionContext.getOutputDir() + File.separator + objectType.getLocation() + File.separator + filePath;
+                    if (objectType.getLocation().startsWith(File.separator)) {
+                        filePath = objectType.getLocation() + File.separator + filePath;
+                    } else {
+                        filePath = jobExecutionContext.getOutputDir() + File.separator + objectType.getLocation() + File.separator + filePath;
                     }
-                }else{
-                	filePath = jobExecutionContext.getOutputDir() + File.separator + filePath;
+                } else {
+                    filePath = jobExecutionContext.getOutputDir() + File.separator + filePath;
                 }
                 objectType.setValue(filePath);
-                
+
             }
-            if (objectType.getType() == DataType.STDOUT){
+            if (objectType.getType() == DataType.STDOUT) {
                 objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stdout");
             }
-            if (objectType.getType() == DataType.STDERR){
+            if (objectType.getType() == DataType.STDERR) {
                 objectType.setValue(jobExecutionContext.getOutputDir() + File.separator + jobExecutionContext.getApplicationName() + ".stderr");
             }
         }
@@ -464,23 +434,23 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-    private void populateResourceJobManager (JobExecutionContext jobExecutionContext) {
+    private void populateResourceJobManager(JobExecutionContext jobExecutionContext) {
         try {
             JobSubmissionProtocol submissionProtocol = jobExecutionContext.getPreferredJobSubmissionProtocol();
             JobSubmissionInterface jobSubmissionInterface = jobExecutionContext.getPreferredJobSubmissionInterface();
             if (submissionProtocol == JobSubmissionProtocol.SSH) {
                 SSHJobSubmission sshJobSubmission = GFacUtils.getSSHJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
-                if (sshJobSubmission != null){
+                if (sshJobSubmission != null) {
                     jobExecutionContext.setResourceJobManager(sshJobSubmission.getResourceJobManager());
                 }
-            } else if (submissionProtocol == JobSubmissionProtocol.LOCAL){
+            } else if (submissionProtocol == JobSubmissionProtocol.LOCAL) {
                 LOCALSubmission localJobSubmission = GFacUtils.getLocalJobSubmission(jobSubmissionInterface.getJobSubmissionInterfaceId());
-                if (localJobSubmission != null){
+                if (localJobSubmission != null) {
                     jobExecutionContext.setResourceJobManager(localJobSubmission.getResourceJobManager());
                 }
             }
         } catch (AppCatalogException e) {
-           log.error("Error occured while retrieving job submission interface", e);
+            log.error("Error occured while retrieving job submission interface", e);
         }
     }
 
@@ -488,7 +458,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         // We need to check whether this job is submitted as a part of a large workflow. If yes,
         // we need to setup workflow tracking listerner.
         try {
-            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
             // Register log event listener. This is required in all scenarios.
             if (isNewJob(gfacExpState)) {
                 // In this scenario We do everything from the beginning
@@ -497,8 +467,8 @@ public class BetterGfacImpl implements GFac,Watcher {
                 launch(jobExecutionContext);
             } else if (isCompletedJob(gfacExpState)) {
                 log.info("There is nothing to recover in this job so we do not re-submit");
-                ZKUtil.deleteRecursive(zk,
-                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
+                ZKPaths.deleteChildren(curatorClient.getZookeeperClient().getZooKeeper(),
+                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()), true);
             } else {
                 // Now we know this is an old Job, so we have to handle things gracefully
                 log.info("Re-launching the job in GFac because this is re-submitted to GFac");
@@ -547,7 +517,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     private boolean cancel(JobExecutionContext jobExecutionContext) throws GFacException {
         try {
-            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+            GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(curatorClient, jobExecutionContext);   // this is the original state came, if we query again it might be different,so we preserve this state in the environment
             String workflowInstanceID = null;
             if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
                 //todo implement WorkflowTrackingListener properly
@@ -575,27 +545,25 @@ public class BetterGfacImpl implements GFac,Watcher {
 //
 //            }
             return true;
-            }catch(Exception e){
-                log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
-                throw new GFacException(e.getMessage(), e);
-            }finally{
-                closeZK(jobExecutionContext);
-            }
+        } catch (Exception e) {
+            log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
+            throw new GFacException(e.getMessage(), e);
         }
+    }
 
-	private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
-		// Scheduler will decide the execution flow of handlers and provider
-		// which handles
-		// the job.
-		String experimentID = jobExecutionContext.getExperimentID();
-		try {
-			Scheduler.schedule(jobExecutionContext);
+    private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
+        // Scheduler will decide the execution flow of handlers and provider
+        // which handles
+        // the job.
+        String experimentID = jobExecutionContext.getExperimentID();
+        try {
+            Scheduler.schedule(jobExecutionContext);
 
-			// Executing in handlers in the order as they have configured in
-			// GFac configuration
-			// here we do not skip handler if some handler does not have to be
-			// run again during re-run it can implement
-			// that logic in to the handler
+            // Executing in handlers in the order as they have configured in
+            // GFac configuration
+            // here we do not skip handler if some handler does not have to be
+            // run again during re-run it can implement
+            // that logic in to the handler
 
             // After executing the in handlers provider instance should be set
             // to job execution context.
@@ -637,31 +605,31 @@ public class BetterGfacImpl implements GFac,Watcher {
                 default:
                     throw new GFacException("Un-handled GfacExperimentState : " + state.name());
             }
-		} catch (Exception e) {
-            log.error(e.getMessage(),e);
-			try {
-				// we make the experiment as failed due to exception scenario
-				monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            try {
+                // we make the experiment as failed due to exception scenario
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
                 JobIdentifier jobIdentity = new JobIdentifier(
                         jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
-				monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
-                GFacUtils.saveErrorDetails(jobExecutionContext,  e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
-			} catch (NullPointerException e1) {
-				log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
-						+ "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+                monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+                GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            } catch (NullPointerException e1) {
+                log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+                        + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
                 TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
-				monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
-                GFacUtils.saveErrorDetails(jobExecutionContext,  e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
+                monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
+                GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
 
-			}
-			jobExecutionContext.setProperty(ERROR_SENT, "true");
-			jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+            }
+            jobExecutionContext.setProperty(ERROR_SENT, "true");
+            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
             throw new GFacException(e.getMessage(), e);
         }
     }
@@ -678,34 +646,34 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
-		// Scheduler will decide the execution flow of handlers and provider
-		// which handles
-		// the job.
-		try {
-			Scheduler.schedule(jobExecutionContext);
-
-			// Executing in handlers in the order as they have configured in
-			// GFac configuration
-			// here we do not skip handler if some handler does not have to be
-			// run again during re-run it can implement
-			// that logic in to the handler
+        // Scheduler will decide the execution flow of handlers and provider
+        // which handles
+        // the job.
+        try {
+            Scheduler.schedule(jobExecutionContext);
+
+            // Executing in handlers in the order as they have configured in
+            // GFac configuration
+            // here we do not skip handler if some handler does not have to be
+            // run again during re-run it can implement
+            // that logic in to the handler
             if (!isCancelling(jobExecutionContext)) {
                 invokeInFlowHandlers(jobExecutionContext); // to keep the
                 // consistency we always
                 // try to re-run to
                 // avoid complexity
-            }else{
+            } else {
                 log.info("Experiment is cancelled, so launch operation is stopping immediately");
                 GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
                 return; // if the job is cancelled, status change is handled in cancel operation this thread simply has to be returned
             }
             // if (experimentID != null){
-			// registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
-			// }
+            // registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
+            // }
 
-			// After executing the in handlers provider instance should be set
-			// to job execution context.
-			// We get the provider instance and execute it.
+            // After executing the in handlers provider instance should be set
+            // to job execution context.
+            // We get the provider instance and execute it.
             if (!isCancelling(jobExecutionContext)) {
                 invokeProviderExecute(jobExecutionContext);
             } else {
@@ -713,53 +681,53 @@ public class BetterGfacImpl implements GFac,Watcher {
                 GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
                 return;
             }
-            } catch (Exception e) {
-			try {
-				// we make the experiment as failed due to exception scenario
-				monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
-				// monitorPublisher.publish(new
-				// ExperimentStatusChangedEvent(new
-				// ExperimentIdentity(jobExecutionContext.getExperimentID()),
-				// ExperimentState.FAILED));
-				// Updating the task status if there's any task associated
-				// monitorPublisher.publish(new TaskStatusChangeRequest(
-				// new TaskIdentity(jobExecutionContext.getExperimentID(),
-				// jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-				// jobExecutionContext.getTaskData().getTaskID()),
-				// TaskState.FAILED
-				// ));
+        } catch (Exception e) {
+            try {
+                // we make the experiment as failed due to exception scenario
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+                // monitorPublisher.publish(new
+                // ExperimentStatusChangedEvent(new
+                // ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                // ExperimentState.FAILED));
+                // Updating the task status if there's any task associated
+                // monitorPublisher.publish(new TaskStatusChangeRequest(
+                // new TaskIdentity(jobExecutionContext.getExperimentID(),
+                // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                // jobExecutionContext.getTaskData().getTaskID()),
+                // TaskState.FAILED
+                // ));
                 JobIdentifier jobIdentity = new JobIdentifier(
-                        jobExecutionContext.getJobDetails().getJobID(),jobExecutionContext.getTaskData().getTaskID(),jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
-				monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
-			} catch (NullPointerException e1) {
-				log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
-						+ "NullPointerException occurred because at this point there might not have Job Created", e1, e);
-				//monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
-				// Updating the task status if there's any task associated
+                monitorPublisher.publish(new JobStatusChangeEvent(JobState.FAILED, jobIdentity));
+            } catch (NullPointerException e1) {
+                log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+                        + "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+                //monitorPublisher.publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
+                // Updating the task status if there's any task associated
                 TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
                         jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
                         jobExecutionContext.getExperimentID(),
                         jobExecutionContext.getGatewayID());
                 monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.FAILED, taskIdentity));
 
-			}
-			jobExecutionContext.setProperty(ERROR_SENT, "true");
-			jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+            }
+            jobExecutionContext.setProperty(ERROR_SENT, "true");
+            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
             throw new GFacException(e.getMessage(), e);
-		}
+        }
     }
 
-    private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException {
+    private void invokeProviderExecute(JobExecutionContext jobExecutionContext) throws Exception {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+            GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
             initProvider(provider, jobExecutionContext);
             executeProvider(provider, jobExecutionContext);
             disposeProvider(provider, jobExecutionContext);
-            GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+            GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
         if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
@@ -767,13 +735,13 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-    private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
+    private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws Exception {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             if (submit) {
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-                GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
-                GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+                GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
+                GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
                 if (plState != null && plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
                     initProvider(provider, jobExecutionContext);
                     executeProvider(provider, jobExecutionContext);
@@ -781,18 +749,16 @@ public class BetterGfacImpl implements GFac,Watcher {
                 } else {
                     provider.recover(jobExecutionContext);
                 }
-                GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+                GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
             } else {
                 disposeProvider(provider, jobExecutionContext);
-                GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+                GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
                 monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
             }
         }
 
-        if (GFacUtils.isSynchronousMode(jobExecutionContext))
-
-        {
+        if (GFacUtils.isSynchronousMode(jobExecutionContext))  {
             invokeOutFlowHandlers(jobExecutionContext);
         }
 
@@ -812,12 +778,12 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     // TODO - Did refactoring, but need to recheck the logic again.
-    private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
+    private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws Exception {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
-            GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+            GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName());
+            GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, provider.getClass().getName());
             if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
                 initProvider(provider, jobExecutionContext);
                 cancelProvider(provider, jobExecutionContext);
@@ -825,7 +791,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             } else {
                 provider.recover(jobExecutionContext);
             }
-            GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+            GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
 
@@ -885,11 +851,11 @@ public class BetterGfacImpl implements GFac,Watcher {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.INHANDLERSINVOKING));
             for (GFacHandlerConfig handlerClassName : handlers) {
-                if(!isCancelling(jobExecutionContext)) {
+                if (!isCancelling(jobExecutionContext)) {
                     Class<? extends GFacHandler> handlerClass;
                     GFacHandler handler;
                     try {
-                        GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+                        GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
                         handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                         handler = handlerClass.newInstance();
                         handler.initProperties(handlerClassName.getProperties());
@@ -902,12 +868,12 @@ public class BetterGfacImpl implements GFac,Watcher {
                     }
                     try {
                         handler.invoke(jobExecutionContext);
-                        GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+                        GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
                         // if exception thrown before that we do not make it finished
                     } catch (GFacHandlerException e) {
                         throw new GFacException("Error Executing a InFlow Handler", e.getCause());
                     }
-                }else{
+                } else {
                     log.info("Experiment execution is cancelled, so InHandler invocation is going to stop");
                     GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.CANCELED);
                     break;
@@ -916,38 +882,18 @@ public class BetterGfacImpl implements GFac,Watcher {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
                     , GfacExperimentState.INHANDLERSINVOKED));
         } catch (Exception e) {
-            throw new GFacException("Error Invoking Handlers:"+e.getMessage(), e);
+            throw new GFacException("Error Invoking Handlers:" + e.getMessage(), e);
         }
     }
 
     public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
         String experimentPath = null;
         try {
-            try {
-                if(jobExecutionContext.getZk()!=null){
-                    closeZK(jobExecutionContext);
-                }
-                jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
-                zk = jobExecutionContext.getZk();
-                log.info("Waiting until zookeeper client connect to the server...");
-                synchronized (mutex) {
-                    mutex.wait(5000);  // waiting for the syncConnected event
-                }
-                experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
-                if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
-                    log.error("Experiment is already finalized so no output handlers will be invoked");
-                    return;
-                }
-            } catch (IOException e) {
-                log.error(e.getMessage(), e);
-            } catch (ApplicationSettingsException e) {
-                log.error(e.getMessage(), e);
-            } catch (InterruptedException e) {
-                log.error(e.getMessage(), e);
-            } catch (KeeperException e) {
-                log.error(e.getMessage(), e);
+            experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+            if (curatorClient.checkExists().forPath(experimentPath) == null) {
+                log.error("Experiment is already finalized so no output handlers will be invoked");
+                return;
             }
-
             GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
             List<GFacHandlerConfig> handlers = null;
             if (gFacConfiguration != null) {
@@ -968,23 +914,20 @@ public class BetterGfacImpl implements GFac,Watcher {
                         Class<? extends GFacHandler> handlerClass;
                         GFacHandler handler;
                         try {
-                            GFacUtils.createHandlerZnode(jobExecutionContext.getZk(), jobExecutionContext, handlerClassName.getClassName());
+                            GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName());
                             handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                             handler = handlerClass.newInstance();
                             handler.initProperties(handlerClassName.getProperties());
                         } catch (ClassNotFoundException e) {
                             log.error(e.getMessage());
                             throw new GFacException("Cannot load handler class " + handlerClassName, e);
-                        } catch (InstantiationException e) {
-                            log.error(e.getMessage());
-                            throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                        } catch (IllegalAccessException e) {
+                        } catch (InstantiationException | IllegalAccessException e) {
                             log.error(e.getMessage());
                             throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
                         }
                         try {
                             handler.invoke(jobExecutionContext);
-                            GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+                            GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
                         } catch (Exception e) {
                             GFacUtils.publishTaskStatus(jobExecutionContext, monitorPublisher, TaskState.FAILED);
                             try {
@@ -1008,10 +951,8 @@ public class BetterGfacImpl implements GFac,Watcher {
             } catch (Exception e) {
                 throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
-        } finally{
-            closeZK(jobExecutionContext);
         }
 
         // At this point all the execution is finished so we update the task and experiment statuses.
@@ -1029,16 +970,6 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     }
 
-    private void closeZK(JobExecutionContext jobExecutionContext) {
-        try {
-            if(jobExecutionContext!=null && jobExecutionContext.getZk()!=null) {
-                jobExecutionContext.getZk().close();
-            }
-        } catch (InterruptedException e) {
-            log.error(e.getMessage(), e);
-        }
-    }
-
     /**
      * If handlers ran successfully we re-run only recoverable handlers
      * If handler never ran we run the normal invoke method
@@ -1058,8 +989,8 @@ public class BetterGfacImpl implements GFac,Watcher {
                 try {
                     handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                     handler = handlerClass.newInstance();
-                    GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, handlerClassName.getClassName());
-                    GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
+                    GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+                    GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
                     handler.initProperties(handlerClassName.getProperties());
                     if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
                         log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
@@ -1069,7 +1000,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                         log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
                         handler.recover(jobExecutionContext);
                     }
-                    GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+                    GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
                 } catch (GFacHandlerException e) {
                     throw new GFacException("Error Executing a InFlow Handler", e.getCause());
                 } catch (ClassNotFoundException e) {
@@ -1086,7 +1017,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             try {
                 StringWriter errors = new StringWriter();
                 e.printStackTrace(new PrintWriter(errors));
-                GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             } catch (GFacException e1) {
                 log.error(e1.getLocalizedMessage());
             }
@@ -1117,8 +1048,8 @@ public class BetterGfacImpl implements GFac,Watcher {
             try {
                 handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                 handler = handlerClass.newInstance();
-                GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, handlerClassName.getClassName());
-                GFacUtils.createHandlerZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
+                GfacHandlerState plState = GFacUtils.getHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName());
+                GFacUtils.createHandlerZnode(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.INVOKING);
                 if (plState == GfacHandlerState.UNKNOWN || plState == GfacHandlerState.INVOKING) {
                     log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode");
                     handler.initProperties(handlerClassName.getProperties());
@@ -1127,12 +1058,12 @@ public class BetterGfacImpl implements GFac,Watcher {
                     // if these already ran we re-run only recoverable handlers
                     handler.recover(jobExecutionContext);
                 }
-                GFacUtils.updateHandlerState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
+                GFacUtils.updateHandlerState(curatorClient, jobExecutionContext, handlerClassName.getClassName(), GfacHandlerState.COMPLETED);
             } catch (ClassNotFoundException e) {
                 try {
                     StringWriter errors = new StringWriter();
                     e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                 } catch (GFacException e1) {
                     log.error(e1.getLocalizedMessage());
                 }
@@ -1142,7 +1073,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                 try {
                     StringWriter errors = new StringWriter();
                     e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                 } catch (GFacException e1) {
                     log.error(e1.getLocalizedMessage());
                 }
@@ -1152,7 +1083,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                 try {
                     StringWriter errors = new StringWriter();
                     e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                 } catch (GFacException e1) {
                     log.error(e1.getLocalizedMessage());
                 }
@@ -1163,13 +1094,11 @@ public class BetterGfacImpl implements GFac,Watcher {
                 try {
                     StringWriter errors = new StringWriter();
                     e.printStackTrace(new PrintWriter(errors));
-                    GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                    GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                 } catch (GFacException e1) {
                     log.error(e1.getLocalizedMessage());
                 }
                 throw new GFacException("Error Executing a OutFlow Handler", e);
-            }finally {
-                closeZK(jobExecutionContext);
             }
         }
         monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
@@ -1214,23 +1143,14 @@ public class BetterGfacImpl implements GFac,Watcher {
         return registry;
     }
 
-    public ZooKeeper getZk() {
-        return zk;
-    }
-
-    public void setZk(ZooKeeper zk) {
-        this.zk = zk;
-    }
-
-
-    public boolean isCancelled(JobExecutionContext executionContext){
+    public boolean isCancelled(JobExecutionContext executionContext) {
         // we should check whether experiment is cancelled using registry
         try {
-            ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
-            if (status != null){
+            ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+            if (status != null) {
                 ExperimentState experimentState = status.getExperimentState();
-                if (experimentState != null){
-                    if(experimentState == ExperimentState.CANCELED){
+                if (experimentState != null) {
+                    if (experimentState == ExperimentState.CANCELED) {
                         return true;
                     }
                 }
@@ -1241,14 +1161,14 @@ public class BetterGfacImpl implements GFac,Watcher {
         return false;
     }
 
-    public boolean isCancelling(JobExecutionContext executionContext){
+    public boolean isCancelling(JobExecutionContext executionContext) {
         // check whether cancelling request came
         try {
-            ExperimentStatus status = (ExperimentStatus)registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
-            if (status != null){
+            ExperimentStatus status = (ExperimentStatus) registry.get(RegistryModelType.EXPERIMENT_STATUS, executionContext.getExperimentID());
+            if (status != null) {
                 ExperimentState experimentState = status.getExperimentState();
-                if (experimentState != null){
-                    if(experimentState == ExperimentState.CANCELING){
+                if (experimentState != null) {
+                    if (experimentState == ExperimentState.CANCELING) {
                         return true;
                     }
                 }
@@ -1276,43 +1196,4 @@ public class BetterGfacImpl implements GFac,Watcher {
         return false;
     }
 
-    public void process(WatchedEvent watchedEvent) {
-        log.info(watchedEvent.getPath());
-        if (Event.EventType.NodeDataChanged.equals(watchedEvent.getType())) {
-            // node data is changed, this means node is cancelled.
-            log.info("Experiment is cancelled with this path:" + watchedEvent.getPath());
-        }
-        synchronized (mutex) {
-            Event.KeeperState state = watchedEvent.getState();
-            log.info(state.name());
-            switch (state) {
-                case SyncConnected:
-                    mutex.notify();
-                    break;
-                case Expired:
-                    try {
-                        zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-                    } catch (IOException e) {
-                        log.error(e.getMessage(), e);
-                    } catch (ApplicationSettingsException e) {
-                        log.error(e.getMessage(), e);
-                    }
-//                    synchronized (mutex) {
-//                        mutex.wait(5000);  // waiting for the syncConnected event
-//                    }
-                case Disconnected:
-//                    try {
-//                        zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-//                    } catch (IOException e) {
-//                        log.error(e.getMessage(), e);
-//                    } catch (ApplicationSettingsException e) {
-//                        log.error(e.getMessage(), e);
-//                    }
-//                    synchronized (mutex) {
-//                        mutex.wait(5000);  // waiting for the syncConnected event
-//                    }
-                    log.info("ZK Connection is " + state.toString());
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index ee28c1d..eef0a33 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -48,7 +48,7 @@ public abstract class AbstractHandler implements GFacHandler {
 
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
         try {
-            GFacUtils.updateHandlerState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
+            GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
         } catch (Exception e) {
             logger.error("Error saving Recoverable provider state", e);
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
index 58ef855..c6ada52 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
@@ -34,7 +34,7 @@ public class AppDescriptorCheckHandler implements GFacHandler {
     public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
         logger.info("Invoking ApplicationDescriptorCheckHandler ...");
         try {
-            GFacUtils.updateHandlerState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
+            GFacUtils.updateHandlerState(jobExecutionContext.getCuratorClient(), jobExecutionContext, this.getClass().getName(), GfacHandlerState.INVOKED);
         } catch (Exception e) {
             logger.info("Error saving plugin status to ZK");
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/4a061780/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 836e2c6..84d72fa 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -20,31 +20,28 @@
 */
 package org.apache.airavata.gfac.core.monitor;
 
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.airavata.common.exception.ApplicationSettingsException;
+import com.google.common.eventbus.Subscribe;
 import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.common.utils.listener.AbstractActivityListener;
 import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
 import org.apache.airavata.gfac.core.states.GfacExperimentState;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.messaging.core.Publisher;
-import org.apache.airavata.messaging.core.impl.RabbitMQProducer;
-import org.apache.airavata.messaging.core.impl.RabbitMQTaskLaunchConsumer;
-import org.apache.zookeeper.*;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.eventbus.Subscribe;
+import java.io.File;
 
 public class GfacInternalStatusUpdator implements AbstractActivityListener, Watcher {
     private final static Logger logger = LoggerFactory.getLogger(GfacInternalStatusUpdator.class);
 
-    private ZooKeeper zk;
+    private CuratorFramework curatorClient;
 
     private static Integer mutex = -1;
 
@@ -57,39 +54,20 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
                 + File.separator + statusChangeRequest.getMonitorID().getExperimentID();
         Stat exists = null;
         if(!(GfacExperimentState.COMPLETED.equals(statusChangeRequest.getState()) || GfacExperimentState.FAILED.equals(statusChangeRequest.getState()))) {
-            try {
-                if (!zk.getState().isConnected()) {
-                    String zkhostPort = AiravataZKUtils.getZKhostPort();
-                    zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
-                    logger.info("Waiting for zookeeper to connect to the server");
-                    synchronized (mutex) {
-                        mutex.wait(5000);
-                    }
-                }
-                exists = zk.exists(experimentPath, false);
-                if (exists == null) {
-                    logger.error("ZK path: " + experimentPath + " does not exists !!");
-                    logger.error("Zookeeper is in an inconsistent state !!! ");
-                    return;
-                }
-            } catch (KeeperException e) {
-                logger.error("Error while updating zk", e);
-                throw new Exception(e.getMessage(), e);
-            } catch (InterruptedException e) {
-                logger.error("Error while updating zk", e);
-                throw new Exception(e.getMessage(), e);
-            } catch (IOException e) {
-                logger.error("Error while updating zk", e);
-                throw new Exception(e.getMessage(), e);
+            exists = curatorClient.checkExists().forPath(experimentPath);
+            if (exists == null) {
+                logger.error("ZK path: " + experimentPath + " does not exists !!");
+                return;
             }
-            Stat state = zk.exists(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+            Stat state = curatorClient.checkExists().forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE);
             if (state == null) {
                 // state znode has to be created
-                zk.create(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-                        String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                curatorClient.create().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).
+                        forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+                                String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
             } else {
-                zk.setData(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-                        String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), state.getVersion());
+                curatorClient.setData().withVersion(state.getVersion()).forPath(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+                        String.valueOf(statusChangeRequest.getState().getValue()).getBytes());
             }
         }
         switch (statusChangeRequest.getState()) {
@@ -107,8 +85,8 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
 
     public void setup(Object... configurations) {
         for (Object configuration : configurations) {
-            if (configuration instanceof ZooKeeper) {
-                this.zk = (ZooKeeper) configuration;
+            if (configuration instanceof CuratorFramework) {
+                this.curatorClient = (CuratorFramework) configuration;
             }
         }
     }