You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/24 20:05:49 UTC

[2/5] git commit: committing the initial version of zk work with resubmitting all the failed jobs to the available gfac cluster nodes

committing the initial version of zk work with resubmitting all the failed jobs to the available gfac cluster nodes


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/362da4e8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/362da4e8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/362da4e8

Branch: refs/heads/master
Commit: 362da4e88c5ab8ddeba234b3ad7c5f829477c272
Parents: 2bcadf5
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 00:44:33 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 00:44:33 2014 -0400

----------------------------------------------------------------------
 airavata-api/airavata-api-server/pom.xml        |   6 +
 .../server/handler/AiravataServerHandler.java   |  94 +++-
 .../handler/ApplicationCatalogHandler.java      |   2 +-
 .../airavata/api/server/util/Constants.java     |   2 -
 .../client/samples/CreateLaunchExperiment.java  | 226 +++-----
 .../airavataAPI.thrift                          |   1 +
 .../airavata/client/tools/DocumentCreator.java  |   2 +-
 .../apache/airavata/common/utils/Constants.java |  15 +
 .../main/resources/airavata-server.properties   |  22 +-
 modules/distribution/server/pom.xml             |   6 +
 .../server/src/main/assembly/bin-assembly.xml   |   3 +-
 modules/gfac/airavata-gfac-service/pom.xml      |   4 +-
 .../apache/airavata/gfac/server/GfacServer.java |  11 +-
 .../airavata/gfac/server/GfacServerHandler.java |  93 +++-
 .../apache/airavata/gfac/util/Constants.java    |  26 -
 modules/gfac/gfac-core/pom.xml                  |   6 +
 .../gfac/core/context/JobExecutionContext.java  |   3 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 504 ++++++++++++++++++
 .../apache/airavata/gfac/core/cpi/GFacImpl.java |   6 +
 .../core/monitor/GfacInternalStatusUpdator.java | 105 ++++
 .../airavata/gfac/core/monitor/MonitorID.java   |   6 +-
 .../state/GfacExperimentStateChangeRequest.java |  71 +++
 .../gfac/core/provider/AbstractProvider.java    |   6 +-
 .../experiment/GfacExperimentState.java         |  82 +++
 .../experiment/GfacExperimentStatus.java        | 516 +++++++++++++++++++
 .../experiment/gfacDataModelConstants.java      |  59 +++
 .../generate-gfac-stubs.sh                      |   2 +
 .../gfacDataModel.thrift                        |  55 ++
 .../orchestrator/server/OrchestratorServer.java |   1 +
 .../server/OrchestratorServerHandler.java       | 141 ++++-
 .../util/OrchestratorRecoveryHandler.java       | 107 ++++
 modules/orchestrator/orchestrator-core/pom.xml  |   6 +
 .../core/context/OrchestratorContext.java       |  15 +
 .../core/gfac/GFacClientFactory.java            |   2 +-
 .../core/impl/GFACServiceJobSubmitter.java      |  74 ++-
 .../cpi/impl/AbstractOrchestrator.java          |  14 +
 .../cpi/impl/SimpleOrchestratorImpl.java        |   1 +
 37 files changed, 2052 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/pom.xml b/airavata-api/airavata-api-server/pom.xml
index ff21028..1a89fcc 100644
--- a/airavata-api/airavata-api-server/pom.xml
+++ b/airavata-api/airavata-api-server/pom.xml
@@ -66,6 +66,12 @@
             <artifactId>slf4j-log4j12</artifactId>
             <version>${org.slf4j.version}</version>
         </dependency>
+        <!-- zookeeper dependencies -->
+        <dependency>
+        	<groupId>org.apache.zookeeper</groupId>
+        	<artifactId>zookeeper</artifactId>
+        	<version>3.4.0</version>
+        </dependency>
         
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index be35568..a8f4297 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.api.server.handler;
 
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.api.airavataAPIConstants;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.model.error.*;
 import org.apache.airavata.model.workspace.Project;
@@ -35,20 +36,91 @@ import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.airavata.registry.cpi.utils.Constants;
 import org.apache.thrift.TException;
+import org.apache.tools.ant.types.selectors.FileSelector;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
 
-public class AiravataServerHandler implements Airavata.Iface {
-
-    private Registry registry;
-	private OrchestratorService.Client orchestratorClient;
+public class AiravataServerHandler implements Airavata.Iface, Watcher {
     private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
-	
+    private Registry registry;
+    private OrchestratorService.Client orchestratorClient;
+
+    private ZooKeeper zk;
+    private static Integer mutex = -1;
+
+
+    public AiravataServerHandler() {
+        try {
+            String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+            String airavataServerHostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_HOST)
+                                + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_PORT);
+            try {
+                zk = new ZooKeeper(zkhostPort, 6000, this);   // no watcher is required, this will only use to store some data
+                String apiServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_API_SERVER_NODE,"/airavata-server");
+                String OrchServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE,"/orchestrator-server");
+                String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE,"/gfac-server");
+                String gfacExperiments = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,"/gfac-experiments");
+
+                synchronized (mutex) {
+                    mutex.wait();  // waiting for the syncConnected event
+                }
+                Stat zkStat = zk.exists(apiServer, false);
+                if (zkStat == null) {
+                    zk.create(apiServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                String instantNode = apiServer + File.separator + String.valueOf(new Random().nextInt(Integer.MAX_VALUE));
+                zkStat = zk.exists(instantNode, false);
+                if (zkStat == null) {
+                    zk.create(instantNode,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.EPHEMERAL);      // other component will watch these childeren creation deletion to monitor the status of the node
+                    logger.info("Successfully created airavata-server node");
+                }
+
+                zkStat = zk.exists(OrchServer, false);
+                if (zkStat == null) {
+                    zk.create(OrchServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    logger.info("Successfully created orchestrator-server node");
+                }
+                zkStat = zk.exists(gfacServer, false);
+                if (zkStat == null) {
+                    zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    logger.info("Successfully created gfac-server node");
+                }
+                zkStat = zk.exists(gfacServer, false);
+                if (zkStat == null) {
+                    zk.create(gfacExperiments, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    logger.info("Successfully created gfac-server node");
+                }
+                logger.info("Finished starting ZK: " + zk);
+            } catch (IOException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } catch (KeeperException e) {
+                e.printStackTrace();
+            }
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        }
+    }
+
+    synchronized public void process(WatchedEvent watchedEvent) {
+        synchronized (mutex) {
+            mutex.notify();
+        }
+    }
 
     /**
      * Query Airavata to fetch the API version
@@ -905,8 +977,8 @@ public class AiravataServerHandler implements Airavata.Iface {
     }
 
 	private OrchestratorService.Client getOrchestratorClient() {
-		final int serverPort = Integer.parseInt(ServerSettings.getSetting(org.apache.airavata.api.server.util.Constants.ORCHESTRATOR_SERVER_PORT,"8940"));
-        final String serverHost = ServerSettings.getSetting(org.apache.airavata.api.server.util.Constants.ORCHESTRATOR_SERVER_HOST, null);
+		final int serverPort = Integer.parseInt(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ORCHESTRATOR_SERVER_PORT,"8940"));
+        final String serverHost = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ORCHESTRATOR_SERVER_HOST, null);
         return orchestratorClient = OrchestratorClientFactory.createOrchestratorClient(serverHost, serverPort);
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
index b3ce8fb..efec768 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
@@ -64,7 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ApplicationCatalogHandler implements Iface {
-    private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(ApplicationCatalogHandler.class);
 
 	AiravataRegistry2 registry;
 	private AiravataRegistry2 getRegistry() throws RegException, AiravataConfigurationException{

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
index eb6a119..92eac88 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
@@ -27,6 +27,4 @@ public class Constants {
     public static final String APP_CATALOG_SERVER_PORT = "app.catalog.server.port";
     public static final String APP_CATALOG_SERVER_HOST = "app.catalog.server.host";
     public static final String API_SERVER_MIN_THREADS = "apiserver.server.min.threads";
-    public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host";
-    public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index c8c9235..170fb99 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,130 +61,26 @@ public class CreateLaunchExperiment {
             AiravataUtils.setExecutionAsClient();
             final Airavata.Client airavata = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
             System.out.println("API version is " + airavata.getAPIVersion());
-            addDescriptors();
+//            addDescriptors();
 
 //            final String expId = createExperimentForSSHHost(airavata);
 //            final String expId = createExperimentForTrestles(airavata);
 //            final String expId = createExperimentForStampede(airavata);
-            final String expId = createExperimentForLocalHost(airavata);
+            for (int i = 0; i < 1; i++) {
+                final String expId = createExperimentForLocalHost(airavata);
 //            final String expId = createExperimentForLonestar(airavata);
 //            final String expId = createExperimentWRFTrestles(airavata);
-            System.out.println("Experiment ID : " + expId);
+                System.out.println("Experiment ID : " + expId);
 //            updateExperiment(airavata, expId);
-            launchExperiment(airavata, expId);
-            System.out.println("Launched successfully");
-            List<Experiment> experiments = getExperimentsForUser(airavata, "admin");
-            List<ExperimentSummary> searchedExps1 = searchExperimentsByName(airavata, "admin", "echo");
-            List<ExperimentSummary> searchedExps2 = searchExperimentsByDesc(airavata, "admin", "Echo");
-            List<ExperimentSummary> searchedExps3 = searchExperimentsByApplication(airavata, "admin", "cho");
-            List<Project> projects = getAllUserProject(airavata, "admin");
-            List<Project> searchProjects1 = searchProjectsByProjectName(airavata, "admin", "project");
-            List<Project> searchProjects2 = searchProjectsByProjectDesc(airavata, "admin", "test");
-            for (Experiment exp : experiments){
-                System.out.println(" exp id : " + exp.getExperimentID());
-                System.out.println("experiment Description : " + exp.getDescription()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
-            }
-
-            for (ExperimentSummary exp : searchedExps1){
-                System.out.println("search results by experiment name");
-                System.out.println("experiment ID : " + exp.getExperimentID()) ;
-                System.out.println("experiment Description : " + exp.getDescription()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
-            }
-
-            for (ExperimentSummary exp : searchedExps2){
-                System.out.println("search results by experiment desc");
-                System.out.println("experiment ID : " + exp.getExperimentID()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
-            }
-
-            for (ExperimentSummary exp : searchedExps3){
-                System.out.println("search results by application");
-                System.out.println("experiment ID : " + exp.getExperimentID()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
+                launchExperiment(airavata, expId);
             }
-
-            for (Project pr : searchProjects1){
-                System.out.println(" project id : " + pr.getProjectID());
-            }
-
-            for (Project pr : searchProjects2){
-                System.out.println(" project id : " + pr.getProjectID());
-                System.out.println(" project desc : " + pr.getDescription());
-            }
-
-            Thread monitor = (new Thread(){
-                 public void run() {
-                     Map<String, JobStatus> jobStatuses = null;
-                     while (true) {
-                         try {
-                             jobStatuses = airavata.getJobStatuses(expId);
-                             Set<String> strings = jobStatuses.keySet();
-                             for (String key : strings) {
-                                 JobStatus jobStatus = jobStatuses.get(key);
-                                 if(jobStatus == null){
-                                     return;
-                                 }else {
-                                     if (JobState.COMPLETE.equals(jobStatus.getJobState())) {
-                                         System.out.println("Job completed Job ID: " + key);
-                                         return;
-                                     }else{
-                                        System.out.println("Job ID:" + key + jobStatuses.get(key).getJobState().toString());
-                                     }
-                                 }
-                             }
-                             ExperimentStatus experimentStatus = airavata.getExperimentStatus(expId);
-                             if(experimentStatus.getExperimentState().equals(ExperimentState.FAILED)){
-                            	 return;
-                             }
-							System.out.println(experimentStatus);
-                             Thread.sleep(5000);
-                         } catch (Exception e) {
-                             e.printStackTrace();
-                         }
-                     }
-
-                 }
-            });
-            monitor.start();
-            try {
-                monitor.join();
-            } catch (InterruptedException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-            }
-			try {
-				Thread.sleep(5000);
-			} catch (InterruptedException e) {
-				e.printStackTrace(); // To change body of catch statement use
-										// File | Settings | File Templates.
-			}
-
-//            System.out.println(airavata.getExperimentStatus(expId));
-            List<DataObjectType> output = airavata.getExperimentOutputs(expId);
-            for (DataObjectType dataObjectType : output) {
-                System.out.println(dataObjectType.getKey() + " : " + dataObjectType.getType() + " : " + dataObjectType.getValue());
-                
-				
-			}
-            String clonedExpId = cloneExperiment(airavata, expId);
-            System.out.println("Cloned Experiment ID : " + clonedExpId);
-//            System.out.println("retrieved exp id : " + experiment.getExperimentID());
         } catch (Exception e) {
             logger.error("Error while connecting with server", e.getMessage());
             e.printStackTrace();
         }
     }
 
-    public static void addDescriptors() throws AiravataAPIInvocationException,ApplicationSettingsException  {
+    public static void addDescriptors() throws AiravataAPIInvocationException, ApplicationSettingsException {
         try {
             DocumentCreator documentCreator = new DocumentCreator(getAiravataAPI());
             documentCreator.createLocalHostDocs();
@@ -219,8 +115,8 @@ public class CreateLaunchExperiment {
         return airavataAPI;
     }
 
-    public static String createExperimentForTrestles(Airavata.Client client) throws TException  {
-        try{
+    public static String createExperimentForTrestles(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -255,50 +151,50 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-    
-    public static String createExperimentWRFTrestles(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentWRFTrestles(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("WRF_Namelist");
             input.setType(DataType.URI);
             input.setValue("/Users/raminder/Downloads/wrf_sample_inputs/namelist.input");
-            
+
             DataObjectType input1 = new DataObjectType();
             input1.setKey("WRF_Input_File");
             input1.setType(DataType.URI);
             input1.setValue("/Users/raminder/Downloads/wrf_sample_inputs/wrfinput_d01");
-            
+
             DataObjectType input2 = new DataObjectType();
             input2.setKey("WRF_Boundary_File");
             input2.setType(DataType.URI);
             input2.setValue("/Users/raminder/Downloads/wrf_sample_inputs/wrfbdy_d01");
-            
+
             exInputs.add(input);
             exInputs.add(input1);
             exInputs.add(input2);
 
-           
+
             List<DataObjectType> exOut = new ArrayList<DataObjectType>();
             DataObjectType output = new DataObjectType();
             output.setKey("WRF_Output");
             output.setType(DataType.URI);
             output.setValue("");
-            
+
             DataObjectType output1 = new DataObjectType();
             output1.setKey("WRF_Execution_Log");
             output1.setType(DataType.URI);
             output1.setValue("");
-            
-            
+
+
             exOut.add(output);
             exOut.add(output1);
-           
+
 
             Experiment simpleExperiment =
                     ExperimentModelUtil.createSimpleExperiment("default", "admin", "WRFExperiment", "Testing", "WRF", exInputs);
@@ -320,35 +216,35 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
-    public static String cloneExperiment(Airavata.Client client, String expId) throws TException  {
-        try{
+    public static String cloneExperiment(Airavata.Client client, String expId) throws TException {
+        try {
             return client.cloneExperiment(expId, "cloneExperiment1");
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
-    public static void updateExperiment(Airavata.Client client, String expId) throws TException  {
-        try{
+    public static void updateExperiment(Airavata.Client client, String expId) throws TException {
+        try {
             Experiment experiment = client.getExperiment(expId);
             experiment.setDescription("updatedDescription");
-            client.updateExperiment(expId, experiment );
-        }catch (TException e) {
+            client.updateExperiment(expId, experiment);
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
 
-    public static String createExperimentForLocalHost(Airavata.Client client) throws TException  {
-        try{
+    public static String createExperimentForLocalHost(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -386,14 +282,14 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-    
-    public static String createExperimentForSSHHost(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentForSSHHost(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -432,13 +328,14 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-    public static String createExperimentForStampede(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentForStampede(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -477,13 +374,14 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-       public static String createExperimentForLonestar(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentForLonestar(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -531,15 +429,15 @@ public class CreateLaunchExperiment {
                 }
             }
             throw e;
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-       
-       
-    public static void launchExperiment (Airavata.Client client, String expId)
-            throws TException{
+
+
+    public static void launchExperiment(Airavata.Client client, String expId)
+            throws TException {
         try {
             client.launchExperiment(expId, "testToken");
         } catch (ExperimentNotFoundException e) {
@@ -554,13 +452,13 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while launching the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while launching the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
-    public static List<Experiment> getExperimentsForUser (Airavata.Client client, String user){
+    public static List<Experiment> getExperimentsForUser(Airavata.Client client, String user) {
         try {
             return client.getAllUserExperiments(user);
         } catch (AiravataSystemException e) {
@@ -569,13 +467,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<Project> getAllUserProject (Airavata.Client client, String user){
+    public static List<Project> getAllUserProject(Airavata.Client client, String user) {
         try {
             return client.getAllUserProjects(user);
         } catch (AiravataSystemException e) {
@@ -584,13 +482,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<Project> searchProjectsByProjectName (Airavata.Client client, String user, String projectName){
+    public static List<Project> searchProjectsByProjectName(Airavata.Client client, String user, String projectName) {
         try {
             return client.searchProjectsByProjectName(user, projectName);
         } catch (AiravataSystemException e) {
@@ -599,13 +497,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<Project> searchProjectsByProjectDesc (Airavata.Client client, String user, String desc){
+    public static List<Project> searchProjectsByProjectDesc(Airavata.Client client, String user, String desc) {
         try {
             return client.searchProjectsByProjectDesc(user, desc);
         } catch (AiravataSystemException e) {
@@ -614,14 +512,14 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
 
-    public static List<ExperimentSummary> searchExperimentsByName (Airavata.Client client, String user, String expName){
+    public static List<ExperimentSummary> searchExperimentsByName(Airavata.Client client, String user, String expName) {
         try {
             return client.searchExperimentsByName(user, expName);
         } catch (AiravataSystemException e) {
@@ -630,13 +528,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<ExperimentSummary> searchExperimentsByDesc(Airavata.Client client, String user, String desc){
+    public static List<ExperimentSummary> searchExperimentsByDesc(Airavata.Client client, String user, String desc) {
         try {
             return client.searchExperimentsByDesc(user, desc);
         } catch (AiravataSystemException e) {
@@ -645,13 +543,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<ExperimentSummary> searchExperimentsByApplication(Airavata.Client client, String user, String app){
+    public static List<ExperimentSummary> searchExperimentsByApplication(Airavata.Client client, String user, String app) {
         try {
             return client.searchExperimentsByApplication(user, app);
         } catch (AiravataSystemException e) {
@@ -660,7 +558,7 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/airavataAPI.thrift b/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
index 7b2ec60..9c9ec7f 100644
--- a/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
+++ b/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
@@ -29,6 +29,7 @@ include "airavataDataModel.thrift"
 include "experimentModel.thrift"
 include "workspaceModel.thrift"
 include "applicationCatalogAPI.thrift"
+include "gfacDataMode.thrift"
 
 namespace java org.apache.airavata.api
 namespace php Airavata.API

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
index d573da9..ffcff17 100644
--- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
+++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
@@ -671,7 +671,7 @@ public class DocumentCreator {
 		ApplicationDescription applicationDeploymentDescription = new ApplicationDescription();
 		ApplicationDeploymentDescriptionType applicationDeploymentDescriptionType = applicationDeploymentDescription.getType();
 		applicationDeploymentDescriptionType.addNewApplicationName().setStringValue(serviceName);
-		applicationDeploymentDescriptionType.setExecutableLocation("/bin/echo");
+		applicationDeploymentDescriptionType.setExecutableLocation("/tmp/echo.sh");
 		applicationDeploymentDescriptionType.setScratchWorkingDirectory("/tmp");
 
 		try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
index bac5913..b8f999a 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -31,4 +31,19 @@ public final class Constants {
     public static final String GFAC_CONFIG_XML = "gfac-config.xml";
     public static final String PUSH = "push";
     public static final String PULL = "pull";
+    public static final String API_SERVER_PORT = "apiserver.server.port";
+    public static final String API_SERVER_HOST = "apiserver.server.host";
+    public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host";
+    public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port";
+    public static final String GFAC_SERVER_HOST = "gfac.server.host";
+    public static final String GFAC_SERVER_PORT = "gfac.server.port";
+    public static final String ZOOKEEPER_SERVER_HOST = "zookeeper.server.host";
+    public static final String ZOOKEEPER_SERVER_PORT = "zookeeper.server.port";
+    public static final String ZOOKEEPER_API_SERVER_NODE = "airavata-server";
+    public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NODE = "orchestrator-server";
+    public static final String ZOOKEEPER_GFAC_SERVER_NODE = "gfac-server";
+    public static final String ZOOKEEPER_GFAC_EXPERIMENT_NODE = "gfac-experiments";
+    public static final String ZOOKEEPER_GFAC_SERVER_NAME = "gfac-server-name";
+    public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NAME = "orchestrator-server-name";
+    public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index b99c6cb..625f7f2 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -276,11 +276,11 @@ monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apach
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede
-activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator
+activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator
 
 ###---------------------------Orchestrator module Configurations---------------------------###
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
 job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
 submitter.interval=10000
 threadpool.size=10
@@ -297,7 +297,7 @@ appcatalogserver=org.apache.airavata.api.server.ApplicationCatalogServer
 
 
 ###---------------------------Airavata Server Configurations---------------------------###
-servers=apiserver,appcatalogserver,orchestrator
+servers=apiserver,appcatalogserver,orchestrator,gfac
 #shutdown.trategy=NONE
 shutdown.trategy=SELF_TERMINATE
 # credential store specific parameters
@@ -323,4 +323,18 @@ app.catalog.server.host=localhost
 app.catalog.server.port=8931
 orchestrator.server.host=localhost
 orchestrator.server.port=8940
+gfac.server.host=localhost
+gfac.server.port=8950
 orchestrator.server.min.threads=30
+
+##----------------------------- Zookeeper Server Configurations ----------------------###
+
+zookeeper.server.host=localhost
+zookeeper.server.port=2181
+airavata-server=/api-server
+orchestrator-server=/orchestrator-server
+gfac-server=/gfac-server
+gfac-experiments=/gfac-experiments
+gfac-server-name=gfac-node0
+orchestrator-server-name=orch-node0
+airavata-server-name=api-node0

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 108bd32..4d51a13 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -562,7 +562,13 @@
             <artifactId>jackson-annotations</artifactId>
             <version>2.0.0</version>
         </dependency>
+     <!-- zookeeper dependencies -->
 
+        <dependency>
+               	<groupId>org.apache.zookeeper</groupId>
+               	<artifactId>zookeeper</artifactId>
+               	<version>3.4.0</version>
+               </dependency>
 	<dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index 0c3e43a..b6c5aa7 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -196,7 +196,7 @@
                 <include>org.apache.airavata:airavata-data-models:jar</include>
                 <include>org.apache.airavata:airavata-credential-store:jar</include>
                 <include>org.apache.airavata:airavata-gfac-core:jar</include>
-                <include>org.apache.airavata:airavata-gfac-server:jar</include>
+                <include>org.apache.airavata:airavata-gfac-service:jar</include>
                 <include>org.apache.airavata:airavata-gfac-ssh:jar</include>
                 <include>org.apache.airavata:airavata-gfac-local:jar</include>
                 <include>org.apache.airavata:airavata-gfac-gsissh:jar</include>
@@ -243,6 +243,7 @@
                 <include>com.fasterxml.jackson.core:jackson-databind</include>
                 <include>com.fasterxml.jackson.core:jackson-core</include>
                 <include>com.fasterxml.jackson.core:jackson-annotations</include>
+                <include>org.apache.zookeeper:zookeeper</include>
                 <!-- unicore start
                     <include>eu.unicore:ogsabes-client</include>
                     <include>eu.unicore:ogsabes-types</include>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/pom.xml b/modules/gfac/airavata-gfac-service/pom.xml
index e57eccc..d02a658 100644
--- a/modules/gfac/airavata-gfac-service/pom.xml
+++ b/modules/gfac/airavata-gfac-service/pom.xml
@@ -40,12 +40,12 @@
             <artifactId>airavata-gfac-core</artifactId>
             <version>${project.version}</version>
         </dependency>
-	<dependency>
+	    <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-model-utils</artifactId>
             <version>${project.version}</version>
         </dependency>
-	<dependency>
+	    <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-server-configuration</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
index bf6f933..f96e40f 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
@@ -20,10 +20,10 @@
 */
 package org.apache.airavata.gfac.server;
 
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.IServer;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.cpi.GfacService;
-import org.apache.airavata.gfac.util.Constants;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
@@ -32,6 +32,8 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+
 public class GfacServer implements IServer{
 
     private final static Logger logger = LoggerFactory.getLogger(GfacServer.class);
@@ -50,7 +52,12 @@ public class GfacServer implements IServer{
             throws Exception {
         try {
             final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950"));
-			TServerTransport serverTransport = new TServerSocket(serverPort);
+            final String serverHost = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST, null);
+
+            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverHost, serverPort);
+
+			TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
+
             server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(gfacServerHandlerProcessor));
 
             new Thread() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 1b8d1e8..27733f9 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -20,12 +20,16 @@
 */
 package org.apache.airavata.gfac.server;
 
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.common.exception.AiravataConfigurationException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
@@ -35,28 +39,111 @@ import org.apache.airavata.registry.api.Gateway;
 import org.apache.airavata.registry.api.exception.RegException;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
 
-public class GfacServerHandler implements GfacService.Iface {
+
+public class GfacServerHandler implements GfacService.Iface, Watcher{
     private final static Logger logger = LoggerFactory.getLogger(GfacServerHandler.class);
 
     private Registry registry;
 
     private String registryURL;
+
     private String gatewayName;
+
     private String airavataUserName;
 
+    private ZooKeeper zk;
+
+    private boolean connected = false;
+
+    private static Integer mutex = new Integer(-1);
+
+    private MonitorPublisher publisher;
+
+
     public GfacServerHandler() {
+        // registering with zk
+        try {
+            String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+            String airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
+            try {
+                zk = new ZooKeeper(zkhostPort, 6000, this);   // no watcher is required, this will only use to store some data
+                String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,"/gfac-server");
+                String gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,"/gfac-experiments");
+                synchronized(mutex){
+                    mutex.wait();  // waiting for the syncConnected event
+                }
+                Stat zkStat = zk.exists(gfacServer, false);
+                if (zkStat == null) {
+                    zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
+                String instantNode = gfacServer + File.separator + instanceId;
+                zkStat = zk.exists(instantNode, false);
+                if (zkStat == null) {
+                    zk.create(instantNode,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.EPHEMERAL);      // other component will watch these childeren creation deletion to monitor the status of the node
+                }
+                zkStat = zk.exists(gfacExperiments, false);
+                if (zkStat == null) {
+                    zk.create(gfacExperiments,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                zkStat = zk.exists(gfacExperiments + File.separator + instanceId, false);
+                if (zkStat == null) {
+                    zk.create(gfacExperiments + File.separator + instanceId,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }else{
+                    logger.error(" Zookeeper is inconsistent state  !!!!!");
+                }
+                logger.info("Finished starting ZK: " + zk);
+            } catch (IOException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } catch (KeeperException e) {
+                e.printStackTrace();
+            }
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        }
         try {
+            publisher = new MonitorPublisher(new EventBus());
             registry = RegistryFactory.getDefaultRegistry();
             setGatewayProperties();
+            BetterGfacImpl.startDaemonHandlers();
+            BetterGfacImpl.startStatusUpdators(registry,zk,publisher);
         }catch (Exception e){
            logger.error("Error initialising GFAC",e);
         }
     }
 
+    synchronized public void process(WatchedEvent watchedEvent) {
+        synchronized (mutex) {
+            Event.KeeperState state = watchedEvent.getState();
+            if (state == Event.KeeperState.SyncConnected) {
+                mutex.notify();
+                connected = true;
+            }
+        }
+    }
+
     public String getGFACServiceVersion() throws TException {
         return gfac_cpi_serviceConstants.GFAC_CPI_VERSION;
     }
@@ -114,9 +201,9 @@ public class GfacServerHandler implements GfacService.Iface {
 
     private GFac getGfac()throws TException{
         try {
-            return new GFacImpl(registry, null,
+            return new BetterGfacImpl(registry, null,
                                 AiravataRegistryFactory.getRegistry(new Gateway(getGatewayName()),
-                                        new AiravataUser(getAiravataUserName())));
+                                        new AiravataUser(getAiravataUserName())),zk,publisher);
         } catch (RegException e) {
             throw new TException("Error initializing gfac instance",e);
         } catch (AiravataConfigurationException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
deleted file mode 100644
index 3e48898..0000000
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.util;
-
-public class Constants {
-    public static final String GFAC_SERVER_PORT = "gfac.server.port";
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index da86055..19d5f09 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -120,6 +120,12 @@
             <version>${xmlbeans.version}</version>
         </dependency>
         <!-- this is the dependency for amqp implementation -->
+        <!-- zookeeper dependencies -->
+        <dependency>
+        	<groupId>org.apache.zookeeper</groupId>
+        	<artifactId>zookeeper</artifactId>
+        	<version>3.4.0</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 86f4055..170c2c8 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -21,6 +21,7 @@
 
 package org.apache.airavata.gfac.core.context;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,7 +38,7 @@ import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.registry.cpi.Registry;
 
-public class JobExecutionContext extends AbstractContext{
+public class JobExecutionContext extends AbstractContext implements Serializable{
 
     private GFacConfiguration gfacConfiguration;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
new file mode 100644
index 0000000..195bfc1
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -0,0 +1,504 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.cpi;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.eventbus.EventBus;
+
+import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.commons.gfac.type.ServiceDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.Scheduler;
+import org.apache.airavata.gfac.core.context.ApplicationContext;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.monitor.*;
+import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
+import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
+import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
+import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.provider.GFacProvider;
+import org.apache.airavata.gfac.core.scheduler.HostScheduler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.api.AiravataRegistry2;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPathExpressionException;
+
+/**
+ * This is the GFac CPI class for external usage, this simply have a single method to submit a job to
+ * the resource, required data for the job has to be stored in registry prior to invoke this object.
+ */
+public class BetterGfacImpl implements GFac {
+    private static final Logger log = LoggerFactory.getLogger(GFacImpl.class);
+    public static final String ERROR_SENT = "ErrorSent";
+
+    private Registry registry;
+
+    private AiravataAPI airavataAPI;
+
+    private AiravataRegistry2 airavataRegistry2;
+
+    private ZooKeeper zk;                       // we are not storing zk instance in to jobExecution context
+    
+    private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
+
+    private static File gfacConfigFile;
+
+    private static List<AbstractActivityListener> activityListeners =  new ArrayList<AbstractActivityListener>();
+
+    private static MonitorPublisher monitorPublisher;
+
+    /**
+     * Constructor for GFac
+     *
+     * @param registry
+     * @param airavataAPI
+     * @param airavataRegistry2
+     * @param zooKeeper
+     */
+    public BetterGfacImpl(Registry registry, AiravataAPI airavataAPI, AiravataRegistry2 airavataRegistry2, ZooKeeper zooKeeper,
+                          MonitorPublisher publisher) {
+        this.registry = registry;
+        this.airavataAPI = airavataAPI;
+        this.airavataRegistry2 = airavataRegistry2;
+        monitorPublisher = publisher;     // This is a EventBus common for gfac
+        this.zk = zooKeeper;
+    }
+
+    public static void startStatusUpdators(Registry registry,ZooKeeper zk,MonitorPublisher publisher) {
+        try {
+            String[] listenerClassList = ServerSettings.getActivityListeners();
+            for (String listenerClass : listenerClassList) {
+                Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
+                AbstractActivityListener abstractActivityListener = aClass.newInstance();
+                activityListeners.add(abstractActivityListener);
+                abstractActivityListener.setup(publisher, registry,zk);
+                log.info("Registering listener: " + listenerClass);
+                publisher.registerListener(abstractActivityListener);
+            }
+        }catch (ClassNotFoundException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (InstantiationException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (IllegalAccessException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (ApplicationSettingsException e){
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        }
+    }
+    public static void startDaemonHandlers()  {
+        List<GFacHandlerConfig> daemonHandlerConfig = null;
+        URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+        gfacConfigFile = new File(resource.getPath());
+        try {
+            daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
+        } catch (ParserConfigurationException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration",e);
+        } catch (IOException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+        } catch (SAXException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+        } catch (XPathExpressionException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+        }
+
+        for(GFacHandlerConfig handlerConfig:daemonHandlerConfig){
+            String className = handlerConfig.getClassName();
+            try {
+                Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
+                ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
+                threadedHandler.initProperties(handlerConfig.getProperties());
+                daemonHandlers.add(threadedHandler);
+            }catch (ClassNotFoundException e){
+                log.error("Error initializing the handler: " + className);
+                log.error(className + " class has to implement " + ThreadedHandler.class);
+            } catch (InstantiationException e) {
+                log.error("Error initializing the handler: " + className);
+                log.error(className + " class has to implement " + ThreadedHandler.class);
+            } catch (IllegalAccessException e) {
+                log.error("Error initializing the handler: " + className);
+                log.error(className + " class has to implement " + ThreadedHandler.class);
+            } catch (GFacHandlerException e) {
+                log.error("Error initializing the handler " + className);
+            } catch (GFacException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+        }
+        for(ThreadedHandler tHandler:daemonHandlers){
+            (new Thread(tHandler)).start();
+        }
+    }
+
+    /**
+     * This can be used to submit jobs for testing purposes just by filling parameters by hand (JobExecutionContext)
+     */
+    public BetterGfacImpl() {
+        daemonHandlers = new ArrayList<ThreadedHandler>();
+        startDaemonHandlers();
+    }
+
+    /**
+     * This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
+     * And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
+     *
+     * @param experimentID
+     * @return
+     * @throws GFacException
+     */
+    public boolean submitJob(String experimentID,String taskID) throws GFacException {
+        JobExecutionContext jobExecutionContext = null;
+        try {
+            jobExecutionContext = createJEC(experimentID, taskID);
+            return submitJob(jobExecutionContext);
+        } catch (Exception e) {
+            log.error("Error inovoking the job with experiment ID: " + experimentID);
+            throw new GFacException(e);
+        }
+    }
+
+    private JobExecutionContext createJEC(String experimentID, String taskID) throws Exception {
+        JobExecutionContext jobExecutionContext;
+        TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
+
+        // this is wear our new model and old model is mapping (so serviceName in ExperimentData and service name in ServiceDescriptor
+        // has to be same.
+
+        // 1. Get the Task from the task ID and construct the Job object and save it in to registry
+        // 2. Add another property to jobExecutionContext and read them inside the provider and use it.
+        String serviceName = taskData.getApplicationId();
+        if (serviceName == null) {
+            throw new GFacException("Error executing the job because there is not Application Name in this Experiment:  " + serviceName );
+        }
+       
+        ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
+        if (serviceDescription == null ) {
+            throw new GFacException("Error executing the job because there is not Application Name in this Experiment:  " + serviceName );
+        }
+        String hostName;
+        HostDescription hostDescription = null;
+        if(taskData.getTaskScheduling().getResourceHostId() != null){
+            hostName = taskData.getTaskScheduling().getResourceHostId();
+            hostDescription = airavataRegistry2.getHostDescriptor(hostName);
+        }else{
+        	  List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
+              Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
+              for (String hostDescName : applicationDescriptors.keySet()) {
+                  registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
+              }
+              Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
+             HostScheduler hostScheduler = aClass.newInstance();
+            hostDescription = hostScheduler.schedule(registeredHosts);
+        	hostName = hostDescription.getType().getHostName();
+        }
+        if(hostDescription == null){
+        	throw new GFacException("Error executing the job as the host is not registered " + hostName);	
+        }
+        ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostName);
+        URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+        Properties configurationProperties = ServerSettings.getProperties();
+        GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
+
+
+        // start constructing jobexecutioncontext
+        jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
+
+        // setting experiment/task/workflownode related information
+        Experiment experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentID);
+        jobExecutionContext.setExperiment(experiment);
+        jobExecutionContext.setExperimentID(experimentID);
+        jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0));
+        jobExecutionContext.setTaskData(taskData);
+
+        // setting the registry
+        jobExecutionContext.setRegistry(registry);
+
+        ApplicationContext applicationContext = new ApplicationContext();
+        applicationContext.setApplicationDeploymentDescription(applicationDescription);
+        applicationContext.setHostDescription(hostDescription);
+        applicationContext.setServiceDescription(serviceDescription);
+        jobExecutionContext.setApplicationContext(applicationContext);
+
+        List<DataObjectType> experimentInputs = taskData.getApplicationInputs();
+        jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getMessageContext(experimentInputs,
+                serviceDescription.getType().getInputParametersArray())));
+
+        List<DataObjectType> outputData = taskData.getApplicationOutputs();
+        jobExecutionContext.setOutMessageContext(new MessageContext(GFacUtils.getMessageContext(outputData,
+                serviceDescription.getType().getOutputParametersArray())));
+
+        jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
+
+        return jobExecutionContext;
+    }
+
+    public boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
+        // We need to check whether this job is submitted as a part of a large workflow. If yes,
+        // we need to setup workflow tracking listerner.
+        String workflowInstanceID = null;
+        if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
+            // This mean we need to register workflow tracking listener.
+            //todo implement WorkflowTrackingListener properly
+            registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext);
+        }
+        // Register log event listener. This is required in all scenarios.
+        jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
+        schedule(jobExecutionContext);
+        return true;
+    }
+
+    private void schedule(JobExecutionContext jobExecutionContext) throws GFacException {
+        // Scheduler will decide the execution flow of handlers and provider which handles
+        // the job.
+        String experimentID = jobExecutionContext.getExperimentID();
+        try {
+            Scheduler.schedule(jobExecutionContext);
+
+            // Executing in handlers in the order as they have configured in GFac configuration
+            invokeInFlowHandlers(jobExecutionContext);
+//            if (experimentID != null){
+//                registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
+//            }
+
+            // After executing the in handlers provider instance should be set to job execution context.
+            // We get the provider instance and execute it.
+            GFacProvider provider = jobExecutionContext.getProvider();
+            if (provider != null) {
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKING));
+                initProvider(provider, jobExecutionContext);
+                executeProvider(provider, jobExecutionContext);
+                disposeProvider(provider, jobExecutionContext);
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKED));
+            }
+            if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+                invokeOutFlowHandlers(jobExecutionContext);
+            }
+        } catch (Exception e) {
+            try {
+                // we make the experiment as failed due to exception scenario
+                monitorPublisher.publish(new
+                        ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                        ExperimentState.FAILED));
+                // Updating the task status if there's any task associated
+                monitorPublisher.publish(new TaskStatusChangeRequest(
+                        new TaskIdentity(jobExecutionContext.getExperimentID(),
+                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED
+                ));
+                monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
+                        new JobIdentity(jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED));
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.FAILED));
+            } catch (NullPointerException e1) {
+                log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " +
+                        "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+            }
+            jobExecutionContext.setProperty(ERROR_SENT, "true");
+            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+            throw new GFacException(e.getMessage(), e);
+        }
+    }
+
+    private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+        try {
+            provider.initialize(jobExecutionContext);
+        } catch (Exception e) {
+            throw new GFacException("Error while initializing provider " + provider.getClass().getName() + ".", e);
+        }
+    }
+
+    private void executeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+        try {
+             provider.execute(jobExecutionContext);
+        } catch (Exception e) {
+            throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
+        }
+    }
+
+    private void disposeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+        try {
+            provider.dispose(jobExecutionContext);
+        } catch (Exception e) {
+            throw new GFacException("Error while invoking provider " + provider.getClass().getName() + " dispose method.", e);
+        }
+    }
+
+    private void registerWorkflowTrackingListener(String workflowInstanceID, JobExecutionContext jobExecutionContext) {
+        String workflowNodeID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_NODE_ID);
+        String topic = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
+        String brokerUrl = (String) jobExecutionContext.getProperty(Constants.PROP_BROKER_URL);
+        jobExecutionContext.getNotificationService().registerListener(
+                new WorkflowTrackingListener(workflowInstanceID, workflowNodeID, brokerUrl, topic));
+
+    }
+
+    private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                           ,GfacExperimentState.INHANDLERSINVOKING));
+        for (GFacHandlerConfig handlerClassName : handlers) {
+            Class<? extends GFacHandler> handlerClass;
+            GFacHandler handler;
+            try {
+                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                handler = handlerClass.newInstance();
+                handler.initProperties(handlerClassName.getProperties());
+            } catch (ClassNotFoundException e) {
+                throw new GFacException("Cannot load handler class " + handlerClassName, e);
+            } catch (InstantiationException e) {
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (IllegalAccessException e) {
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            }
+            try {
+                handler.invoke(jobExecutionContext);
+            } catch (GFacHandlerException e) {
+                throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+            }
+        }
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                           ,GfacExperimentState.INHANDLERSINVOKED));
+    }
+
+    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+        List<GFacHandlerConfig> handlers = null;
+        if(gFacConfiguration != null){
+         handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+        }else {
+            try {
+                jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e) {
+                log.error("Error constructing job execution context during outhandler invocation");
+                throw new GFacException(e);
+            }
+            schedule(jobExecutionContext);
+        }
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKING));
+        for (GFacHandlerConfig handlerClassName : handlers) {
+            Class<? extends GFacHandler> handlerClass;
+            GFacHandler handler;
+            try {
+                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                handler = handlerClass.newInstance();
+                handler.initProperties(handlerClassName.getProperties());
+            } catch (ClassNotFoundException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot load handler class " + handlerClassName, e);
+            } catch (InstantiationException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (IllegalAccessException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            }
+            try {
+                handler.invoke(jobExecutionContext);
+            } catch (Exception e) {
+                // TODO: Better error reporting.
+                throw new GFacException("Error Executing a OutFlow Handler", e);
+            }
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKED));
+        }
+
+        // At this point all the execution is finished so we update the task and experiment statuses.
+        // Handler authors does not have to worry about updating experiment or task statuses.
+        monitorPublisher.publish(new
+                ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                ExperimentState.COMPLETED));
+        // Updating the task status if there's any task associated
+        monitorPublisher.publish(new TaskStatusChangeRequest(
+                new TaskIdentity(jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+        ));
+
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.COMPLETED));
+    }
+
+
+    public AiravataAPI getAiravataAPI() {
+        return airavataAPI;
+    }
+
+    public AiravataRegistry2 getAiravataRegistry2() {
+        return airavataRegistry2;
+    }
+
+    public static List<ThreadedHandler> getDaemonHandlers() {
+        return daemonHandlers;
+    }
+
+    public static String getErrorSent() {
+        return ERROR_SENT;
+    }
+
+    public File getGfacConfigFile() {
+        return gfacConfigFile;
+    }
+
+    public static MonitorPublisher getMonitorPublisher() {
+        return monitorPublisher;
+    }
+
+    public Registry getRegistry() {
+        return registry;
+    }
+
+    public ZooKeeper getZk() {
+        return zk;
+    }
+
+    public void setZk(ZooKeeper zk) {
+        this.zk = zk;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index f1fa244..a6908ba 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -58,11 +58,13 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.api.AiravataRegistry2;
 import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
@@ -83,6 +85,8 @@ public class GFacImpl implements GFac {
     private AiravataAPI airavataAPI;
 
     private AiravataRegistry2 airavataRegistry2;
+
+    private ZooKeeper zk;
     
     private static List<ThreadedHandler> daemonHandlers;
 
@@ -436,6 +440,8 @@ public class GFacImpl implements GFac {
                 throw new GFacException("Error Executing a OutFlow Handler", e);
             }
         }
+
+        monitorPublisher.publish(GfacExperimentState.COMPLETED);
         // At this point all the execution is finished so we update the task and experiment statuses.
         // Handler authors does not have to worry about updating experiment or task statuses.
         monitorPublisher.publish(new