You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/24 20:05:48 UTC

[1/5] committing the initial version of zk work with resubmitting all the failed jobs to the available gfac cluster nodes

Repository: airavata
Updated Branches:
  refs/heads/master f723d4937 -> d56dd443e


http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
new file mode 100644
index 0000000..24b3989
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.monitor;
+
+import com.google.common.eventbus.Subscribe;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Listener that reacts to {@link GfacExperimentStateChangeRequest} events by updating the znode of the
+ * affected experiment/task in ZooKeeper: the znode is deleted for the COMPLETED and FAILED states and its
+ * data is overwritten with "jobID,workflowNodeID" for every other state.
+ */
+public class GfacInternalStatusUpdator implements AbstractActivityListener, Watcher {
+    private final static Logger logger = LoggerFactory.getLogger(GfacInternalStatusUpdator.class);
+
+    /** Separator of znode path elements; ZooKeeper paths use "/" whatever File.separator is on the host. */
+    private static final String ZK_PATH_SEPARATOR = "/";
+
+    /** Session timeout, in milliseconds, requested when this listener opens its own connection. */
+    private static final int ZK_SESSION_TIMEOUT_MS = 6000;
+
+    private ZooKeeper zk;
+
+    /** Monitor on which the updating thread waits until a newly created ZooKeeper handle is connected. */
+    private final Object mutex = new Object();
+
+    /**
+     * Applies a GFac-internal state change to the znode of the experiment/task named by the request.
+     *
+     * @param statusChangeRequest carries the new state and the MonitorID identifying experiment and task
+     * @throws KeeperException              if the exists, delete or setData call on ZooKeeper fails
+     * @throws InterruptedException         if the thread is interrupted while waiting for ZooKeeper
+     * @throws ApplicationSettingsException if a setting needed to build the path or to connect is unavailable
+     */
+    @Subscribe
+    public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws KeeperException, InterruptedException, ApplicationSettingsException {
+        logger.info("Gfac internal state changed to: " + statusChangeRequest.getState().toString());
+        MonitorID monitorID = statusChangeRequest.getMonitorID();
+        String experimentPath = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments")
+                + ZK_PATH_SEPARATOR + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME)
+                + ZK_PATH_SEPARATOR + monitorID.getExperimentID() + "+" + monitorID.getTaskID();
+        try {
+            ensureConnected();
+        } catch (IOException e) {
+            // Without a connection nothing can be read or written, so give up on this update.
+            logger.error("Could not connect to ZooKeeper, ZK path: " + experimentPath + " was not updated", e);
+            return;
+        }
+        Stat exists = zk.exists(experimentPath, false);// this znode is created by orchestrator so it has to exist at this level
+        if (exists == null) {
+            logger.error("ZK path: " + experimentPath + " does not exists !!");
+            logger.error("Zookeeper is in an inconsistent state !!! ");
+            return;
+        }
+        // The version read above is passed along, so a concurrent modification of the znode fails the call.
+        switch (statusChangeRequest.getState()) {
+            case COMPLETED:
+            case FAILED:
+                zk.delete(experimentPath, exists.getVersion());
+                break;
+            default:
+                zk.setData(experimentPath, (monitorID.getJobID() +
+                        "," + monitorID.getWorkflowNodeID()).getBytes(), exists.getVersion());
+        }
+    }
+
+    /**
+     * Makes sure {@link #zk} is a connected handle, opening a new connection when it is missing or not
+     * connected, and blocks until that new connection is established.
+     * NOTE(review): a handle that gets replaced here is not closed; confirm who owns it.
+     *
+     * @throws IOException                  if the ZooKeeper client cannot be created
+     * @throws InterruptedException         if the thread is interrupted while waiting for the connection
+     * @throws ApplicationSettingsException if the ZooKeeper host or port setting is unavailable
+     */
+    private void ensureConnected() throws IOException, InterruptedException, ApplicationSettingsException {
+        if (zk != null && zk.getState().isConnected()) {
+            return;
+        }
+        String zkhostPort = ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_HOST)
+                + ":" + ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_PORT);
+        zk = new ZooKeeper(zkhostPort, ZK_SESSION_TIMEOUT_MS, this);
+        synchronized (mutex) {
+            // Test the state in a loop rather than waiting blindly: the connection event may already have
+            // been delivered before this thread got here, and wait() may also wake up spuriously.
+            while (!zk.getState().isConnected()) {
+                mutex.wait();
+            }
+        }
+    }
+
+    /**
+     * Takes the ZooKeeper handle to work with from the supplied configuration objects.
+     *
+     * @param configurations arbitrary objects; every ZooKeeper instance among them replaces the current handle
+     */
+    public void setup(Object... configurations) {
+        for (Object configuration : configurations) {
+            if (configuration instanceof ZooKeeper) {
+                this.zk = (ZooKeeper) configuration;
+            }
+        }
+    }
+
+    /**
+     * Watcher callback: wakes up the thread waiting in {@link #ensureConnected()} once the client reports
+     * the SyncConnected state.
+     *
+     * @param watchedEvent the event delivered by the ZooKeeper client
+     */
+    @Override
+    public void process(WatchedEvent watchedEvent) {
+        if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
+            synchronized (mutex) {
+                mutex.notifyAll();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
index 84978f1..8456e35 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/MonitorID.java
@@ -78,10 +78,14 @@ public class MonitorID {
         this.jobExecutionContext = jobExecutionContext;
         host = jobExecutionContext.getApplicationContext().getHostDescription();
         userName = jobExecutionContext.getExperiment().getUserName();
-        jobID = jobExecutionContext.getJobDetails().getJobID();
         taskID = jobExecutionContext.getTaskData().getTaskID();
         experimentID = jobExecutionContext.getExperiment().getExperimentID();
         workflowNodeID = jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId();// at this point we only have one node todo: fix this
+        try {
+            jobID = jobExecutionContext.getJobDetails().getJobID();
+        }catch(NullPointerException e){
+            logger.error("There is not job created at this point");
+        }
     }
 
     public HostDescription getHost() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
new file mode 100644
index 0000000..50a60de
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
@@ -0,0 +1,88 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.monitor.state;
+
+import org.apache.airavata.gfac.core.monitor.JobIdentity;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+
+/**
+ * Describes a requested change of the GFac-internal state of an experiment: the new state, the monitor
+ * ID of the affected job and the identity of that job.
+ */
+public class GfacExperimentStateChangeRequest {
+    private GfacExperimentState state;
+
+    private JobIdentity identity;
+
+    private MonitorID monitorID;
+
+    /**
+     * Creates a request whose job identity is built from the experiment, workflow node, task and job
+     * IDs of the given monitor ID.
+     *
+     * @param monitorID monitor ID of the affected job; must not be null
+     * @param state     the state to change to
+     */
+    public GfacExperimentStateChangeRequest(MonitorID monitorID, GfacExperimentState state) {
+        this(monitorID, new JobIdentity(monitorID.getExperimentID(), monitorID.getWorkflowNodeID(),
+                monitorID.getTaskID(), monitorID.getJobID()), state);
+    }
+
+    /**
+     * Creates a request with an explicitly supplied job identity.
+     *
+     * @param monitorID monitor ID of the affected job
+     * @param jobId     identity of the affected job
+     * @param state     the state to change to
+     */
+    public GfacExperimentStateChangeRequest(MonitorID monitorID, JobIdentity jobId, GfacExperimentState state) {
+        // Assign the fields directly: calling the overridable setters from a constructor would run
+        // subclass code before the subclass has been initialised.
+        this.identity = jobId;
+        this.monitorID = monitorID;
+        this.state = state;
+    }
+
+    public GfacExperimentState getState() {
+        return state;
+    }
+
+    public void setState(GfacExperimentState state) {
+        this.state = state;
+    }
+
+    public JobIdentity getIdentity() {
+        return identity;
+    }
+
+    public void setIdentity(JobIdentity identity) {
+        this.identity = identity;
+    }
+
+    public MonitorID getMonitorID() {
+        return monitorID;
+    }
+
+    public void setMonitorID(MonitorID monitorID) {
+        this.monitorID = monitorID;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
index 7b24eb9..6e3f59e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractProvider.java
@@ -24,6 +24,7 @@ package org.apache.airavata.gfac.core.provider;
 import com.google.common.eventbus.EventBus;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFacImpl;
 import org.apache.airavata.gfac.core.notification.MonitorPublisher;
 import org.apache.airavata.model.workspace.experiment.JobDetails;
@@ -44,8 +45,11 @@ public abstract class AbstractProvider implements GFacProvider{
 
     protected MonitorPublisher monitorPublisher;
 
-    protected AbstractProvider() {
+    protected AbstractProvider() {                                            //todo this has to be fixed
         this.monitorPublisher = GFacImpl.getMonitorPublisher();
+        if(this.monitorPublisher == null){
+            this.monitorPublisher = BetterGfacImpl.getMonitorPublisher();
+        }
     }
 
     public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
new file mode 100644
index 0000000..8d06aad
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
@@ -0,0 +1,82 @@
+    /*
+     * Licensed to the Apache Software Foundation (ASF) under one or more
+     * contributor license agreements.  See the NOTICE file distributed with
+     * this work for additional information regarding copyright ownership.
+     * The ASF licenses this file to You under the Apache License, Version 2.0
+     * (the "License"); you may not use this file except in compliance with
+     * the License.  You may obtain a copy of the License at
+     *
+     *     http://www.apache.org/licenses/LICENSE-2.0
+     *
+     * Unless required by applicable law or agreed to in writing, software
+     * distributed under the License is distributed on an "AS IS" BASIS,
+     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     * See the License for the specific language governing permissions and
+     * limitations under the License.
+     */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.gfac.workspace.experiment;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+@SuppressWarnings("all") public enum GfacExperimentState implements org.apache.thrift.TEnum {
+  INHANDLERSINVOKING(0),
+  INHANDLERSINVOKED(1),
+  PROVIDERINVOKING(2),
+  PROVIDERINVOKED(3),
+  OUTHANDLERSINVOKING(4),
+  OUTHANDLERSINVOKED(5),
+  COMPLETED(6),
+  FAILED(7),
+  UNKNOWN(8);
+
+  private final int value;
+
+  private GfacExperimentState(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  public static GfacExperimentState findByValue(int value) { 
+    switch (value) {
+      case 0:
+        return INHANDLERSINVOKING;
+      case 1:
+        return INHANDLERSINVOKED;
+      case 2:
+        return PROVIDERINVOKING;
+      case 3:
+        return PROVIDERINVOKED;
+      case 4:
+        return OUTHANDLERSINVOKING;
+      case 5:
+        return OUTHANDLERSINVOKED;
+      case 6:
+        return COMPLETED;
+      case 7:
+        return FAILED;
+      case 8:
+        return UNKNOWN;
+      default:
+        return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
new file mode 100644
index 0000000..3b9273d
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
@@ -0,0 +1,516 @@
+    /*
+     * Licensed to the Apache Software Foundation (ASF) under one or more
+     * contributor license agreements.  See the NOTICE file distributed with
+     * this work for additional information regarding copyright ownership.
+     * The ASF licenses this file to You under the Apache License, Version 2.0
+     * (the "License"); you may not use this file except in compliance with
+     * the License.  You may obtain a copy of the License at
+     *
+     *     http://www.apache.org/licenses/LICENSE-2.0
+     *
+     * Unless required by applicable law or agreed to in writing, software
+     * distributed under the License is distributed on an "AS IS" BASIS,
+     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     * See the License for the specific language governing permissions and
+     * limitations under the License.
+     */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.gfac.workspace.experiment;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class GfacExperimentStatus implements org.apache.thrift.TBase<GfacExperimentStatus, GfacExperimentStatus._Fields>, java.io.Serializable, Cloneable, Comparable<GfacExperimentStatus> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("GfacExperimentStatus");
+
+  private static final org.apache.thrift.protocol.TField GFAC_EXPERIMENT_STATE_FIELD_DESC = new org.apache.thrift.protocol.TField("gfacExperimentState", org.apache.thrift.protocol.TType.I32, (short)1);
+  private static final org.apache.thrift.protocol.TField TIME_OF_STATE_CHANGE_FIELD_DESC = new org.apache.thrift.protocol.TField("timeOfStateChange", org.apache.thrift.protocol.TType.I64, (short)2);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new GfacExperimentStatusStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new GfacExperimentStatusTupleSchemeFactory());
+  }
+
+  /**
+   * 
+   * @see GfacExperimentState
+   */
+  public GfacExperimentState gfacExperimentState; // required
+  public long timeOfStateChange; // optional
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    /**
+     * 
+     * @see GfacExperimentState
+     */
+    GFAC_EXPERIMENT_STATE((short)1, "gfacExperimentState"),
+    TIME_OF_STATE_CHANGE((short)2, "timeOfStateChange");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if it is not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // GFAC_EXPERIMENT_STATE
+          return GFAC_EXPERIMENT_STATE;
+        case 2: // TIME_OF_STATE_CHANGE
+          return TIME_OF_STATE_CHANGE;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if it is not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __TIMEOFSTATECHANGE_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  private _Fields optionals[] = {_Fields.TIME_OF_STATE_CHANGE};
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.GFAC_EXPERIMENT_STATE, new org.apache.thrift.meta_data.FieldMetaData("gfacExperimentState", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, GfacExperimentState.class)));
+    tmpMap.put(_Fields.TIME_OF_STATE_CHANGE, new org.apache.thrift.meta_data.FieldMetaData("timeOfStateChange", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GfacExperimentStatus.class, metaDataMap);
+  }
+
+  public GfacExperimentStatus() {
+  }
+
+  public GfacExperimentStatus(
+    GfacExperimentState gfacExperimentState)
+  {
+    this();
+    this.gfacExperimentState = gfacExperimentState;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public GfacExperimentStatus(GfacExperimentStatus other) {
+    __isset_bitfield = other.__isset_bitfield;
+    if (other.isSetGfacExperimentState()) {
+      this.gfacExperimentState = other.gfacExperimentState;
+    }
+    this.timeOfStateChange = other.timeOfStateChange;
+  }
+
+  public GfacExperimentStatus deepCopy() {
+    return new GfacExperimentStatus(this);
+  }
+
+  @Override
+  public void clear() {
+    this.gfacExperimentState = null;
+    setTimeOfStateChangeIsSet(false);
+    this.timeOfStateChange = 0;
+  }
+
+  /**
+   * 
+   * @see GfacExperimentState
+   */
+  public GfacExperimentState getGfacExperimentState() {
+    return this.gfacExperimentState;
+  }
+
+  /**
+   * 
+   * @see GfacExperimentState
+   */
+  public GfacExperimentStatus setGfacExperimentState(GfacExperimentState gfacExperimentState) {
+    this.gfacExperimentState = gfacExperimentState;
+    return this;
+  }
+
+  public void unsetGfacExperimentState() {
+    this.gfacExperimentState = null;
+  }
+
+  /** Returns true if field gfacExperimentState is set (has been assigned a value) and false otherwise */
+  public boolean isSetGfacExperimentState() {
+    return this.gfacExperimentState != null;
+  }
+
+  public void setGfacExperimentStateIsSet(boolean value) {
+    if (!value) {
+      this.gfacExperimentState = null;
+    }
+  }
+
+  public long getTimeOfStateChange() {
+    return this.timeOfStateChange;
+  }
+
+  public GfacExperimentStatus setTimeOfStateChange(long timeOfStateChange) {
+    this.timeOfStateChange = timeOfStateChange;
+    setTimeOfStateChangeIsSet(true);
+    return this;
+  }
+
+  public void unsetTimeOfStateChange() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID);
+  }
+
+  /** Returns true if field timeOfStateChange is set (has been assigned a value) and false otherwise */
+  public boolean isSetTimeOfStateChange() {
+    return EncodingUtils.testBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID);
+  }
+
+  public void setTimeOfStateChangeIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID, value);
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case GFAC_EXPERIMENT_STATE:
+      if (value == null) {
+        unsetGfacExperimentState();
+      } else {
+        setGfacExperimentState((GfacExperimentState)value);
+      }
+      break;
+
+    case TIME_OF_STATE_CHANGE:
+      if (value == null) {
+        unsetTimeOfStateChange();
+      } else {
+        setTimeOfStateChange((Long)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case GFAC_EXPERIMENT_STATE:
+      return getGfacExperimentState();
+
+    case TIME_OF_STATE_CHANGE:
+      return Long.valueOf(getTimeOfStateChange());
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case GFAC_EXPERIMENT_STATE:
+      return isSetGfacExperimentState();
+    case TIME_OF_STATE_CHANGE:
+      return isSetTimeOfStateChange();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof GfacExperimentStatus)
+      return this.equals((GfacExperimentStatus)that);
+    return false;
+  }
+
+  public boolean equals(GfacExperimentStatus that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_gfacExperimentState = true && this.isSetGfacExperimentState();
+    boolean that_present_gfacExperimentState = true && that.isSetGfacExperimentState();
+    if (this_present_gfacExperimentState || that_present_gfacExperimentState) {
+      if (!(this_present_gfacExperimentState && that_present_gfacExperimentState))
+        return false;
+      if (!this.gfacExperimentState.equals(that.gfacExperimentState))
+        return false;
+    }
+
+    boolean this_present_timeOfStateChange = true && this.isSetTimeOfStateChange();
+    boolean that_present_timeOfStateChange = true && that.isSetTimeOfStateChange();
+    if (this_present_timeOfStateChange || that_present_timeOfStateChange) {
+      if (!(this_present_timeOfStateChange && that_present_timeOfStateChange))
+        return false;
+      if (this.timeOfStateChange != that.timeOfStateChange)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return 0;
+  }
+
+  @Override
+  public int compareTo(GfacExperimentStatus other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(isSetGfacExperimentState()).compareTo(other.isSetGfacExperimentState());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetGfacExperimentState()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.gfacExperimentState, other.gfacExperimentState);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetTimeOfStateChange()).compareTo(other.isSetTimeOfStateChange());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetTimeOfStateChange()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timeOfStateChange, other.timeOfStateChange);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("GfacExperimentStatus(");
+    boolean first = true;
+
+    sb.append("gfacExperimentState:");
+    if (this.gfacExperimentState == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.gfacExperimentState);
+    }
+    first = false;
+    if (isSetTimeOfStateChange()) {
+      if (!first) sb.append(", ");
+      sb.append("timeOfStateChange:");
+      sb.append(this.timeOfStateChange);
+      first = false;
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (gfacExperimentState == null) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'gfacExperimentState' was not present! Struct: " + toString());
+    }
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class GfacExperimentStatusStandardSchemeFactory implements SchemeFactory {
+    public GfacExperimentStatusStandardScheme getScheme() {
+      return new GfacExperimentStatusStandardScheme();
+    }
+  }
+
+  private static class GfacExperimentStatusStandardScheme extends StandardScheme<GfacExperimentStatus> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // GFAC_EXPERIMENT_STATE
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.gfacExperimentState = GfacExperimentState.findByValue(iprot.readI32());
+              struct.setGfacExperimentStateIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // TIME_OF_STATE_CHANGE
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.timeOfStateChange = iprot.readI64();
+              struct.setTimeOfStateChangeIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.gfacExperimentState != null) {
+        oprot.writeFieldBegin(GFAC_EXPERIMENT_STATE_FIELD_DESC);
+        oprot.writeI32(struct.gfacExperimentState.getValue());
+        oprot.writeFieldEnd();
+      }
+      if (struct.isSetTimeOfStateChange()) {
+        oprot.writeFieldBegin(TIME_OF_STATE_CHANGE_FIELD_DESC);
+        oprot.writeI64(struct.timeOfStateChange);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class GfacExperimentStatusTupleSchemeFactory implements SchemeFactory {
+    public GfacExperimentStatusTupleScheme getScheme() {
+      return new GfacExperimentStatusTupleScheme();
+    }
+  }
+
+  private static class GfacExperimentStatusTupleScheme extends TupleScheme<GfacExperimentStatus> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      oprot.writeI32(struct.gfacExperimentState.getValue());
+      BitSet optionals = new BitSet();
+      if (struct.isSetTimeOfStateChange()) {
+        optionals.set(0);
+      }
+      oprot.writeBitSet(optionals, 1);
+      if (struct.isSetTimeOfStateChange()) {
+        oprot.writeI64(struct.timeOfStateChange);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      struct.gfacExperimentState = GfacExperimentState.findByValue(iprot.readI32());
+      struct.setGfacExperimentStateIsSet(true);
+      BitSet incoming = iprot.readBitSet(1);
+      if (incoming.get(0)) {
+        struct.timeOfStateChange = iprot.readI64();
+        struct.setTimeOfStateChangeIsSet(true);
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
new file mode 100644
index 0000000..20099f2
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
@@ -0,0 +1,59 @@
+    /*
+     * Licensed to the Apache Software Foundation (ASF) under one or more
+     * contributor license agreements.  See the NOTICE file distributed with
+     * this work for additional information regarding copyright ownership.
+     * The ASF licenses this file to You under the Apache License, Version 2.0
+     * (the "License"); you may not use this file except in compliance with
+     * the License.  You may obtain a copy of the License at
+     *
+     *     http://www.apache.org/licenses/LICENSE-2.0
+     *
+     * Unless required by applicable law or agreed to in writing, software
+     * distributed under the License is distributed on an "AS IS" BASIS,
+     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+     * See the License for the specific language governing permissions and
+     * limitations under the License.
+     */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.gfac.workspace.experiment;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("all") public class gfacDataModelConstants { // Thrift-generated holder for the const strings declared in gfacDataModel.thrift
+
+  public static final String DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS";
+
+  public static final String DEFAULT_PROJECT_NAME = "DEFAULT";
+
+  public static final String SINGLE_APP_NODE_NAME = "SINGLE_APP_NODE";
+
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-thrift-descriptions/generate-gfac-stubs.sh
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-thrift-descriptions/generate-gfac-stubs.sh b/modules/gfac/gfac-thrift-descriptions/generate-gfac-stubs.sh
index 73cdba3..f2adf51 100755
--- a/modules/gfac/gfac-thrift-descriptions/generate-gfac-stubs.sh
+++ b/modules/gfac/gfac-thrift-descriptions/generate-gfac-stubs.sh
@@ -118,6 +118,8 @@ rm -rf ${JAVA_GEN_DIR}
 # Using thrify Java generator, generate the java classes based on Airavata API. This
 #   The airavataAPI.thrift includes rest of data models.
 thrift ${THRIFT_ARGS} --gen java gfac.cpi.service.thrift || fail unable to generate java thrift classes
+thrift ${THRIFT_ARGS} --gen java gfacDataModel.thrift || fail unable to generate java thrift classes
+
 
 # For the generated java classes add the ASF V2 License header
 add_license_header $JAVA_GEN_DIR

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-thrift-descriptions/gfacDataModel.thrift
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-thrift-descriptions/gfacDataModel.thrift b/modules/gfac/gfac-thrift-descriptions/gfacDataModel.thrift
new file mode 100644
index 0000000..2438e3c
--- /dev/null
+++ b/modules/gfac/gfac-thrift-descriptions/gfacDataModel.thrift
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+include "applicationCatalogDataModel.thrift"
+
+namespace java org.apache.airavata.gfac.workspace.experiment
+namespace php Airavata.Model.Workspace.Experiment
+
+/*
+ * This file describes the definitions of the Gfac Framework level Experiment Data Structures. Each of the
+ *   language specific Airavata Client SDK's will translate this neutral data model into an
+ *   appropriate form for passing to the Airavata Server Execution API Calls.
+ *
+ *   This data-model will not be visible to outside users; it is used inside GFac to recover
+ *   failed or hung jobs.
+ *
+*/
+
+const string DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS"
+const string DEFAULT_PROJECT_NAME = "DEFAULT"
+const string SINGLE_APP_NODE_NAME = "SINGLE_APP_NODE"
+
+enum GfacExperimentState { // no explicit values are given, so ids follow declaration order — presumably new states must be appended, not inserted (confirm)
+    INHANDLERSINVOKING,
+    INHANDLERSINVOKED,
+    PROVIDERINVOKING,
+    PROVIDERINVOKED,
+    OUTHANDLERSINVOKING,
+    OUTHANDLERSINVOKED,
+    COMPLETED,
+    FAILED,
+    UNKNOWN
+}
+
+struct GfacExperimentStatus {
+    1: required GfacExperimentState gfacExperimentState,
+    2: optional i64 timeOfStateChange // NOTE(review): unit is not stated here — presumably epoch milliseconds, confirm
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
index df24a9e..4fce52f 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServer.java
@@ -53,6 +53,7 @@ public class OrchestratorServer implements IServer{
             throws Exception {
         try {
             final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.ORCHESTRATOT_SERVER_PORT,"8940"));
+
             final String serverHost = ServerSettings.getSetting(Constants.ORCHESTRATOT_SERVER_HOST, null);
             
 			TServerTransport serverTransport;

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 2c66293..15e2c63 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -21,11 +21,18 @@
 
 package org.apache.airavata.orchestrator.server;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Calendar;
 import java.util.List;
+import java.util.Random;
 
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.model.error.LaunchValidationException;
 import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.orchestrator.util.OrchestratorRecoveryHandler;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.cpi.OrchestratorService;
 import org.apache.airavata.orchestrator.cpi.orchestrator_cpi_serviceConstants;
@@ -37,16 +44,23 @@ import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.TaskDetailConstants;
 import org.apache.airavata.registry.cpi.utils.Constants.FieldConstants.WorkflowNodeConstants;
 import org.apache.thrift.TException;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class OrchestratorServerHandler implements OrchestratorService.Iface {
+public class OrchestratorServerHandler implements OrchestratorService.Iface, Watcher {
     private static Logger log = LoggerFactory.getLogger(OrchestratorServerHandler.class);
 
     private SimpleOrchestratorImpl orchestrator = null;
 
     private Registry registry;
 
+    private ZooKeeper zk;
+
+    private static Integer mutex = new Integer(-1);
+    ;
+
     /**
      * Query orchestrator server to fetch the CPI version
      */
@@ -57,18 +71,59 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 
 
     public OrchestratorServerHandler() {
+        // register this orchestrator with ZooKeeper, then watch the gfac instances
+        try {
+            String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+            String airavataServerHostPort = ServerSettings.getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
+            try {
+                zk = new ZooKeeper(zkhostPort, 6000, this);   // this handler is the watcher: process() signals the connect event
+                String OrchServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
+                synchronized (mutex) {
+                    while (!zk.getState().isConnected()) mutex.wait();  // re-check the state: SyncConnected may arrive before we start waiting
+                }
+                Stat zkStat = zk.exists(OrchServer, false);
+                if (zkStat == null) {
+                    zk.create(OrchServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                String instantNode = OrchServer + "/" + String.valueOf(new Random().nextInt(Integer.MAX_VALUE));   // znode paths always use '/', never File.separator
+                zkStat = zk.exists(instantNode, false);
+                if (zkStat == null) {
+                    zk.create(instantNode, airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.EPHEMERAL);      // other components watch the creation/deletion of these children to monitor the status of the node
+                }
+                // creating a watch in orchestrator to monitor the gfac instances
+                zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server"), this);
+                log.info("Finished starting ZK: " + zk);
+            } catch (IOException e) {
+                log.error("Unable to create the ZooKeeper client for " + zkhostPort, e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();     // keep the interrupt visible to the calling thread
+                log.error("Interrupted while registering the orchestrator with ZooKeeper", e);
+            } catch (KeeperException e) {
+                log.error("Error registering the orchestrator in ZooKeeper", e);
+            }
+        } catch (ApplicationSettingsException e) {
+            log.error("Unable to read the ZooKeeper/orchestrator settings, skipping ZooKeeper registration", e);
+        }
+        // orchestrator init
         try {
             // first constructing the monitorManager and orchestrator, then fill the required properties
             orchestrator = new SimpleOrchestratorImpl();
             registry = RegistryFactory.getDefaultRegistry();
             orchestrator.initialize();
-        }catch (OrchestratorException e) {
+            orchestrator.getOrchestratorContext().setZk(this.zk);
+        } catch (OrchestratorException e) {
             e.printStackTrace();
         } catch (RegistryException e) {
             e.printStackTrace();
         }
     }
 
+
+
     /**
      * * After creating the experiment Data user have the
      * * experimentID as the handler to the experiment, during the launchExperiment
@@ -82,11 +137,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
      * @param experimentId
      */
     public boolean launchExperiment(String experimentId) throws TException {
-    	Experiment experiment= null;
+        Experiment experiment = null;
         try {
-            List<String> ids = registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL,WorkflowNodeConstants.EXPERIMENT_ID,experimentId);
+            List<String> ids = registry.getIds(RegistryModelType.WORKFLOW_NODE_DETAIL, WorkflowNodeConstants.EXPERIMENT_ID, experimentId);
             for (String workflowNodeId : ids) {
-                WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails)registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId);
+                WorkflowNodeDetails workflowNodeDetail = (WorkflowNodeDetails) registry.get(RegistryModelType.WORKFLOW_NODE_DETAIL, workflowNodeId);
                 List<Object> taskDetailList = registry.get(RegistryModelType.TASK_DETAIL, TaskDetailConstants.NODE_ID, workflowNodeId);
                 for (Object o : taskDetailList) {
                     TaskDetails taskID = (TaskDetails) o;
@@ -116,11 +171,11 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
             status.setTimeOfStateChange(Calendar.getInstance().getTimeInMillis());
             experiment.setExperimentStatus(status);
             try {
-				registry.update(RegistryModelType.EXPERIMENT, experiment, experimentId);
-			} catch (RegistryException e1) {
-				 throw new TException(e);
-			}
-      
+                registry.update(RegistryModelType.EXPERIMENT, experiment, experimentId);
+            } catch (RegistryException e1) {
+                throw new TException(e);
+            }
+
             throw new TException(e);
         }
         return true;
@@ -130,11 +185,12 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
     /**
      * This method will validate the experiment before launching, if is failed we do not run the launch in airavata
      * thrift service (only if validation is enabled
+     *
      * @param experimentId
      * @return
      * @throws TException
      */
-    public boolean validateExperiment(String experimentId) throws TException,LaunchValidationException {
+    public boolean validateExperiment(String experimentId) throws TException, LaunchValidationException {
         //TODO: Write the Orchestrator implementaion
         try {
             List<TaskDetails> tasks = orchestrator.createTasks(experimentId);
@@ -167,17 +223,68 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface {
 
     /**
      * This can be used to cancel a running experiment and store the status to terminated in registry
+     *
      * @param experimentId
      * @return
      * @throws TException
      */
     public boolean terminateExperiment(String experimentId) throws TException {
-    	try {
-			orchestrator.cancelExperiment(experimentId);
-		} catch (OrchestratorException e) {
-			log.error("Error canceling experiment "+experimentId,e);
-			return false;
-		}
+        try {
+            orchestrator.cancelExperiment(experimentId);
+        } catch (OrchestratorException e) {
+            log.error("Error canceling experiment " + experimentId, e);
+            return false;
+        }
         return true;
     }
+
+    /** Handles ZooKeeper events: wakes the thread waiting on the mutex when the session connects and starts recovery when a gfac node is deleted */
+    synchronized public void process(WatchedEvent watchedEvent) {
+        synchronized (mutex) {
+            try {
+                Event.KeeperState state = watchedEvent.getState();
+                switch (state) {
+                    case SyncConnected:
+                        mutex.notify();
+                        break;
+                }
+                // znode paths always use '/', so File.separator must not be used to build them
+                String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+                if (watchedEvent.getPath() != null && watchedEvent.getPath().startsWith(gfacServer)) {
+                    List<String> children = zk.getChildren(gfacServer, true);
+                    for (String gfacNodes : children) {
+                        zk.exists(gfacServer + "/" + gfacNodes, this);    // set a watch on every gfac node currently registered
+                    }
+                    switch (watchedEvent.getType()) {
+                        case NodeCreated:
+                            mutex.notify();
+                            break;
+                        case NodeDeleted:
+                            // here we have to handle gfac node shutdown case
+                            if (children.size() == 0) {
+                                log.error("There are not gfac instances to route failed jobs");
+                                return;
+                            }
+                            // we recover one gfac node at a time
+                            final WatchedEvent event = watchedEvent;
+                            final OrchestratorServerHandler handler = this;
+                            (new Thread() {
+                                public void run() {
+                                    try {
+                                        (new OrchestratorRecoveryHandler(handler, event.getPath())).recover();     // run this task in a separate thread
+                                    } catch (Exception e) {
+                                        log.error("error recovering the jobs for gfac-node: " + event.getPath(), e);
+                                    }
+                                }
+                            }).start();
+                            break;
+                    }
+                }
+            } catch (KeeperException e) {
+                log.error("ZooKeeper error while processing event " + watchedEvent, e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();     // keep the interrupt visible to the event thread
+                log.error("Interrupted while processing event " + watchedEvent, e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
new file mode 100644
index 0000000..05a87e7
--- /dev/null
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.orchestrator.util;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
+import org.apache.airavata.orchestrator.server.OrchestratorServerHandler;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class OrchestratorRecoveryHandler implements Watcher{
+    private static Logger log = LoggerFactory.getLogger(OrchestratorRecoveryHandler.class);
+
+    private ZooKeeper zk;
+
+    private String gfacId;
+
+    private String zkExperimentPath;
+
+    private final Object mutex = new Object();   // dedicated per-instance monitor; a boxed Integer constant is a cached object shared JVM-wide
+
+    private OrchestratorServerHandler serverHandler;
+
+    /** @param handler used to relaunch experiments; @param zkExpPath znode path whose last segment is taken as the gfac id */
+    public OrchestratorRecoveryHandler(OrchestratorServerHandler handler,String zkExpPath) {
+        // znode paths always use '/', whatever the platform's File.separator is
+        this.gfacId = zkExpPath.substring(zkExpPath.lastIndexOf('/') + 1);
+        this.zkExperimentPath = zkExpPath;
+        this.serverHandler = handler;
+    }
+
+    /**
+     * Relaunches, through the server handler, every experiment listed under this gfac instance's
+     * experiment znode and deletes each entry once its relaunch call has returned.
+     * @throws InterruptedException if interrupted while waiting for the ZooKeeper connection
+     */
+    public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException {
+        String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+                + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+        String gfacExpPath = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE) + "/" + gfacId;
+        zk = new ZooKeeper(zkhostPort, 6000, this);
+        try {
+            synchronized (mutex) {
+                while (!zk.getState().isConnected()) {   // re-check the state so an early or spurious wakeup is harmless
+                    mutex.wait();
+                }
+            }
+            for (String expId : zk.getChildren(gfacExpPath, false)) {
+                String experimentId = expId.split("\\+")[0];
+                log.info("Recovering1 Experiment: " + experimentId);
+                try {
+                    serverHandler.launchExperiment(experimentId);
+                    //return of the above call guarantees that the experiment is persisted on some gfac node in the cluster
+                    Stat exists = zk.exists(gfacExpPath + "/" + expId, false);
+                    if(exists != null) {
+                        zk.delete(gfacExpPath + "/" + expId, exists.getVersion());
+                    }
+                } catch (Exception e) {       // we attempt all the experiments
+                    log.error("Error recovering experiment " + experimentId + " of gfac-node " + gfacId, e);
+                }
+            }
+        } finally {
+            zk.close();     // this client is created for a single recovery run only
+        }
+    }
+
+    synchronized public void process(WatchedEvent watchedEvent) {
+        synchronized (mutex) {
+            Event.KeeperState state = watchedEvent.getState();
+            switch (state) {
+                case SyncConnected:
+                    mutex.notifyAll();   // wake recover(), which re-checks the connection state
+                    break;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/pom.xml b/modules/orchestrator/orchestrator-core/pom.xml
index 3912dd1..95a45a5 100644
--- a/modules/orchestrator/orchestrator-core/pom.xml
+++ b/modules/orchestrator/orchestrator-core/pom.xml
@@ -126,6 +126,12 @@ the License. -->
             <artifactId>airavata-server-configuration</artifactId>
             <scope>test</scope>
         </dependency>
+        <!-- zookeeper dependencies -->
+        <dependency>
+        	<groupId>org.apache.zookeeper</groupId>
+        	<artifactId>zookeeper</artifactId>
+        	<version>3.4.0</version>
+        </dependency>
 	<dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-server-configuration</artifactId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
index 8de0482..be3acda 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/context/OrchestratorContext.java
@@ -27,6 +27,7 @@ import org.apache.airavata.orchestrator.core.OrchestratorConfiguration;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
 import org.apache.airavata.registry.api.AiravataRegistry2;
 import org.apache.airavata.registry.cpi.Registry;
+import org.apache.zookeeper.ZooKeeper;
 
 /**
  * This is the context object used in orchestrator which
@@ -39,6 +40,8 @@ public class OrchestratorContext {
     private AiravataRegistry2 registry;
 
     private Registry newRegistry;
+
+    private ZooKeeper zk;
     
     public OrchestratorContext(List<GFACInstance> gfacInstanceList) {
         this.gfacInstanceList = new ArrayList<GFACInstance>();
@@ -78,4 +81,16 @@ public class OrchestratorContext {
     public void setNewRegistry(Registry newRegistry) {
         this.newRegistry = newRegistry;
     }
+
+    public void setGfacInstanceList(List<GFACInstance> gfacInstanceList) {
+        this.gfacInstanceList = gfacInstanceList; // keeps the caller's list reference as-is (no copy is made)
+    }
+
+    public void setZk(ZooKeeper zk) {
+        this.zk = zk; // ZooKeeper client handed in by the owner of this context; may be null if none was supplied
+    }
+
+    public ZooKeeper getZk() {
+        return zk; // whatever was last passed to setZk; null until then
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/gfac/GFacClientFactory.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/gfac/GFacClientFactory.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/gfac/GFacClientFactory.java
index 60ecd58..3ec1e8c 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/gfac/GFacClientFactory.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/gfac/GFacClientFactory.java
@@ -28,7 +28,7 @@ import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
 public class GFacClientFactory {
-    public static GfacService.Client createOrchestratorClient(String serverHost, int serverPort){
+    public static GfacService.Client createGFacClient(String serverHost, int serverPort){
           try {
               TTransport transport = new TSocket(serverHost, serverPort);
               transport.open();

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index 6167134..23089aa 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -20,28 +20,39 @@
 */
 package org.apache.airavata.orchestrator.core.impl;
 
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.cpi.GfacService;
-import org.apache.airavata.gfac.util.Constants;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
 import org.apache.airavata.orchestrator.core.gfac.GFacClientFactory;
 import org.apache.airavata.orchestrator.core.job.JobSubmitter;
 import org.apache.thrift.TException;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
 /*
  * this class is responsible for submitting a job to gfac in service mode,
  * it will select a gfac instance based on the incoming request and submit to that
  * gfac instance.
  */
-public class GFACServiceJobSubmitter implements JobSubmitter {
+public class GFACServiceJobSubmitter implements JobSubmitter,Watcher{
     private final static Logger logger = LoggerFactory.getLogger(GFACServiceJobSubmitter.class);
+    public static final String IP = "ip";
 
     private OrchestratorContext orchestratorContext;
 
+    private static final Object mutex = new Object();   // dedicated monitor; a boxed Integer constant is a cached object shared JVM-wide with other classes
+
     public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
         this.orchestratorContext = orchestratorContext;
     }
@@ -52,12 +63,65 @@ public class GFACServiceJobSubmitter implements JobSubmitter {
     }
 
     public boolean submit(String experimentID, String taskID) throws OrchestratorException {
-        GfacService.Client localhost = GFacClientFactory.createOrchestratorClient("localhost",
-                Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950")));
+        ZooKeeper zk = orchestratorContext.getZk();
+        int retryCount = 0;
         try {
-            return localhost.submitJob(experimentID, taskID);
+            if (!zk.getState().isConnected()) {
+                String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+                        + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+                zk = new ZooKeeper(zkhostPort, 6000, this);
+                synchronized (mutex){
+                    mutex.wait();
+                }
+            }
+            String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
+            String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            List<String> children = zk.getChildren(gfacServer, this);
+
+            String pickedChild = children.get(new Random().nextInt(Integer.MAX_VALUE) % children.size());
+            // pick a random child instead of a fixed index, because getChildren does not return children in the same order every time
+
+            String gfacNodeData = new String(zk.getData(gfacServer + File.separator + pickedChild, false, null));
+            logger.info("GFAC instance node data: " + gfacNodeData);
+            String[] split = gfacNodeData.split(":");
+            GfacService.Client localhost = GFacClientFactory.createGFacClient(split[0], Integer.parseInt(split[1]));
+            if (zk.exists(gfacServer + File.separator + pickedChild, false) != null) {      // before submitting the job we check again the state of the node
+                String experimentPath = experimentNode + File.separator + pickedChild;
+                String newExpNode = experimentPath + File.separator + experimentID + "+" + taskID;
+                Stat exists1 = zk.exists(newExpNode, this);
+                if (exists1 == null) {
+                    zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                } else {
+                    logger.info("ExperimentID: " + experimentID + " taskID: " + taskID + " is re-running due to gfac failure");
+                }
+                return localhost.submitJob(experimentID, taskID);
+            }
         } catch (TException e) {
             throw new OrchestratorException(e);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } catch (KeeperException e) {
+            e.printStackTrace();
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return false;
+    }
+
+    synchronized public void process(WatchedEvent event) {
+        synchronized (mutex) {
+            switch (event.getState()){
+                case SyncConnected:
+                    mutex.notify();
+            }
+            switch (event.getType()) {
+                case NodeCreated:
+                    mutex.notify();
+                    break;
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
index 5bbe4e6..7c5cb85 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/AbstractOrchestrator.java
@@ -59,7 +59,9 @@ public abstract class AbstractOrchestrator implements Orchestrator {
 	protected OrchestratorConfiguration orchestratorConfiguration;
 
     private String registryURL;
+
     private String gatewayName;
+
     private String airavataUserName;
    
     public String getRegistryURL() {
@@ -162,4 +164,16 @@ public abstract class AbstractOrchestrator implements Orchestrator {
        }
        return airavataAPI;
    }
+
+    public AiravataRegistry2 getAiravataRegistry() {
+        return airavataRegistry;
+    }
+
+    public OrchestratorContext getOrchestratorContext() {
+        return orchestratorContext;
+    }
+
+    public OrchestratorConfiguration getOrchestratorConfiguration() {
+        return orchestratorConfiguration;
+    }
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
index 5f19c2f..fce6dde 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/cpi/impl/SimpleOrchestratorImpl.java
@@ -191,4 +191,5 @@ public class SimpleOrchestratorImpl extends AbstractOrchestrator{
     public void initialize() throws OrchestratorException {
 
     }
+
 }


[3/5] git commit: more zk related changes

Posted by la...@apache.org.
more zk related changes


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e86504aa
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e86504aa
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e86504aa

Branch: refs/heads/master
Commit: e86504aae30401d109690d8814cbf4bcc6f77ae9
Parents: 362da4e
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 13:46:33 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 13:46:33 2014 -0400

----------------------------------------------------------------------
 .../airavata/client/tools/DocumentCreator.java  |   2 +-
 modules/commons/utils/pom.xml                   |  12 +-
 .../airavata/common/utils/AiravataZKUtils.java  |  59 +++
 .../airavata/gfac/server/GfacServerHandler.java |  13 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 399 +++++++++-----
 .../apache/airavata/gfac/core/cpi/GFacImpl.java |   3 +-
 .../core/monitor/GfacInternalStatusUpdator.java |  24 +-
 .../state/GfacExperimentStateChangeRequest.java |   2 +-
 .../airavata/gfac/core/utils/GFacUtils.java     |  21 +
 .../gfac/core/utils/GfacExperimentState.java    |  81 +++
 .../experiment/GfacExperimentState.java         |  82 ---
 .../experiment/GfacExperimentStatus.java        | 516 -------------------
 .../experiment/gfacDataModelConstants.java      |  59 ---
 .../server/OrchestratorServerHandler.java       |   5 +-
 .../core/impl/GFACServiceJobSubmitter.java      |   6 +
 15 files changed, 483 insertions(+), 801 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
index ffcff17..d573da9 100644
--- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
+++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
@@ -671,7 +671,7 @@ public class DocumentCreator {
 		ApplicationDescription applicationDeploymentDescription = new ApplicationDescription();
 		ApplicationDeploymentDescriptionType applicationDeploymentDescriptionType = applicationDeploymentDescription.getType();
 		applicationDeploymentDescriptionType.addNewApplicationName().setStringValue(serviceName);
-		applicationDeploymentDescriptionType.setExecutableLocation("/tmp/echo.sh");
+		applicationDeploymentDescriptionType.setExecutableLocation("/bin/echo");
 		applicationDeploymentDescriptionType.setScratchWorkingDirectory("/tmp");
 
 		try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/commons/utils/pom.xml
----------------------------------------------------------------------
diff --git a/modules/commons/utils/pom.xml b/modules/commons/utils/pom.xml
index 151a141..aca8fea 100644
--- a/modules/commons/utils/pom.xml
+++ b/modules/commons/utils/pom.xml
@@ -8,7 +8,8 @@
     ANY ~ KIND, either express or implied. See the License for the specific language governing permissions and limitations under 
     the License. -->
 
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 
     <parent>
         <groupId>org.apache.airavata</groupId>
@@ -96,7 +97,7 @@
             <artifactId>tomcat-embed-core</artifactId>
             <version>7.0.22</version>
         </dependency>
-		<dependency>
+        <dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>
             <version>1.2</version>
@@ -107,7 +108,7 @@
             <artifactId>junit</artifactId>
             <scope>test</scope>
         </dependency>
-	<dependency>
+        <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-server-configuration</artifactId>
             <scope>test</scope>
@@ -122,6 +123,11 @@
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>3.4.0</version>
+        </dependency>
     </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
new file mode 100644
index 0000000..7349b7e
--- /dev/null
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/AiravataZKUtils.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.common.utils;
+
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.File;
+
+public class AiravataZKUtils {
+    /** Name of the child znode, beneath an experiment's znode, that holds the experiment state. */
+    public static final String ZK_EXPERIMENT_STATE_NODE = "state";
+
+    /** Builds the experiment znode path: experiment root, gfac server name, then experimentId+taskId, joined by '/'. */
+    public static String getExpZnodePath(String experimentId, String taskId) throws ApplicationSettingsException {
+        return ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE) + "/"   // znode paths always use '/', not File.separator
+                + ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME) + "/" + experimentId + "+" + taskId;
+    }
+
+    /** Returns the ZooKeeper connect string in host:port form, read from the server settings. */
+    public static String getZKhostPort() throws ApplicationSettingsException {
+        return ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_HOST)
+                + ":" + ServerSettings.getSetting(Constants.ZOOKEEPER_SERVER_PORT);
+    }
+
+    /** Returns the path of the state znode located directly beneath the experiment's znode. */
+    public static String getExpStatePath(String experimentId, String taskId) throws ApplicationSettingsException {
+        return getExpZnodePath(experimentId, taskId) + "/" + ZK_EXPERIMENT_STATE_NODE;
+    }
+
+    /** Returns the data stored in the experiment's state znode as a String, or null when that znode does not exist. */
+    public static String getExpState(ZooKeeper zk, String expId, String tId) throws ApplicationSettingsException,
+            KeeperException, InterruptedException {
+        String statePath = getExpStatePath(expId, tId);   // resolved once, used for both the existence check and the read
+        Stat exists = zk.exists(statePath, false);
+        // new String(byte[]) decodes with the platform default charset, exactly as before
+        return exists == null ? null : new String(zk.getData(statePath, false, exists));
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 27733f9..60a0f21 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.gfac.server;
 import com.google.common.eventbus.EventBus;
 import org.apache.airavata.common.exception.AiravataConfigurationException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.GFacException;
@@ -74,8 +75,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
     public GfacServerHandler() {
         // registering with zk
         try {
-            String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
-                    + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+            String zkhostPort = AiravataZKUtils.getZKhostPort();
             String airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
                     + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
             try {
@@ -140,6 +140,15 @@ public class GfacServerHandler implements GfacService.Iface, Watcher{
             if (state == Event.KeeperState.SyncConnected) {
                 mutex.notify();
                 connected = true;
+            } else if(state == Event.KeeperState.Expired ||
+                    state == Event.KeeperState.Disconnected){
+                try {
+                    zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(),6000,this);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                } catch (ApplicationSettingsException e) {
+                    e.printStackTrace();
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 195bfc1..6fb3e24 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -28,8 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import com.google.common.eventbus.EventBus;
-
 import org.apache.airavata.client.api.AiravataAPI;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
@@ -59,12 +57,12 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.api.AiravataRegistry2;
 import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.apache.airavata.registry.cpi.Registry;
-import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,12 +86,12 @@ public class BetterGfacImpl implements GFac {
     private AiravataRegistry2 airavataRegistry2;
 
     private ZooKeeper zk;                       // we are not storing zk instance in to jobExecution context
-    
+
     private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
 
     private static File gfacConfigFile;
 
-    private static List<AbstractActivityListener> activityListeners =  new ArrayList<AbstractActivityListener>();
+    private static List<AbstractActivityListener> activityListeners = new ArrayList<AbstractActivityListener>();
 
     private static MonitorPublisher monitorPublisher;
 
@@ -114,35 +112,36 @@ public class BetterGfacImpl implements GFac {
         this.zk = zooKeeper;
     }
 
-    public static void startStatusUpdators(Registry registry,ZooKeeper zk,MonitorPublisher publisher) {
+    public static void startStatusUpdators(Registry registry, ZooKeeper zk, MonitorPublisher publisher) {
         try {
             String[] listenerClassList = ServerSettings.getActivityListeners();
             for (String listenerClass : listenerClassList) {
                 Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
                 AbstractActivityListener abstractActivityListener = aClass.newInstance();
                 activityListeners.add(abstractActivityListener);
-                abstractActivityListener.setup(publisher, registry,zk);
+                abstractActivityListener.setup(publisher, registry, zk);
                 log.info("Registering listener: " + listenerClass);
                 publisher.registerListener(abstractActivityListener);
             }
-        }catch (ClassNotFoundException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (ClassNotFoundException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties", e);
         } catch (InstantiationException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+            log.error("Error loading the listener classes configured in airavata-server.properties", e);
         } catch (IllegalAccessException e) {
-            log.error("Error loading the listener classes configured in airavata-server.properties",e);
-        } catch (ApplicationSettingsException e){
-            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+            log.error("Error loading the listener classes configured in airavata-server.properties", e);
+        } catch (ApplicationSettingsException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties", e);
         }
     }
-    public static void startDaemonHandlers()  {
+
+    public static void startDaemonHandlers() {
         List<GFacHandlerConfig> daemonHandlerConfig = null;
         URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
         gfacConfigFile = new File(resource.getPath());
         try {
             daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
         } catch (ParserConfigurationException e) {
-            log.error("Error parsing gfac-config.xml, double check the xml configuration",e);
+            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
         } catch (IOException e) {
             log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
         } catch (SAXException e) {
@@ -151,14 +150,14 @@ public class BetterGfacImpl implements GFac {
             log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
         }
 
-        for(GFacHandlerConfig handlerConfig:daemonHandlerConfig){
+        for (GFacHandlerConfig handlerConfig : daemonHandlerConfig) {
             String className = handlerConfig.getClassName();
             try {
                 Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
                 ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
                 threadedHandler.initProperties(handlerConfig.getProperties());
                 daemonHandlers.add(threadedHandler);
-            }catch (ClassNotFoundException e){
+            } catch (ClassNotFoundException e) {
                 log.error("Error initializing the handler: " + className);
                 log.error(className + " class has to implement " + ThreadedHandler.class);
             } catch (InstantiationException e) {
@@ -173,7 +172,7 @@ public class BetterGfacImpl implements GFac {
                 e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
             }
         }
-        for(ThreadedHandler tHandler:daemonHandlers){
+        for (ThreadedHandler tHandler : daemonHandlers) {
             (new Thread(tHandler)).start();
         }
     }
@@ -194,7 +193,7 @@ public class BetterGfacImpl implements GFac {
      * @return
      * @throws GFacException
      */
-    public boolean submitJob(String experimentID,String taskID) throws GFacException {
+    public boolean submitJob(String experimentID, String taskID) throws GFacException {
         JobExecutionContext jobExecutionContext = null;
         try {
             jobExecutionContext = createJEC(experimentID, taskID);
@@ -216,31 +215,31 @@ public class BetterGfacImpl implements GFac {
         // 2. Add another property to jobExecutionContext and read them inside the provider and use it.
         String serviceName = taskData.getApplicationId();
         if (serviceName == null) {
-            throw new GFacException("Error executing the job because there is not Application Name in this Experiment:  " + serviceName );
+            throw new GFacException("Error executing the job because there is not Application Name in this Experiment:  " + serviceName);
         }
-       
+
         ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
-        if (serviceDescription == null ) {
-            throw new GFacException("Error executing the job because there is not Application Name in this Experiment:  " + serviceName );
+        if (serviceDescription == null) {
+            throw new GFacException("Error executing the job because there is not Application Name in this Experiment:  " + serviceName);
         }
         String hostName;
         HostDescription hostDescription = null;
-        if(taskData.getTaskScheduling().getResourceHostId() != null){
+        if (taskData.getTaskScheduling().getResourceHostId() != null) {
             hostName = taskData.getTaskScheduling().getResourceHostId();
             hostDescription = airavataRegistry2.getHostDescriptor(hostName);
-        }else{
-        	  List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
-              Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
-              for (String hostDescName : applicationDescriptors.keySet()) {
-                  registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
-              }
-              Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
-             HostScheduler hostScheduler = aClass.newInstance();
+        } else {
+            List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
+            Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
+            for (String hostDescName : applicationDescriptors.keySet()) {
+                registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
+            }
+            Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
+            HostScheduler hostScheduler = aClass.newInstance();
             hostDescription = hostScheduler.schedule(registeredHosts);
-        	hostName = hostDescription.getType().getHostName();
+            hostName = hostDescription.getType().getHostName();
         }
-        if(hostDescription == null){
-        	throw new GFacException("Error executing the job as the host is not registered " + hostName);	
+        if (hostDescription == null) {
+            throw new GFacException("Error executing the job as the host is not registered " + hostName);
         }
         ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostName);
         URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
@@ -283,6 +282,8 @@ public class BetterGfacImpl implements GFac {
     public boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
         // We need to check whether this job is submitted as a part of a large workflow. If yes,
         // we need to setup workflow tracking listerner.
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                           , GfacExperimentState.ACCEPTED));                  // immediately we get the request we update the status
         String workflowInstanceID = null;
         if ((workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID)) != null) {
             // This mean we need to register workflow tracking listener.
@@ -291,11 +292,11 @@ public class BetterGfacImpl implements GFac {
         }
         // Register log event listener. This is required in all scenarios.
         jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
-        schedule(jobExecutionContext);
+        launch(jobExecutionContext);
         return true;
     }
 
-    private void schedule(JobExecutionContext jobExecutionContext) throws GFacException {
+    private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
         // Scheduler will decide the execution flow of handlers and provider which handles
         // the job.
         String experimentID = jobExecutionContext.getExperimentID();
@@ -303,27 +304,33 @@ public class BetterGfacImpl implements GFac {
             Scheduler.schedule(jobExecutionContext);
 
             // Executing in handlers in the order as they have configured in GFac configuration
-            invokeInFlowHandlers(jobExecutionContext);
+            // We do not skip any handler here: if a handler does not need to run again during a re-run,
+            // it can implement that logic itself.
+            int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+            if(stateVal >=2){
+                reInvokeInFlowHandlers(jobExecutionContext);
+            }else {
+                invokeInFlowHandlers(jobExecutionContext);               // to keep the consistency we always try to re-run to avoid complexity
+            }
 //            if (experimentID != null){
 //                registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
 //            }
 
             // After executing the in handlers provider instance should be set to job execution context.
             // We get the provider instance and execute it.
-            GFacProvider provider = jobExecutionContext.getProvider();
-            if (provider != null) {
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKING));
-                initProvider(provider, jobExecutionContext);
-                executeProvider(provider, jobExecutionContext);
-                disposeProvider(provider, jobExecutionContext);
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKED));
-            }
-            if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
-                invokeOutFlowHandlers(jobExecutionContext);
+            stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+            if (stateVal == 4) {         // if the job is completed during resubmission we handle it here
+                reInvokeProvider(jobExecutionContext);
+            }else if(stateVal == 3){
+                invokeProvider(jobExecutionContext);
+            }else{
+                log.info("We skip invoking Handler, because the experiment state is beyond the Provider Invocation !!!");
+                log.info("ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
             }
         } catch (Exception e) {
             try {
                 // we make the experiment as failed due to exception scenario
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
                 monitorPublisher.publish(new
                         ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
                         ExperimentState.FAILED));
@@ -335,9 +342,9 @@ public class BetterGfacImpl implements GFac {
                 ));
                 monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
                         new JobIdentity(jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED));
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.FAILED));
+                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED
+                ));
             } catch (NullPointerException e1) {
                 log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " +
                         "NullPointerException occurred because at this point there might not have Job Created", e1, e);
@@ -348,6 +355,35 @@ public class BetterGfacImpl implements GFac {
         }
     }
 
+    private void invokeProvider(JobExecutionContext jobExecutionContext) throws GFacException {
+        GFacProvider provider = jobExecutionContext.getProvider();
+        if (provider != null) {
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+            initProvider(provider, jobExecutionContext);
+            executeProvider(provider, jobExecutionContext);
+            disposeProvider(provider, jobExecutionContext);
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+        }
+        if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+            invokeOutFlowHandlers(jobExecutionContext);
+        }
+    }
+
+    private void reInvokeProvider(JobExecutionContext jobExecutionContext) throws GFacException {
+        GFacProvider provider = jobExecutionContext.getProvider();
+        if (provider != null) {
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+            initProvider(provider, jobExecutionContext);
+            executeProvider(provider, jobExecutionContext);
+            disposeProvider(provider, jobExecutionContext);
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
+        }
+        if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+            invokeOutFlowHandlers(jobExecutionContext);
+        }
+    }
+
+
     private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
         try {
             provider.initialize(jobExecutionContext);
@@ -358,7 +394,7 @@ public class BetterGfacImpl implements GFac {
 
     private void executeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
         try {
-             provider.execute(jobExecutionContext);
+            provider.execute(jobExecutionContext);
         } catch (Exception e) {
             throw new GFacException("Error while executing provider " + provider.getClass().getName() + " functionality.", e);
         }
@@ -383,86 +419,203 @@ public class BetterGfacImpl implements GFac {
 
     private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
         List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
-                                           ,GfacExperimentState.INHANDLERSINVOKING));
-        for (GFacHandlerConfig handlerClassName : handlers) {
-            Class<? extends GFacHandler> handlerClass;
-            GFacHandler handler;
-            try {
-                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
-                handler = handlerClass.newInstance();
-                handler.initProperties(handlerClassName.getProperties());
-            } catch (ClassNotFoundException e) {
-                throw new GFacException("Cannot load handler class " + handlerClassName, e);
-            } catch (InstantiationException e) {
-                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-            } catch (IllegalAccessException e) {
-                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-            }
-            try {
-                handler.invoke(jobExecutionContext);
-            } catch (GFacHandlerException e) {
-                throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+        try {
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                    , GfacExperimentState.INHANDLERSINVOKING));
+            for (GFacHandlerConfig handlerClassName : handlers) {
+                Class<? extends GFacHandler> handlerClass;
+                GFacHandler handler;
+                try {
+                    handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                    handler = handlerClass.newInstance();
+                    handler.initProperties(handlerClassName.getProperties());
+                } catch (ClassNotFoundException e) {
+                    throw new GFacException("Cannot load handler class " + handlerClassName, e);
+                } catch (InstantiationException e) {
+                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                } catch (IllegalAccessException e) {
+                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                }
+                try {
+                    handler.invoke(jobExecutionContext);
+                } catch (GFacHandlerException e) {
+                    throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+                }
             }
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                    , GfacExperimentState.INHANDLERSINVOKED));
+        } catch (Exception e) {
+            throw new GFacException("Error invoking ZK", e);
         }
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
-                                           ,GfacExperimentState.INHANDLERSINVOKED));
     }
 
-    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
-        GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
-        List<GFacHandlerConfig> handlers = null;
-        if(gFacConfiguration != null){
-         handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
-        }else {
-            try {
-                jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
-            } catch (Exception e) {
-                log.error("Error constructing job execution context during outhandler invocation");
-                throw new GFacException(e);
+    private void reInvokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
+        try {
+            int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+            if (stateVal == 8 || stateVal == -1) {
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                        , GfacExperimentState.INHANDLERSINVOKING));
+                for (GFacHandlerConfig handlerClassName : handlers) {
+                    Class<? extends GFacHandler> handlerClass;
+                    GFacHandler handler;
+                    try {
+                        handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                        handler = handlerClass.newInstance();
+                        handler.initProperties(handlerClassName.getProperties());
+                    } catch (ClassNotFoundException e) {
+                        throw new GFacException("Cannot load handler class " + handlerClassName, e);
+                    } catch (InstantiationException e) {
+                        throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                    } catch (IllegalAccessException e) {
+                        throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                    }
+                    try {
+                        handler.invoke(jobExecutionContext);
+                    } catch (GFacHandlerException e) {
+                        throw new GFacException("Error Executing a InFlow Handler", e.getCause());
+                    }
+                }
             }
-            schedule(jobExecutionContext);
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                    , GfacExperimentState.INHANDLERSINVOKED));
+        } catch (Exception e) {
+            throw new GFacException("Error invoking ZK", e);
         }
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKING));
-        for (GFacHandlerConfig handlerClassName : handlers) {
-            Class<? extends GFacHandler> handlerClass;
-            GFacHandler handler;
-            try {
-                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
-                handler = handlerClass.newInstance();
-                handler.initProperties(handlerClassName.getProperties());
-            } catch (ClassNotFoundException e) {
-                log.error(e.getMessage());
-                throw new GFacException("Cannot load handler class " + handlerClassName, e);
-            } catch (InstantiationException e) {
-                log.error(e.getMessage());
-                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-            } catch (IllegalAccessException e) {
-                log.error(e.getMessage());
-                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+    }
+
+    public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        int stateVal = -1;
+        try {
+            stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        } catch (KeeperException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        if (stateVal >= 0 && stateVal < 6) {
+            GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+            List<GFacHandlerConfig> handlers = null;
+            if (gFacConfiguration != null) {
+                handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+            } else {
+                try {
+                    jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+                } catch (Exception e) {
+                    log.error("Error constructing job execution context during outhandler invocation");
+                    throw new GFacException(e);
+                }
+                launch(jobExecutionContext);
             }
-            try {
-                handler.invoke(jobExecutionContext);
-            } catch (Exception e) {
-                // TODO: Better error reporting.
-                throw new GFacException("Error Executing a OutFlow Handler", e);
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+            for (GFacHandlerConfig handlerClassName : handlers) {
+                Class<? extends GFacHandler> handlerClass;
+                GFacHandler handler;
+                try {
+                    handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                    handler = handlerClass.newInstance();
+                    handler.initProperties(handlerClassName.getProperties());
+                } catch (ClassNotFoundException e) {
+                    log.error(e.getMessage());
+                    throw new GFacException("Cannot load handler class " + handlerClassName, e);
+                } catch (InstantiationException e) {
+                    log.error(e.getMessage());
+                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                } catch (IllegalAccessException e) {
+                    log.error(e.getMessage());
+                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                }
+                try {
+                    handler.invoke(jobExecutionContext);
+                } catch (Exception e) {
+                    // TODO: Better error reporting.
+                    throw new GFacException("Error Executing a OutFlow Handler", e);
+                }
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
             }
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKED));
+
+            // At this point all the execution is finished so we update the task and experiment statuses.
+            // Handler authors does not have to worry about updating experiment or task statuses.
+            monitorPublisher.publish(new
+                    ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                    ExperimentState.COMPLETED));
+            // Updating the task status if there's any task associated
+            monitorPublisher.publish(new TaskStatusChangeRequest(
+                    new TaskIdentity(jobExecutionContext.getExperimentID(),
+                            jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                            jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+            ));
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
         }
+    }
 
-        // At this point all the execution is finished so we update the task and experiment statuses.
-        // Handler authors does not have to worry about updating experiment or task statuses.
-        monitorPublisher.publish(new
-                ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
-                ExperimentState.COMPLETED));
-        // Updating the task status if there's any task associated
-        monitorPublisher.publish(new TaskStatusChangeRequest(
-                new TaskIdentity(jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                        jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
-        ));
-
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.COMPLETED));
+    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        int stateVal = -1;
+        try {
+            stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        } catch (KeeperException e) {
+            e.printStackTrace();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        if (stateVal >= 0 && stateVal < 6) {
+            GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+            List<GFacHandlerConfig> handlers = null;
+            if (gFacConfiguration != null) {
+                handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+            } else {
+                try {
+                    jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+                } catch (Exception e) {
+                    log.error("Error constructing job execution context during outhandler invocation");
+                    throw new GFacException(e);
+                }
+                launch(jobExecutionContext);
+            }
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+            for (GFacHandlerConfig handlerClassName : handlers) {
+                Class<? extends GFacHandler> handlerClass;
+                GFacHandler handler;
+                try {
+                    handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                    handler = handlerClass.newInstance();
+                    handler.initProperties(handlerClassName.getProperties());
+                } catch (ClassNotFoundException e) {
+                    log.error(e.getMessage());
+                    throw new GFacException("Cannot load handler class " + handlerClassName, e);
+                } catch (InstantiationException e) {
+                    log.error(e.getMessage());
+                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                } catch (IllegalAccessException e) {
+                    log.error(e.getMessage());
+                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                }
+                try {
+                    handler.invoke(jobExecutionContext);
+                } catch (Exception e) {
+                    // TODO: Better error reporting.
+                    throw new GFacException("Error Executing a OutFlow Handler", e);
+                }
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+            }
+
+            // At this point all the execution is finished so we update the task and experiment statuses.
+            // Handler authors does not have to worry about updating experiment or task statuses.
+            monitorPublisher.publish(new
+                    ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                    ExperimentState.COMPLETED));
+            // Updating the task status if there's any task associated
+            monitorPublisher.publish(new TaskStatusChangeRequest(
+                    new TaskIdentity(jobExecutionContext.getExperimentID(),
+                            jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                            jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+            ));
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index a6908ba..5bc789c 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -58,12 +58,11 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.api.AiravataRegistry2;
 import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.apache.airavata.registry.cpi.Registry;
-import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 24b3989..3933976 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -22,14 +22,11 @@ package org.apache.airavata.gfac.core.monitor;
 
 import com.google.common.eventbus.Subscribe;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.*;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +39,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
 
     private ZooKeeper zk;
 
-    private Integer mutex = -1;
+    private static Integer mutex = -1;
 
     @Subscribe
     public void updateZK(GfacExperimentStateChangeRequest statusChangeRequest) throws KeeperException, InterruptedException, ApplicationSettingsException {
@@ -73,16 +70,23 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
         } catch (IOException e) {
             e.printStackTrace();
         }
+        Stat state = zk.exists(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+        if(state == null) {
+            // state znode has to be created
+            zk.create(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+                    String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }else {
+            zk.setData(experimentPath + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+                    String.valueOf(statusChangeRequest.getState().getValue()).getBytes(), state.getVersion());
+        }
         switch (statusChangeRequest.getState()) {
             case COMPLETED:
-                zk.delete(experimentPath, exists.getVersion());
+//                ZKUtil.deleteRecursive(zk,experimentPath);
                 break;
             case FAILED:
-                zk.delete(experimentPath, exists.getVersion());
+                ZKUtil.deleteRecursive(zk,experimentPath);
                 break;
             default:
-                zk.setData(experimentPath, (statusChangeRequest.getMonitorID().getJobID() +
-                        "," + statusChangeRequest.getMonitorID().getWorkflowNodeID()).getBytes(), exists.getVersion());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
index 50a60de..5f7f2c2 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/state/GfacExperimentStateChangeRequest.java
@@ -22,7 +22,7 @@ package org.apache.airavata.gfac.core.monitor.state;
 
 import org.apache.airavata.gfac.core.monitor.JobIdentity;
 import org.apache.airavata.gfac.core.monitor.MonitorID;
-import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
 
 public class GfacExperimentStateChangeRequest {
     private GfacExperimentState state;

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 7013f3e..f67b592 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -32,6 +32,8 @@ import java.util.*;
 
 import org.apache.airavata.client.api.AiravataAPI;
 import org.apache.airavata.client.api.exception.AiravataAPIInvocationException;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.StringUtil;
 import org.apache.airavata.commons.gfac.type.ActualParameter;
 import org.apache.airavata.gfac.Constants;
@@ -48,6 +50,8 @@ import org.apache.airavata.registry.cpi.CompositeIdentifier;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.airavata.schemas.gfac.*;
 import org.apache.axiom.om.OMElement;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -658,5 +662,22 @@ public class GFacUtils {
         return stringObjectHashMap;
     }
 
+    public static GfacExperimentState getZKExperimentState(ZooKeeper zk,JobExecutionContext jobExecutionContext)
+            throws ApplicationSettingsException, KeeperException, InterruptedException {
+        String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext.getExperimentID(),
+                jobExecutionContext.getTaskData().getTaskID());
+        return GfacExperimentState.findByValue(Integer.parseInt(expState));
+    }
+
+    public static int getZKExperimentStateValue(ZooKeeper zk, JobExecutionContext jobExecutionContext)
+            throws ApplicationSettingsException, KeeperException, InterruptedException {
+        String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext.getExperimentID(),
+                jobExecutionContext.getTaskData().getTaskID());
+        if(expState == null){
+            return -1;
+        }
+        return Integer.parseInt(expState);
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
new file mode 100644
index 0000000..db2cab0
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GfacExperimentState.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.1)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.airavata.gfac.core.utils;
+
+
+/**
+ * Stages of a GFac experiment, each paired with the integer code exposed by
+ * {@link #getValue()}.
+ *
+ * NOTE(review): other code in this module appears to store these codes as
+ * decimal strings and to compare them against literal numbers, so changing a
+ * code is presumably a compatibility break - confirm before renumbering.
+ */
+public enum GfacExperimentState {
+    LAUNCHED(0),
+    ACCEPTED(1),
+    INHANDLERSINVOKING(2),
+    INHANDLERSINVOKED(3),
+    PROVIDERINVOKING(4),
+    PROVIDERINVOKED(5),
+    OUTHANDLERSINVOKING(6),
+    OUTHANDLERSINVOKED(7),
+    COMPLETED(8),
+    FAILED(9),
+    UNKNOWN(10);
+
+    /** Integer code of this state. */
+    private final int value;
+
+    private GfacExperimentState(int value) {
+        this.value = value;
+    }
+
+    /**
+     * Get the integer value of this enum value.
+     *
+     * @return the code this constant was declared with
+     */
+    public int getValue() {
+        return value;
+    }
+
+    /**
+     * Find the enum constant by its integer value.
+     *
+     * The constants are searched by their declared codes rather than through a
+     * hand-written table, so the result always agrees with {@link #getValue()}:
+     * {@code findByValue(s.getValue()) == s} for every constant {@code s}.
+     *
+     * @param value integer code to look up
+     * @return the constant declared with {@code value}, or null if the value is not found.
+     */
+    public static GfacExperimentState findByValue(int value) {
+        for (GfacExperimentState state : values()) {
+            if (state.value == value) {
+                return state;
+            }
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
deleted file mode 100644
index 8d06aad..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentState.java
+++ /dev/null
@@ -1,82 +0,0 @@
-    /*
-     * Licensed to the Apache Software Foundation (ASF) under one or more
-     * contributor license agreements.  See the NOTICE file distributed with
-     * this work for additional information regarding copyright ownership.
-     * The ASF licenses this file to You under the Apache License, Version 2.0
-     * (the "License"); you may not use this file except in compliance with
-     * the License.  You may obtain a copy of the License at
-     *
-     *     http://www.apache.org/licenses/LICENSE-2.0
-     *
-     * Unless required by applicable law or agreed to in writing, software
-     * distributed under the License is distributed on an "AS IS" BASIS,
-     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-     * See the License for the specific language governing permissions and
-     * limitations under the License.
-     */
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- *  @generated
- */
-package org.apache.airavata.gfac.workspace.experiment;
-
-
-import java.util.Map;
-import java.util.HashMap;
-import org.apache.thrift.TEnum;
-
-@SuppressWarnings("all") public enum GfacExperimentState implements org.apache.thrift.TEnum {
-  INHANDLERSINVOKING(0),
-  INHANDLERSINVOKED(1),
-  PROVIDERINVOKING(2),
-  PROVIDERINVOKED(3),
-  OUTHANDLERSINVOKING(4),
-  OUTHANDLERSINVOKED(5),
-  COMPLETED(6),
-  FAILED(7),
-  UNKNOWN(8);
-
-  private final int value;
-
-  private GfacExperimentState(int value) {
-    this.value = value;
-  }
-
-  /**
-   * Get the integer value of this enum value, as defined in the Thrift IDL.
-   */
-  public int getValue() {
-    return value;
-  }
-
-  /**
-   * Find a the enum type by its integer value, as defined in the Thrift IDL.
-   * @return null if the value is not found.
-   */
-  public static GfacExperimentState findByValue(int value) { 
-    switch (value) {
-      case 0:
-        return INHANDLERSINVOKING;
-      case 1:
-        return INHANDLERSINVOKED;
-      case 2:
-        return PROVIDERINVOKING;
-      case 3:
-        return PROVIDERINVOKED;
-      case 4:
-        return OUTHANDLERSINVOKING;
-      case 5:
-        return OUTHANDLERSINVOKED;
-      case 6:
-        return COMPLETED;
-      case 7:
-        return FAILED;
-      case 8:
-        return UNKNOWN;
-      default:
-        return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
deleted file mode 100644
index 3b9273d..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/GfacExperimentStatus.java
+++ /dev/null
@@ -1,516 +0,0 @@
-    /*
-     * Licensed to the Apache Software Foundation (ASF) under one or more
-     * contributor license agreements.  See the NOTICE file distributed with
-     * this work for additional information regarding copyright ownership.
-     * The ASF licenses this file to You under the Apache License, Version 2.0
-     * (the "License"); you may not use this file except in compliance with
-     * the License.  You may obtain a copy of the License at
-     *
-     *     http://www.apache.org/licenses/LICENSE-2.0
-     *
-     * Unless required by applicable law or agreed to in writing, software
-     * distributed under the License is distributed on an "AS IS" BASIS,
-     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-     * See the License for the specific language governing permissions and
-     * limitations under the License.
-     */
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- *  @generated
- */
-package org.apache.airavata.gfac.workspace.experiment;
-
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("all") public class GfacExperimentStatus implements org.apache.thrift.TBase<GfacExperimentStatus, GfacExperimentStatus._Fields>, java.io.Serializable, Cloneable, Comparable<GfacExperimentStatus> {
-  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("GfacExperimentStatus");
-
-  private static final org.apache.thrift.protocol.TField GFAC_EXPERIMENT_STATE_FIELD_DESC = new org.apache.thrift.protocol.TField("gfacExperimentState", org.apache.thrift.protocol.TType.I32, (short)1);
-  private static final org.apache.thrift.protocol.TField TIME_OF_STATE_CHANGE_FIELD_DESC = new org.apache.thrift.protocol.TField("timeOfStateChange", org.apache.thrift.protocol.TType.I64, (short)2);
-
-  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
-  static {
-    schemes.put(StandardScheme.class, new GfacExperimentStatusStandardSchemeFactory());
-    schemes.put(TupleScheme.class, new GfacExperimentStatusTupleSchemeFactory());
-  }
-
-  /**
-   * 
-   * @see GfacExperimentState
-   */
-  public GfacExperimentState gfacExperimentState; // required
-  public long timeOfStateChange; // optional
-
-  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
-  @SuppressWarnings("all") public enum _Fields implements org.apache.thrift.TFieldIdEnum {
-    /**
-     * 
-     * @see GfacExperimentState
-     */
-    GFAC_EXPERIMENT_STATE((short)1, "gfacExperimentState"),
-    TIME_OF_STATE_CHANGE((short)2, "timeOfStateChange");
-
-    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
-
-    static {
-      for (_Fields field : EnumSet.allOf(_Fields.class)) {
-        byName.put(field.getFieldName(), field);
-      }
-    }
-
-    /**
-     * Find the _Fields constant that matches fieldId, or null if its not found.
-     */
-    public static _Fields findByThriftId(int fieldId) {
-      switch(fieldId) {
-        case 1: // GFAC_EXPERIMENT_STATE
-          return GFAC_EXPERIMENT_STATE;
-        case 2: // TIME_OF_STATE_CHANGE
-          return TIME_OF_STATE_CHANGE;
-        default:
-          return null;
-      }
-    }
-
-    /**
-     * Find the _Fields constant that matches fieldId, throwing an exception
-     * if it is not found.
-     */
-    public static _Fields findByThriftIdOrThrow(int fieldId) {
-      _Fields fields = findByThriftId(fieldId);
-      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
-      return fields;
-    }
-
-    /**
-     * Find the _Fields constant that matches name, or null if its not found.
-     */
-    public static _Fields findByName(String name) {
-      return byName.get(name);
-    }
-
-    private final short _thriftId;
-    private final String _fieldName;
-
-    _Fields(short thriftId, String fieldName) {
-      _thriftId = thriftId;
-      _fieldName = fieldName;
-    }
-
-    public short getThriftFieldId() {
-      return _thriftId;
-    }
-
-    public String getFieldName() {
-      return _fieldName;
-    }
-  }
-
-  // isset id assignments
-  private static final int __TIMEOFSTATECHANGE_ISSET_ID = 0;
-  private byte __isset_bitfield = 0;
-  private _Fields optionals[] = {_Fields.TIME_OF_STATE_CHANGE};
-  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
-  static {
-    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
-    tmpMap.put(_Fields.GFAC_EXPERIMENT_STATE, new org.apache.thrift.meta_data.FieldMetaData("gfacExperimentState", org.apache.thrift.TFieldRequirementType.REQUIRED, 
-        new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, GfacExperimentState.class)));
-    tmpMap.put(_Fields.TIME_OF_STATE_CHANGE, new org.apache.thrift.meta_data.FieldMetaData("timeOfStateChange", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
-        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
-    metaDataMap = Collections.unmodifiableMap(tmpMap);
-    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(GfacExperimentStatus.class, metaDataMap);
-  }
-
-  public GfacExperimentStatus() {
-  }
-
-  public GfacExperimentStatus(
-    GfacExperimentState gfacExperimentState)
-  {
-    this();
-    this.gfacExperimentState = gfacExperimentState;
-  }
-
-  /**
-   * Performs a deep copy on <i>other</i>.
-   */
-  public GfacExperimentStatus(GfacExperimentStatus other) {
-    __isset_bitfield = other.__isset_bitfield;
-    if (other.isSetGfacExperimentState()) {
-      this.gfacExperimentState = other.gfacExperimentState;
-    }
-    this.timeOfStateChange = other.timeOfStateChange;
-  }
-
-  public GfacExperimentStatus deepCopy() {
-    return new GfacExperimentStatus(this);
-  }
-
-  @Override
-  public void clear() {
-    this.gfacExperimentState = null;
-    setTimeOfStateChangeIsSet(false);
-    this.timeOfStateChange = 0;
-  }
-
-  /**
-   * 
-   * @see GfacExperimentState
-   */
-  public GfacExperimentState getGfacExperimentState() {
-    return this.gfacExperimentState;
-  }
-
-  /**
-   * 
-   * @see GfacExperimentState
-   */
-  public GfacExperimentStatus setGfacExperimentState(GfacExperimentState gfacExperimentState) {
-    this.gfacExperimentState = gfacExperimentState;
-    return this;
-  }
-
-  public void unsetGfacExperimentState() {
-    this.gfacExperimentState = null;
-  }
-
-  /** Returns true if field gfacExperimentState is set (has been assigned a value) and false otherwise */
-  public boolean isSetGfacExperimentState() {
-    return this.gfacExperimentState != null;
-  }
-
-  public void setGfacExperimentStateIsSet(boolean value) {
-    if (!value) {
-      this.gfacExperimentState = null;
-    }
-  }
-
-  public long getTimeOfStateChange() {
-    return this.timeOfStateChange;
-  }
-
-  public GfacExperimentStatus setTimeOfStateChange(long timeOfStateChange) {
-    this.timeOfStateChange = timeOfStateChange;
-    setTimeOfStateChangeIsSet(true);
-    return this;
-  }
-
-  public void unsetTimeOfStateChange() {
-    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID);
-  }
-
-  /** Returns true if field timeOfStateChange is set (has been assigned a value) and false otherwise */
-  public boolean isSetTimeOfStateChange() {
-    return EncodingUtils.testBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID);
-  }
-
-  public void setTimeOfStateChangeIsSet(boolean value) {
-    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIMEOFSTATECHANGE_ISSET_ID, value);
-  }
-
-  public void setFieldValue(_Fields field, Object value) {
-    switch (field) {
-    case GFAC_EXPERIMENT_STATE:
-      if (value == null) {
-        unsetGfacExperimentState();
-      } else {
-        setGfacExperimentState((GfacExperimentState)value);
-      }
-      break;
-
-    case TIME_OF_STATE_CHANGE:
-      if (value == null) {
-        unsetTimeOfStateChange();
-      } else {
-        setTimeOfStateChange((Long)value);
-      }
-      break;
-
-    }
-  }
-
-  public Object getFieldValue(_Fields field) {
-    switch (field) {
-    case GFAC_EXPERIMENT_STATE:
-      return getGfacExperimentState();
-
-    case TIME_OF_STATE_CHANGE:
-      return Long.valueOf(getTimeOfStateChange());
-
-    }
-    throw new IllegalStateException();
-  }
-
-  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
-  public boolean isSet(_Fields field) {
-    if (field == null) {
-      throw new IllegalArgumentException();
-    }
-
-    switch (field) {
-    case GFAC_EXPERIMENT_STATE:
-      return isSetGfacExperimentState();
-    case TIME_OF_STATE_CHANGE:
-      return isSetTimeOfStateChange();
-    }
-    throw new IllegalStateException();
-  }
-
-  @Override
-  public boolean equals(Object that) {
-    if (that == null)
-      return false;
-    if (that instanceof GfacExperimentStatus)
-      return this.equals((GfacExperimentStatus)that);
-    return false;
-  }
-
-  public boolean equals(GfacExperimentStatus that) {
-    if (that == null)
-      return false;
-
-    boolean this_present_gfacExperimentState = true && this.isSetGfacExperimentState();
-    boolean that_present_gfacExperimentState = true && that.isSetGfacExperimentState();
-    if (this_present_gfacExperimentState || that_present_gfacExperimentState) {
-      if (!(this_present_gfacExperimentState && that_present_gfacExperimentState))
-        return false;
-      if (!this.gfacExperimentState.equals(that.gfacExperimentState))
-        return false;
-    }
-
-    boolean this_present_timeOfStateChange = true && this.isSetTimeOfStateChange();
-    boolean that_present_timeOfStateChange = true && that.isSetTimeOfStateChange();
-    if (this_present_timeOfStateChange || that_present_timeOfStateChange) {
-      if (!(this_present_timeOfStateChange && that_present_timeOfStateChange))
-        return false;
-      if (this.timeOfStateChange != that.timeOfStateChange)
-        return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    return 0;
-  }
-
-  @Override
-  public int compareTo(GfacExperimentStatus other) {
-    if (!getClass().equals(other.getClass())) {
-      return getClass().getName().compareTo(other.getClass().getName());
-    }
-
-    int lastComparison = 0;
-
-    lastComparison = Boolean.valueOf(isSetGfacExperimentState()).compareTo(other.isSetGfacExperimentState());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (isSetGfacExperimentState()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.gfacExperimentState, other.gfacExperimentState);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    lastComparison = Boolean.valueOf(isSetTimeOfStateChange()).compareTo(other.isSetTimeOfStateChange());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (isSetTimeOfStateChange()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.timeOfStateChange, other.timeOfStateChange);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    return 0;
-  }
-
-  public _Fields fieldForId(int fieldId) {
-    return _Fields.findByThriftId(fieldId);
-  }
-
-  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
-    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
-  }
-
-  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
-    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder("GfacExperimentStatus(");
-    boolean first = true;
-
-    sb.append("gfacExperimentState:");
-    if (this.gfacExperimentState == null) {
-      sb.append("null");
-    } else {
-      sb.append(this.gfacExperimentState);
-    }
-    first = false;
-    if (isSetTimeOfStateChange()) {
-      if (!first) sb.append(", ");
-      sb.append("timeOfStateChange:");
-      sb.append(this.timeOfStateChange);
-      first = false;
-    }
-    sb.append(")");
-    return sb.toString();
-  }
-
-  public void validate() throws org.apache.thrift.TException {
-    // check for required fields
-    if (gfacExperimentState == null) {
-      throw new org.apache.thrift.protocol.TProtocolException("Required field 'gfacExperimentState' was not present! Struct: " + toString());
-    }
-    // check for sub-struct validity
-  }
-
-  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
-    try {
-      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
-    } catch (org.apache.thrift.TException te) {
-      throw new java.io.IOException(te);
-    }
-  }
-
-  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
-    try {
-      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-      __isset_bitfield = 0;
-      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
-    } catch (org.apache.thrift.TException te) {
-      throw new java.io.IOException(te);
-    }
-  }
-
-  private static class GfacExperimentStatusStandardSchemeFactory implements SchemeFactory {
-    public GfacExperimentStatusStandardScheme getScheme() {
-      return new GfacExperimentStatusStandardScheme();
-    }
-  }
-
-  private static class GfacExperimentStatusStandardScheme extends StandardScheme<GfacExperimentStatus> {
-
-    public void read(org.apache.thrift.protocol.TProtocol iprot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
-      org.apache.thrift.protocol.TField schemeField;
-      iprot.readStructBegin();
-      while (true)
-      {
-        schemeField = iprot.readFieldBegin();
-        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
-          break;
-        }
-        switch (schemeField.id) {
-          case 1: // GFAC_EXPERIMENT_STATE
-            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
-              struct.gfacExperimentState = GfacExperimentState.findByValue(iprot.readI32());
-              struct.setGfacExperimentStateIsSet(true);
-            } else { 
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-            }
-            break;
-          case 2: // TIME_OF_STATE_CHANGE
-            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
-              struct.timeOfStateChange = iprot.readI64();
-              struct.setTimeOfStateChangeIsSet(true);
-            } else { 
-              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-            }
-            break;
-          default:
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
-        }
-        iprot.readFieldEnd();
-      }
-      iprot.readStructEnd();
-
-      // check for required fields of primitive type, which can't be checked in the validate method
-      struct.validate();
-    }
-
-    public void write(org.apache.thrift.protocol.TProtocol oprot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
-      struct.validate();
-
-      oprot.writeStructBegin(STRUCT_DESC);
-      if (struct.gfacExperimentState != null) {
-        oprot.writeFieldBegin(GFAC_EXPERIMENT_STATE_FIELD_DESC);
-        oprot.writeI32(struct.gfacExperimentState.getValue());
-        oprot.writeFieldEnd();
-      }
-      if (struct.isSetTimeOfStateChange()) {
-        oprot.writeFieldBegin(TIME_OF_STATE_CHANGE_FIELD_DESC);
-        oprot.writeI64(struct.timeOfStateChange);
-        oprot.writeFieldEnd();
-      }
-      oprot.writeFieldStop();
-      oprot.writeStructEnd();
-    }
-
-  }
-
-  private static class GfacExperimentStatusTupleSchemeFactory implements SchemeFactory {
-    public GfacExperimentStatusTupleScheme getScheme() {
-      return new GfacExperimentStatusTupleScheme();
-    }
-  }
-
-  private static class GfacExperimentStatusTupleScheme extends TupleScheme<GfacExperimentStatus> {
-
-    @Override
-    public void write(org.apache.thrift.protocol.TProtocol prot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
-      TTupleProtocol oprot = (TTupleProtocol) prot;
-      oprot.writeI32(struct.gfacExperimentState.getValue());
-      BitSet optionals = new BitSet();
-      if (struct.isSetTimeOfStateChange()) {
-        optionals.set(0);
-      }
-      oprot.writeBitSet(optionals, 1);
-      if (struct.isSetTimeOfStateChange()) {
-        oprot.writeI64(struct.timeOfStateChange);
-      }
-    }
-
-    @Override
-    public void read(org.apache.thrift.protocol.TProtocol prot, GfacExperimentStatus struct) throws org.apache.thrift.TException {
-      TTupleProtocol iprot = (TTupleProtocol) prot;
-      struct.gfacExperimentState = GfacExperimentState.findByValue(iprot.readI32());
-      struct.setGfacExperimentStateIsSet(true);
-      BitSet incoming = iprot.readBitSet(1);
-      if (incoming.get(0)) {
-        struct.timeOfStateChange = iprot.readI64();
-        struct.setTimeOfStateChangeIsSet(true);
-      }
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
deleted file mode 100644
index 20099f2..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/workspace/experiment/gfacDataModelConstants.java
+++ /dev/null
@@ -1,59 +0,0 @@
-    /*
-     * Licensed to the Apache Software Foundation (ASF) under one or more
-     * contributor license agreements.  See the NOTICE file distributed with
-     * this work for additional information regarding copyright ownership.
-     * The ASF licenses this file to You under the Apache License, Version 2.0
-     * (the "License"); you may not use this file except in compliance with
-     * the License.  You may obtain a copy of the License at
-     *
-     *     http://www.apache.org/licenses/LICENSE-2.0
-     *
-     * Unless required by applicable law or agreed to in writing, software
-     * distributed under the License is distributed on an "AS IS" BASIS,
-     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-     * See the License for the specific language governing permissions and
-     * limitations under the License.
-     */
-/**
- * Autogenerated by Thrift Compiler (0.9.1)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- *  @generated
- */
-package org.apache.airavata.gfac.workspace.experiment;
-
-import org.apache.thrift.scheme.IScheme;
-import org.apache.thrift.scheme.SchemeFactory;
-import org.apache.thrift.scheme.StandardScheme;
-
-import org.apache.thrift.scheme.TupleScheme;
-import org.apache.thrift.protocol.TTupleProtocol;
-import org.apache.thrift.protocol.TProtocolException;
-import org.apache.thrift.EncodingUtils;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.server.AbstractNonblockingServer.*;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@SuppressWarnings("all") public class gfacDataModelConstants {
-
-  public static final String DEFAULT_ID = "DO_NOT_SET_AT_CLIENTS";
-
-  public static final String DEFAULT_PROJECT_NAME = "DEFAULT";
-
-  public static final String SINGLE_APP_NODE_NAME = "SINGLE_APP_NODE";
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 15e2c63..e3705ac 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.model.error.LaunchValidationException;
@@ -73,8 +74,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
     public OrchestratorServerHandler() {
         // registering with zk
         try {
-            String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
-                    + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+            String zkhostPort = AiravataZKUtils.getZKhostPort();
             String airavataServerHostPort = ServerSettings.getSetting(Constants.ORCHESTRATOR_SERVER_HOST)
                     + ":" + ServerSettings.getSetting(Constants.ORCHESTRATOR_SERVER_PORT);
             try {
@@ -124,6 +124,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface, Wat
 
 
 
+
     /**
      * * After creating the experiment Data user have the
      * * experimentID as the handler to the experiment, during the launchExperiment

http://git-wip-us.apache.org/repos/asf/airavata/blob/e86504aa/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
index 23089aa..4629b15 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACServiceJobSubmitter.java
@@ -21,9 +21,11 @@
 package org.apache.airavata.orchestrator.core.impl;
 
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.cpi.GfacService;
+import org.apache.airavata.gfac.core.utils.GfacExperimentState;
 import org.apache.airavata.orchestrator.core.context.OrchestratorContext;
 import org.apache.airavata.orchestrator.core.exception.OrchestratorException;
 import org.apache.airavata.orchestrator.core.gfac.GFACInstance;
@@ -74,6 +76,7 @@ public class GFACServiceJobSubmitter implements JobSubmitter,Watcher{
                     mutex.wait();
                 }
             }
+            AiravataZKUtils.
             String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
             String experimentNode = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
             List<String> children = zk.getChildren(gfacServer, this);
@@ -92,6 +95,9 @@ public class GFACServiceJobSubmitter implements JobSubmitter,Watcher{
                 if (exists1 == null) {
                     zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                             CreateMode.PERSISTENT);
+
+                    zk.create(newExpNode + File.separator + "state", GfacExperimentState.LAUNCHED.toString().getBytes(),
+                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                 } else {
                     logger.info("ExperimentID: " + experimentID + " taskID: " + taskID + " is re-running due to gfac failure");
                 }


[4/5] git commit: Merge branch 'zk-work'

Posted by la...@apache.org.
Merge branch 'zk-work'


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/34816536
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/34816536
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/34816536

Branch: refs/heads/master
Commit: 34816536df1c384aca6bd4d39bc0f6c778c1daa0
Parents: f723d49 e86504a
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 14:04:11 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 14:04:11 2014 -0400

----------------------------------------------------------------------
 airavata-api/airavata-api-server/pom.xml        |   6 +
 .../server/handler/AiravataServerHandler.java   |  94 ++-
 .../handler/ApplicationCatalogHandler.java      |   2 +-
 .../airavata/api/server/util/Constants.java     |   2 -
 .../client/samples/CreateLaunchExperiment.java  | 226 ++-----
 .../airavataAPI.thrift                          |   1 +
 modules/commons/utils/pom.xml                   |  12 +-
 .../airavata/common/utils/AiravataZKUtils.java  |  59 ++
 .../apache/airavata/common/utils/Constants.java |  15 +
 .../main/resources/airavata-server.properties   |  22 +-
 modules/distribution/server/pom.xml             |   6 +
 .../server/src/main/assembly/bin-assembly.xml   |   3 +-
 modules/gfac/airavata-gfac-service/pom.xml      |   4 +-
 .../apache/airavata/gfac/server/GfacServer.java |  11 +-
 .../airavata/gfac/server/GfacServerHandler.java | 102 ++-
 .../apache/airavata/gfac/util/Constants.java    |  26 -
 modules/gfac/gfac-core/pom.xml                  |   6 +
 .../gfac/core/context/JobExecutionContext.java  |   3 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 657 +++++++++++++++++++
 .../apache/airavata/gfac/core/cpi/GFacImpl.java |   7 +-
 .../core/monitor/GfacInternalStatusUpdator.java | 109 +++
 .../airavata/gfac/core/monitor/MonitorID.java   |   6 +-
 .../state/GfacExperimentStateChangeRequest.java |  71 ++
 .../gfac/core/provider/AbstractProvider.java    |   6 +-
 .../airavata/gfac/core/utils/GFacUtils.java     |  21 +
 .../gfac/core/utils/GfacExperimentState.java    |  81 +++
 .../generate-gfac-stubs.sh                      |   2 +
 .../gfacDataModel.thrift                        |  55 ++
 .../orchestrator/server/OrchestratorServer.java |   1 +
 .../server/OrchestratorServerHandler.java       | 142 +++-
 .../util/OrchestratorRecoveryHandler.java       | 107 +++
 modules/orchestrator/orchestrator-core/pom.xml  |   6 +
 .../core/context/OrchestratorContext.java       |  15 +
 .../core/gfac/GFacClientFactory.java            |   2 +-
 .../core/impl/GFACServiceJobSubmitter.java      |  80 ++-
 .../cpi/impl/AbstractOrchestrator.java          |  14 +
 .../cpi/impl/SimpleOrchestratorImpl.java        |   1 +
 37 files changed, 1737 insertions(+), 246 deletions(-)
----------------------------------------------------------------------



[5/5] git commit: adding new type of handler

Posted by la...@apache.org.
adding new type of handler


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/d56dd443
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/d56dd443
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/d56dd443

Branch: refs/heads/master
Commit: d56dd443e6fb4242d475e15264e08e9cab2edc59
Parents: 3481653
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 14:05:28 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 14:05:28 2014 -0400

----------------------------------------------------------------------
 .../handler/AbstractRecoverableHandler.java     | 65 ++++++++++++++++++++
 .../core/handler/GFacRecoverableHandler.java    | 44 +++++++++++++
 2 files changed, 109 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/d56dd443/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
new file mode 100644
index 0000000..0c6810b
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.handler;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
+import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.airavata.registry.cpi.RegistryException;
+/** Base class for {@link GFacRecoverableHandler} implementations; keeps a Registry and a MonitorPublisher in protected fields. */
+public abstract class AbstractRecoverableHandler implements GFacRecoverableHandler {
+    protected Registry registry = null;
+    // registry is (re)resolved on every invoke(); publisher is fetched once in the constructor and can be replaced via setPublisher().
+    protected MonitorPublisher publisher = null;
+
+    protected AbstractRecoverableHandler() {
+        publisher = GFacImpl.getMonitorPublisher();   // Should not be null: the publisher is initialized in GFacImpl (per the original author's note)
+    }
+    /** Takes the registry from the job context, falling back to the default registry; throws GFacHandlerException if the fallback cannot be created. */
+    public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+		registry = jobExecutionContext.getRegistry();
+        if(registry == null){
+            try {
+                registry = RegistryFactory.getDefaultRegistry();
+            } catch (RegistryException e) {
+                throw new GFacHandlerException("unable to create registry instance", e);
+            }
+        }
+	}
+
+    public MonitorPublisher getPublisher() {
+        return publisher;
+    }
+
+    public void setPublisher(MonitorPublisher publisher) {
+        this.publisher = publisher;
+    }
+
+    public Registry getRegistry() {
+        return registry;
+    }
+
+    public void setRegistry(Registry registry) {
+        this.registry = registry;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/d56dd443/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java
new file mode 100644
index 0000000..fef6b76
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.handler;
+
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+
+/**
+ * This handler type can be used to implement stateful operations. We
+ * recommend using the ZooKeeper (ZK) client to store and retrieve the state
+ * of the handler implementation. At the framework level, ZK is used to
+ * decide whether a handler ran successfully, so the execution details of
+ * each handler can be found under the following ZK path:
+ * {@code /gfac-experiment/<gfac-node-name>/experimentId+taskId/fully-qualified-handler-name/state}
+ * ex: /gfac-experiment/gfac-node0/echoExperiment_2c6c11b8-dea0-4ec8-9832-f3e69fe2e6bb+IDontNeedaNode_682faa66-6218-4897-9271-656bfb8b2bd1/org.apache.airavata.gfac.handlers.Test/state
+ */
+public interface GFacRecoverableHandler extends GFacHandler {
+
+    /**
+     * Implements the recovery part of a stateful handler.
+     * If you do not want to recover a handler that has already run, you can simply implement
+     * GFacHandler (or extend its abstract handler base class) instead, or leave this method empty.
+     * @param jobExecutionContext the job execution context handed to the handler being recovered
+     */
+    public void recover(JobExecutionContext jobExecutionContext);
+
+}


[2/5] git commit: committing the initial version of zk work with resubmitting all the failed jobs to the available gfac cluster nodes

Posted by la...@apache.org.
committing the initial version of zk work with resubmitting all the failed jobs to the available gfac cluster nodes


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/362da4e8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/362da4e8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/362da4e8

Branch: refs/heads/master
Commit: 362da4e88c5ab8ddeba234b3ad7c5f829477c272
Parents: 2bcadf5
Author: lahiru <la...@apache.org>
Authored: Tue Jun 24 00:44:33 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Tue Jun 24 00:44:33 2014 -0400

----------------------------------------------------------------------
 airavata-api/airavata-api-server/pom.xml        |   6 +
 .../server/handler/AiravataServerHandler.java   |  94 +++-
 .../handler/ApplicationCatalogHandler.java      |   2 +-
 .../airavata/api/server/util/Constants.java     |   2 -
 .../client/samples/CreateLaunchExperiment.java  | 226 +++-----
 .../airavataAPI.thrift                          |   1 +
 .../airavata/client/tools/DocumentCreator.java  |   2 +-
 .../apache/airavata/common/utils/Constants.java |  15 +
 .../main/resources/airavata-server.properties   |  22 +-
 modules/distribution/server/pom.xml             |   6 +
 .../server/src/main/assembly/bin-assembly.xml   |   3 +-
 modules/gfac/airavata-gfac-service/pom.xml      |   4 +-
 .../apache/airavata/gfac/server/GfacServer.java |  11 +-
 .../airavata/gfac/server/GfacServerHandler.java |  93 +++-
 .../apache/airavata/gfac/util/Constants.java    |  26 -
 modules/gfac/gfac-core/pom.xml                  |   6 +
 .../gfac/core/context/JobExecutionContext.java  |   3 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 504 ++++++++++++++++++
 .../apache/airavata/gfac/core/cpi/GFacImpl.java |   6 +
 .../core/monitor/GfacInternalStatusUpdator.java | 105 ++++
 .../airavata/gfac/core/monitor/MonitorID.java   |   6 +-
 .../state/GfacExperimentStateChangeRequest.java |  71 +++
 .../gfac/core/provider/AbstractProvider.java    |   6 +-
 .../experiment/GfacExperimentState.java         |  82 +++
 .../experiment/GfacExperimentStatus.java        | 516 +++++++++++++++++++
 .../experiment/gfacDataModelConstants.java      |  59 +++
 .../generate-gfac-stubs.sh                      |   2 +
 .../gfacDataModel.thrift                        |  55 ++
 .../orchestrator/server/OrchestratorServer.java |   1 +
 .../server/OrchestratorServerHandler.java       | 141 ++++-
 .../util/OrchestratorRecoveryHandler.java       | 107 ++++
 modules/orchestrator/orchestrator-core/pom.xml  |   6 +
 .../core/context/OrchestratorContext.java       |  15 +
 .../core/gfac/GFacClientFactory.java            |   2 +-
 .../core/impl/GFACServiceJobSubmitter.java      |  74 ++-
 .../cpi/impl/AbstractOrchestrator.java          |  14 +
 .../cpi/impl/SimpleOrchestratorImpl.java        |   1 +
 37 files changed, 2052 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/pom.xml
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/pom.xml b/airavata-api/airavata-api-server/pom.xml
index ff21028..1a89fcc 100644
--- a/airavata-api/airavata-api-server/pom.xml
+++ b/airavata-api/airavata-api-server/pom.xml
@@ -66,6 +66,12 @@
             <artifactId>slf4j-log4j12</artifactId>
             <version>${org.slf4j.version}</version>
         </dependency>
+        <!-- zookeeper dependencies -->
+        <dependency>
+        	<groupId>org.apache.zookeeper</groupId>
+        	<artifactId>zookeeper</artifactId>
+        	<version>3.4.0</version>
+        </dependency>
         
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
index be35568..a8f4297 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/AiravataServerHandler.java
@@ -23,6 +23,7 @@ package org.apache.airavata.api.server.handler;
 
 import org.apache.airavata.api.Airavata;
 import org.apache.airavata.api.airavataAPIConstants;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.model.error.*;
 import org.apache.airavata.model.workspace.Project;
@@ -35,20 +36,91 @@ import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.cpi.*;
 import org.apache.airavata.registry.cpi.utils.Constants;
 import org.apache.thrift.TException;
+import org.apache.tools.ant.types.selectors.FileSelector;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
 
-public class AiravataServerHandler implements Airavata.Iface {
-
-    private Registry registry;
-	private OrchestratorService.Client orchestratorClient;
+public class AiravataServerHandler implements Airavata.Iface, Watcher {
     private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
-	
+    private Registry registry;
+    private OrchestratorService.Client orchestratorClient;
+
+    private ZooKeeper zk;
+    private static Integer mutex = -1;
+
+
+    public AiravataServerHandler() {
+        try {
+            String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+            String airavataServerHostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_HOST)
+                                + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.API_SERVER_PORT);
+            try {
+                zk = new ZooKeeper(zkhostPort, 6000, this);   // no watcher is required, this will only use to store some data
+                String apiServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_API_SERVER_NODE,"/airavata-server");
+                String OrchServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE,"/orchestrator-server");
+                String gfacServer = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_SERVER_NODE,"/gfac-server");
+                String gfacExperiments = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,"/gfac-experiments");
+
+                synchronized (mutex) {
+                    mutex.wait();  // waiting for the syncConnected event
+                }
+                Stat zkStat = zk.exists(apiServer, false);
+                if (zkStat == null) {
+                    zk.create(apiServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                String instantNode = apiServer + File.separator + String.valueOf(new Random().nextInt(Integer.MAX_VALUE));
+                zkStat = zk.exists(instantNode, false);
+                if (zkStat == null) {
+                    zk.create(instantNode,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.EPHEMERAL);      // other component will watch these childeren creation deletion to monitor the status of the node
+                    logger.info("Successfully created airavata-server node");
+                }
+
+                zkStat = zk.exists(OrchServer, false);
+                if (zkStat == null) {
+                    zk.create(OrchServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    logger.info("Successfully created orchestrator-server node");
+                }
+                zkStat = zk.exists(gfacServer, false);
+                if (zkStat == null) {
+                    zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    logger.info("Successfully created gfac-server node");
+                }
+                zkStat = zk.exists(gfacServer, false);
+                if (zkStat == null) {
+                    zk.create(gfacExperiments, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                    logger.info("Successfully created gfac-server node");
+                }
+                logger.info("Finished starting ZK: " + zk);
+            } catch (IOException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } catch (KeeperException e) {
+                e.printStackTrace();
+            }
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        }
+    }
+    /** ZooKeeper watcher callback: notifies one thread waiting on {@code mutex} (the constructor waits there); the event itself is not inspected. */
+    synchronized public void process(WatchedEvent watchedEvent) {
+        synchronized (mutex) {
+            mutex.notify();
+        }
+    }
 
     /**
      * Query Airavata to fetch the API version
@@ -905,8 +977,8 @@ public class AiravataServerHandler implements Airavata.Iface {
     }
 
 	private OrchestratorService.Client getOrchestratorClient() {
-		final int serverPort = Integer.parseInt(ServerSettings.getSetting(org.apache.airavata.api.server.util.Constants.ORCHESTRATOR_SERVER_PORT,"8940"));
-        final String serverHost = ServerSettings.getSetting(org.apache.airavata.api.server.util.Constants.ORCHESTRATOR_SERVER_HOST, null);
+		final int serverPort = Integer.parseInt(ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ORCHESTRATOR_SERVER_PORT,"8940"));
+        final String serverHost = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ORCHESTRATOR_SERVER_HOST, null);
         return orchestratorClient = OrchestratorClientFactory.createOrchestratorClient(serverHost, serverPort);
 	}
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
index b3ce8fb..efec768 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/handler/ApplicationCatalogHandler.java
@@ -64,7 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ApplicationCatalogHandler implements Iface {
-    private static final Logger logger = LoggerFactory.getLogger(AiravataServerHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(ApplicationCatalogHandler.class);
 
 	AiravataRegistry2 registry;
 	private AiravataRegistry2 getRegistry() throws RegException, AiravataConfigurationException{

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
index eb6a119..92eac88 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/util/Constants.java
@@ -27,6 +27,4 @@ public class Constants {
     public static final String APP_CATALOG_SERVER_PORT = "app.catalog.server.port";
     public static final String APP_CATALOG_SERVER_HOST = "app.catalog.server.host";
     public static final String API_SERVER_MIN_THREADS = "apiserver.server.min.threads";
-    public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host";
-    public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index c8c9235..170fb99 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,130 +61,26 @@ public class CreateLaunchExperiment {
             AiravataUtils.setExecutionAsClient();
             final Airavata.Client airavata = AiravataClientFactory.createAiravataClient(THRIFT_SERVER_HOST, THRIFT_SERVER_PORT);
             System.out.println("API version is " + airavata.getAPIVersion());
-            addDescriptors();
+//            addDescriptors();
 
 //            final String expId = createExperimentForSSHHost(airavata);
 //            final String expId = createExperimentForTrestles(airavata);
 //            final String expId = createExperimentForStampede(airavata);
-            final String expId = createExperimentForLocalHost(airavata);
+            for (int i = 0; i < 1; i++) {
+                final String expId = createExperimentForLocalHost(airavata);
 //            final String expId = createExperimentForLonestar(airavata);
 //            final String expId = createExperimentWRFTrestles(airavata);
-            System.out.println("Experiment ID : " + expId);
+                System.out.println("Experiment ID : " + expId);
 //            updateExperiment(airavata, expId);
-            launchExperiment(airavata, expId);
-            System.out.println("Launched successfully");
-            List<Experiment> experiments = getExperimentsForUser(airavata, "admin");
-            List<ExperimentSummary> searchedExps1 = searchExperimentsByName(airavata, "admin", "echo");
-            List<ExperimentSummary> searchedExps2 = searchExperimentsByDesc(airavata, "admin", "Echo");
-            List<ExperimentSummary> searchedExps3 = searchExperimentsByApplication(airavata, "admin", "cho");
-            List<Project> projects = getAllUserProject(airavata, "admin");
-            List<Project> searchProjects1 = searchProjectsByProjectName(airavata, "admin", "project");
-            List<Project> searchProjects2 = searchProjectsByProjectDesc(airavata, "admin", "test");
-            for (Experiment exp : experiments){
-                System.out.println(" exp id : " + exp.getExperimentID());
-                System.out.println("experiment Description : " + exp.getDescription()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
-            }
-
-            for (ExperimentSummary exp : searchedExps1){
-                System.out.println("search results by experiment name");
-                System.out.println("experiment ID : " + exp.getExperimentID()) ;
-                System.out.println("experiment Description : " + exp.getDescription()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
-            }
-
-            for (ExperimentSummary exp : searchedExps2){
-                System.out.println("search results by experiment desc");
-                System.out.println("experiment ID : " + exp.getExperimentID()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
-            }
-
-            for (ExperimentSummary exp : searchedExps3){
-                System.out.println("search results by application");
-                System.out.println("experiment ID : " + exp.getExperimentID()) ;
-                if (exp.getExperimentStatus() != null) {
-                    System.out.println(" exp status : " + exp.getExperimentStatus().getExperimentState().toString());
-                }
+                launchExperiment(airavata, expId);
             }
-
-            for (Project pr : searchProjects1){
-                System.out.println(" project id : " + pr.getProjectID());
-            }
-
-            for (Project pr : searchProjects2){
-                System.out.println(" project id : " + pr.getProjectID());
-                System.out.println(" project desc : " + pr.getDescription());
-            }
-
-            Thread monitor = (new Thread(){
-                 public void run() {
-                     Map<String, JobStatus> jobStatuses = null;
-                     while (true) {
-                         try {
-                             jobStatuses = airavata.getJobStatuses(expId);
-                             Set<String> strings = jobStatuses.keySet();
-                             for (String key : strings) {
-                                 JobStatus jobStatus = jobStatuses.get(key);
-                                 if(jobStatus == null){
-                                     return;
-                                 }else {
-                                     if (JobState.COMPLETE.equals(jobStatus.getJobState())) {
-                                         System.out.println("Job completed Job ID: " + key);
-                                         return;
-                                     }else{
-                                        System.out.println("Job ID:" + key + jobStatuses.get(key).getJobState().toString());
-                                     }
-                                 }
-                             }
-                             ExperimentStatus experimentStatus = airavata.getExperimentStatus(expId);
-                             if(experimentStatus.getExperimentState().equals(ExperimentState.FAILED)){
-                            	 return;
-                             }
-							System.out.println(experimentStatus);
-                             Thread.sleep(5000);
-                         } catch (Exception e) {
-                             e.printStackTrace();
-                         }
-                     }
-
-                 }
-            });
-            monitor.start();
-            try {
-                monitor.join();
-            } catch (InterruptedException e) {
-                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-            }
-			try {
-				Thread.sleep(5000);
-			} catch (InterruptedException e) {
-				e.printStackTrace(); // To change body of catch statement use
-										// File | Settings | File Templates.
-			}
-
-//            System.out.println(airavata.getExperimentStatus(expId));
-            List<DataObjectType> output = airavata.getExperimentOutputs(expId);
-            for (DataObjectType dataObjectType : output) {
-                System.out.println(dataObjectType.getKey() + " : " + dataObjectType.getType() + " : " + dataObjectType.getValue());
-                
-				
-			}
-            String clonedExpId = cloneExperiment(airavata, expId);
-            System.out.println("Cloned Experiment ID : " + clonedExpId);
-//            System.out.println("retrieved exp id : " + experiment.getExperimentID());
         } catch (Exception e) {
             logger.error("Error while connecting with server", e.getMessage());
             e.printStackTrace();
         }
     }
 
-    public static void addDescriptors() throws AiravataAPIInvocationException,ApplicationSettingsException  {
+    public static void addDescriptors() throws AiravataAPIInvocationException, ApplicationSettingsException {
         try {
             DocumentCreator documentCreator = new DocumentCreator(getAiravataAPI());
             documentCreator.createLocalHostDocs();
@@ -219,8 +115,8 @@ public class CreateLaunchExperiment {
         return airavataAPI;
     }
 
-    public static String createExperimentForTrestles(Airavata.Client client) throws TException  {
-        try{
+    public static String createExperimentForTrestles(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -255,50 +151,50 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-    
-    public static String createExperimentWRFTrestles(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentWRFTrestles(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("WRF_Namelist");
             input.setType(DataType.URI);
             input.setValue("/Users/raminder/Downloads/wrf_sample_inputs/namelist.input");
-            
+
             DataObjectType input1 = new DataObjectType();
             input1.setKey("WRF_Input_File");
             input1.setType(DataType.URI);
             input1.setValue("/Users/raminder/Downloads/wrf_sample_inputs/wrfinput_d01");
-            
+
             DataObjectType input2 = new DataObjectType();
             input2.setKey("WRF_Boundary_File");
             input2.setType(DataType.URI);
             input2.setValue("/Users/raminder/Downloads/wrf_sample_inputs/wrfbdy_d01");
-            
+
             exInputs.add(input);
             exInputs.add(input1);
             exInputs.add(input2);
 
-           
+
             List<DataObjectType> exOut = new ArrayList<DataObjectType>();
             DataObjectType output = new DataObjectType();
             output.setKey("WRF_Output");
             output.setType(DataType.URI);
             output.setValue("");
-            
+
             DataObjectType output1 = new DataObjectType();
             output1.setKey("WRF_Execution_Log");
             output1.setType(DataType.URI);
             output1.setValue("");
-            
-            
+
+
             exOut.add(output);
             exOut.add(output1);
-           
+
 
             Experiment simpleExperiment =
                     ExperimentModelUtil.createSimpleExperiment("default", "admin", "WRFExperiment", "Testing", "WRF", exInputs);
@@ -320,35 +216,35 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
-    public static String cloneExperiment(Airavata.Client client, String expId) throws TException  {
-        try{
+    public static String cloneExperiment(Airavata.Client client, String expId) throws TException {
+        try {
             return client.cloneExperiment(expId, "cloneExperiment1");
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
-    public static void updateExperiment(Airavata.Client client, String expId) throws TException  {
-        try{
+    public static void updateExperiment(Airavata.Client client, String expId) throws TException {
+        try {
             Experiment experiment = client.getExperiment(expId);
             experiment.setDescription("updatedDescription");
-            client.updateExperiment(expId, experiment );
-        }catch (TException e) {
+            client.updateExperiment(expId, experiment);
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
 
-    public static String createExperimentForLocalHost(Airavata.Client client) throws TException  {
-        try{
+    public static String createExperimentForLocalHost(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -386,14 +282,14 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-    
-    public static String createExperimentForSSHHost(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentForSSHHost(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -432,13 +328,14 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-    public static String createExperimentForStampede(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentForStampede(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -477,13 +374,14 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-       public static String createExperimentForLonestar(Airavata.Client client) throws TException  {
-        try{
+
+    public static String createExperimentForLonestar(Airavata.Client client) throws TException {
+        try {
             List<DataObjectType> exInputs = new ArrayList<DataObjectType>();
             DataObjectType input = new DataObjectType();
             input.setKey("echo_input");
@@ -531,15 +429,15 @@ public class CreateLaunchExperiment {
                 }
             }
             throw e;
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while creating the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
-       
-       
-    public static void launchExperiment (Airavata.Client client, String expId)
-            throws TException{
+
+
+    public static void launchExperiment(Airavata.Client client, String expId)
+            throws TException {
         try {
             client.launchExperiment(expId, "testToken");
         } catch (ExperimentNotFoundException e) {
@@ -554,13 +452,13 @@ public class CreateLaunchExperiment {
         } catch (AiravataClientException e) {
             logger.error("Error occured while launching the experiment...", e.getMessage());
             throw new AiravataClientException(e);
-        }catch (TException e) {
+        } catch (TException e) {
             logger.error("Error occured while launching the experiment...", e.getMessage());
             throw new TException(e);
         }
     }
 
-    public static List<Experiment> getExperimentsForUser (Airavata.Client client, String user){
+    public static List<Experiment> getExperimentsForUser(Airavata.Client client, String user) {
         try {
             return client.getAllUserExperiments(user);
         } catch (AiravataSystemException e) {
@@ -569,13 +467,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<Project> getAllUserProject (Airavata.Client client, String user){
+    public static List<Project> getAllUserProject(Airavata.Client client, String user) {
         try {
             return client.getAllUserProjects(user);
         } catch (AiravataSystemException e) {
@@ -584,13 +482,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<Project> searchProjectsByProjectName (Airavata.Client client, String user, String projectName){
+    public static List<Project> searchProjectsByProjectName(Airavata.Client client, String user, String projectName) {
         try {
             return client.searchProjectsByProjectName(user, projectName);
         } catch (AiravataSystemException e) {
@@ -599,13 +497,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<Project> searchProjectsByProjectDesc (Airavata.Client client, String user, String desc){
+    public static List<Project> searchProjectsByProjectDesc(Airavata.Client client, String user, String desc) {
         try {
             return client.searchProjectsByProjectDesc(user, desc);
         } catch (AiravataSystemException e) {
@@ -614,14 +512,14 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
 
-    public static List<ExperimentSummary> searchExperimentsByName (Airavata.Client client, String user, String expName){
+    public static List<ExperimentSummary> searchExperimentsByName(Airavata.Client client, String user, String expName) {
         try {
             return client.searchExperimentsByName(user, expName);
         } catch (AiravataSystemException e) {
@@ -630,13 +528,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<ExperimentSummary> searchExperimentsByDesc(Airavata.Client client, String user, String desc){
+    public static List<ExperimentSummary> searchExperimentsByDesc(Airavata.Client client, String user, String desc) {
         try {
             return client.searchExperimentsByDesc(user, desc);
         } catch (AiravataSystemException e) {
@@ -645,13 +543,13 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;
     }
 
-    public static List<ExperimentSummary> searchExperimentsByApplication(Airavata.Client client, String user, String app){
+    public static List<ExperimentSummary> searchExperimentsByApplication(Airavata.Client client, String user, String app) {
         try {
             return client.searchExperimentsByApplication(user, app);
         } catch (AiravataSystemException e) {
@@ -660,7 +558,7 @@ public class CreateLaunchExperiment {
             e.printStackTrace();
         } catch (AiravataClientException e) {
             e.printStackTrace();
-        }catch (TException e){
+        } catch (TException e) {
             e.printStackTrace();
         }
         return null;

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
----------------------------------------------------------------------
diff --git a/airavata-api/thrift-interface-descriptions/airavataAPI.thrift b/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
index 7b2ec60..9c9ec7f 100644
--- a/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
+++ b/airavata-api/thrift-interface-descriptions/airavataAPI.thrift
@@ -29,6 +29,7 @@ include "airavataDataModel.thrift"
 include "experimentModel.thrift"
 include "workspaceModel.thrift"
 include "applicationCatalogAPI.thrift"
+include "gfacDataMode.thrift"
 
 namespace java org.apache.airavata.api
 namespace php Airavata.API

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
----------------------------------------------------------------------
diff --git a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
index d573da9..ffcff17 100644
--- a/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
+++ b/modules/airavata-client/src/main/java/org/apache/airavata/client/tools/DocumentCreator.java
@@ -671,7 +671,7 @@ public class DocumentCreator {
 		ApplicationDescription applicationDeploymentDescription = new ApplicationDescription();
 		ApplicationDeploymentDescriptionType applicationDeploymentDescriptionType = applicationDeploymentDescription.getType();
 		applicationDeploymentDescriptionType.addNewApplicationName().setStringValue(serviceName);
-		applicationDeploymentDescriptionType.setExecutableLocation("/bin/echo");
+		applicationDeploymentDescriptionType.setExecutableLocation("/tmp/echo.sh");
 		applicationDeploymentDescriptionType.setScratchWorkingDirectory("/tmp");
 
 		try {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
----------------------------------------------------------------------
diff --git a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
index bac5913..b8f999a 100644
--- a/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
+++ b/modules/commons/utils/src/main/java/org/apache/airavata/common/utils/Constants.java
@@ -31,4 +31,19 @@ public final class Constants {
     public static final String GFAC_CONFIG_XML = "gfac-config.xml";
     public static final String PUSH = "push";
     public static final String PULL = "pull";
+    public static final String API_SERVER_PORT = "apiserver.server.port";
+    public static final String API_SERVER_HOST = "apiserver.server.host";
+    public static final String ORCHESTRATOR_SERVER_HOST = "orchestrator.server.host";
+    public static final String ORCHESTRATOR_SERVER_PORT = "orchestrator.server.port";
+    public static final String GFAC_SERVER_HOST = "gfac.server.host";
+    public static final String GFAC_SERVER_PORT = "gfac.server.port";
+    public static final String ZOOKEEPER_SERVER_HOST = "zookeeper.server.host";
+    public static final String ZOOKEEPER_SERVER_PORT = "zookeeper.server.port";
+    public static final String ZOOKEEPER_API_SERVER_NODE = "airavata-server";
+    public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NODE = "orchestrator-server";
+    public static final String ZOOKEEPER_GFAC_SERVER_NODE = "gfac-server";
+    public static final String ZOOKEEPER_GFAC_EXPERIMENT_NODE = "gfac-experiments";
+    public static final String ZOOKEEPER_GFAC_SERVER_NAME = "gfac-server-name";
+    public static final String ZOOKEEPER_ORCHESTRATOR_SERVER_NAME = "orchestrator-server-name";
+    public static final String ZOOKEEPER_API_SERVER_NAME = "api-server-name";
 }

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index b99c6cb..625f7f2 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -276,11 +276,11 @@ monitors=org.apache.airavata.gfac.monitor.impl.pull.qstat.QstatMonitor,org.apach
 amqp.hosts=info1.dyn.teragrid.org,info2.dyn.teragrid.org
 proxy.file.path=/Users/lahirugunathilake/Downloads/x509up_u503876
 connection.name=xsede
-activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator
+activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator
 
 ###---------------------------Orchestrator module Configurations---------------------------###
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
 job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
 submitter.interval=10000
 threadpool.size=10
@@ -297,7 +297,7 @@ appcatalogserver=org.apache.airavata.api.server.ApplicationCatalogServer
 
 
 ###---------------------------Airavata Server Configurations---------------------------###
-servers=apiserver,appcatalogserver,orchestrator
+servers=apiserver,appcatalogserver,orchestrator,gfac
 #shutdown.trategy=NONE
 shutdown.trategy=SELF_TERMINATE
 # credential store specific parameters
@@ -323,4 +323,18 @@ app.catalog.server.host=localhost
 app.catalog.server.port=8931
 orchestrator.server.host=localhost
 orchestrator.server.port=8940
+gfac.server.host=localhost
+gfac.server.port=8950
 orchestrator.server.min.threads=30
+
+##----------------------------- Zookeeper Server Configurations ----------------------###
+
+zookeeper.server.host=localhost
+zookeeper.server.port=2181
+airavata-server=/api-server
+orchestrator-server=/orchestrator-server
+gfac-server=/gfac-server
+gfac-experiments=/gfac-experiments
+gfac-server-name=gfac-node0
+orchestrator-server-name=orch-node0
+airavata-server-name=api-node0

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/distribution/server/pom.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/pom.xml b/modules/distribution/server/pom.xml
index 108bd32..4d51a13 100644
--- a/modules/distribution/server/pom.xml
+++ b/modules/distribution/server/pom.xml
@@ -562,7 +562,13 @@
             <artifactId>jackson-annotations</artifactId>
             <version>2.0.0</version>
         </dependency>
+     <!-- zookeeper dependencies -->
 
+        <dependency>
+               	<groupId>org.apache.zookeeper</groupId>
+               	<artifactId>zookeeper</artifactId>
+               	<version>3.4.0</version>
+               </dependency>
 	<dependency>
             <groupId>commons-cli</groupId>
             <artifactId>commons-cli</artifactId>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/distribution/server/src/main/assembly/bin-assembly.xml
----------------------------------------------------------------------
diff --git a/modules/distribution/server/src/main/assembly/bin-assembly.xml b/modules/distribution/server/src/main/assembly/bin-assembly.xml
index 0c3e43a..b6c5aa7 100644
--- a/modules/distribution/server/src/main/assembly/bin-assembly.xml
+++ b/modules/distribution/server/src/main/assembly/bin-assembly.xml
@@ -196,7 +196,7 @@
                 <include>org.apache.airavata:airavata-data-models:jar</include>
                 <include>org.apache.airavata:airavata-credential-store:jar</include>
                 <include>org.apache.airavata:airavata-gfac-core:jar</include>
-                <include>org.apache.airavata:airavata-gfac-server:jar</include>
+                <include>org.apache.airavata:airavata-gfac-service:jar</include>
                 <include>org.apache.airavata:airavata-gfac-ssh:jar</include>
                 <include>org.apache.airavata:airavata-gfac-local:jar</include>
                 <include>org.apache.airavata:airavata-gfac-gsissh:jar</include>
@@ -243,6 +243,7 @@
                 <include>com.fasterxml.jackson.core:jackson-databind</include>
                 <include>com.fasterxml.jackson.core:jackson-core</include>
                 <include>com.fasterxml.jackson.core:jackson-annotations</include>
+                <include>org.apache.zookeeper:zookeeper</include>
                 <!-- unicore start
                     <include>eu.unicore:ogsabes-client</include>
                     <include>eu.unicore:ogsabes-types</include>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/pom.xml b/modules/gfac/airavata-gfac-service/pom.xml
index e57eccc..d02a658 100644
--- a/modules/gfac/airavata-gfac-service/pom.xml
+++ b/modules/gfac/airavata-gfac-service/pom.xml
@@ -40,12 +40,12 @@
             <artifactId>airavata-gfac-core</artifactId>
             <version>${project.version}</version>
         </dependency>
-	<dependency>
+	    <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-model-utils</artifactId>
             <version>${project.version}</version>
         </dependency>
-	<dependency>
+	    <dependency>
             <groupId>org.apache.airavata</groupId>
             <artifactId>airavata-server-configuration</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
index bf6f933..f96e40f 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServer.java
@@ -20,10 +20,10 @@
 */
 package org.apache.airavata.gfac.server;
 
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.IServer;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.cpi.GfacService;
-import org.apache.airavata.gfac.util.Constants;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TServerSocket;
@@ -32,6 +32,8 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.InetSocketAddress;
+
 public class GfacServer implements IServer{
 
     private final static Logger logger = LoggerFactory.getLogger(GfacServer.class);
@@ -50,7 +52,12 @@ public class GfacServer implements IServer{
             throws Exception {
         try {
             final int serverPort = Integer.parseInt(ServerSettings.getSetting(Constants.GFAC_SERVER_PORT, "8950"));
-			TServerTransport serverTransport = new TServerSocket(serverPort);
+            final String serverHost = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST, null);
+
+            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverHost, serverPort);
+
+			TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
+
             server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(gfacServerHandlerProcessor));
 
             new Thread() {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 1b8d1e8..27733f9 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -20,12 +20,16 @@
 */
 package org.apache.airavata.gfac.server;
 
+import com.google.common.eventbus.EventBus;
 import org.apache.airavata.common.exception.AiravataConfigurationException;
 import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.Constants;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFac;
 import org.apache.airavata.gfac.core.cpi.GFacImpl;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.gfac.cpi.gfac_cpi_serviceConstants;
 import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
@@ -35,28 +39,111 @@ import org.apache.airavata.registry.api.Gateway;
 import org.apache.airavata.registry.api.exception.RegException;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.Random;
 
-public class GfacServerHandler implements GfacService.Iface {
+
+public class GfacServerHandler implements GfacService.Iface, Watcher{
     private final static Logger logger = LoggerFactory.getLogger(GfacServerHandler.class);
 
     private Registry registry;
 
     private String registryURL;
+
     private String gatewayName;
+
     private String airavataUserName;
 
+    private ZooKeeper zk;
+
+    private boolean connected = false;
+
+    private static Integer mutex = new Integer(-1);
+
+    private MonitorPublisher publisher;
+
+
     public GfacServerHandler() {
+        // registering with zk
+        try {
+            String zkhostPort = ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_SERVER_PORT);
+            String airavataServerHostPort = ServerSettings.getSetting(Constants.GFAC_SERVER_HOST)
+                    + ":" + ServerSettings.getSetting(Constants.GFAC_SERVER_PORT);
+            try {
+                zk = new ZooKeeper(zkhostPort, 6000, this);   // this handler is the watcher: process() wakes the wait below once the session reports SyncConnected
+                String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE,"/gfac-server");
+                String gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE,"/gfac-experiments");
+                synchronized(mutex){
+                    mutex.wait();  // waiting for the syncConnected event
+                }
+                Stat zkStat = zk.exists(gfacServer, false);
+                if (zkStat == null) {
+                    zk.create(gfacServer, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                String instanceId = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NAME);
+                String instantNode = gfacServer + File.separator + instanceId;
+                zkStat = zk.exists(instantNode, false);
+                if (zkStat == null) {
+                    zk.create(instantNode,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.EPHEMERAL);      // other components will watch the creation/deletion of these children to monitor the status of the node
+                }
+                zkStat = zk.exists(gfacExperiments, false);
+                if (zkStat == null) {
+                    zk.create(gfacExperiments,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }
+                zkStat = zk.exists(gfacExperiments + File.separator + instanceId, false);
+                if (zkStat == null) {
+                    zk.create(gfacExperiments + File.separator + instanceId,
+                            airavataServerHostPort.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT);
+                }else{
+                    logger.error(" Zookeeper is inconsistent state  !!!!!");
+                }
+                logger.info("Finished starting ZK: " + zk);
+            } catch (IOException e) {
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            } catch (KeeperException e) {
+                e.printStackTrace();
+            }
+        } catch (ApplicationSettingsException e) {
+            e.printStackTrace();
+        }
         try {
+            publisher = new MonitorPublisher(new EventBus());
             registry = RegistryFactory.getDefaultRegistry();
             setGatewayProperties();
+            BetterGfacImpl.startDaemonHandlers();
+            BetterGfacImpl.startStatusUpdators(registry,zk,publisher);
         }catch (Exception e){
            logger.error("Error initialising GFAC",e);
         }
     }
 
+    synchronized public void process(WatchedEvent watchedEvent) {
+        synchronized (mutex) {
+            Event.KeeperState state = watchedEvent.getState();
+            if (state == Event.KeeperState.SyncConnected) {
+                mutex.notify();
+                connected = true;
+            }
+        }
+    }
+
     public String getGFACServiceVersion() throws TException {
         return gfac_cpi_serviceConstants.GFAC_CPI_VERSION;
     }
@@ -114,9 +201,9 @@ public class GfacServerHandler implements GfacService.Iface {
 
     private GFac getGfac()throws TException{
         try {
-            return new GFacImpl(registry, null,
+            return new BetterGfacImpl(registry, null,
                                 AiravataRegistryFactory.getRegistry(new Gateway(getGatewayName()),
-                                        new AiravataUser(getAiravataUserName())));
+                                        new AiravataUser(getAiravataUserName())),zk,publisher);
         } catch (RegException e) {
             throw new TException("Error initializing gfac instance",e);
         } catch (AiravataConfigurationException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
deleted file mode 100644
index 3e48898..0000000
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/util/Constants.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.util;
-
-public class Constants {
-    public static final String GFAC_SERVER_PORT = "gfac.server.port";
-
-}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/pom.xml b/modules/gfac/gfac-core/pom.xml
index da86055..19d5f09 100644
--- a/modules/gfac/gfac-core/pom.xml
+++ b/modules/gfac/gfac-core/pom.xml
@@ -120,6 +120,12 @@
             <version>${xmlbeans.version}</version>
         </dependency>
         <!-- this is the dependency for amqp implementation -->
+        <!-- zookeeper dependencies -->
+        <dependency>
+        	<groupId>org.apache.zookeeper</groupId>
+        	<artifactId>zookeeper</artifactId>
+        	<version>3.4.0</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
index 86f4055..170c2c8 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/context/JobExecutionContext.java
@@ -21,6 +21,7 @@
 
 package org.apache.airavata.gfac.core.context;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,7 +38,7 @@ import org.apache.airavata.model.workspace.experiment.TaskDetails;
 import org.apache.airavata.model.workspace.experiment.WorkflowNodeDetails;
 import org.apache.airavata.registry.cpi.Registry;
 
-public class JobExecutionContext extends AbstractContext{
+public class JobExecutionContext extends AbstractContext implements Serializable{
 
     private GFacConfiguration gfacConfiguration;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
new file mode 100644
index 0000000..195bfc1
--- /dev/null
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -0,0 +1,504 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+*/
+package org.apache.airavata.gfac.core.cpi;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.eventbus.EventBus;
+
+import org.apache.airavata.client.api.AiravataAPI;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
+import org.apache.airavata.common.utils.ServerSettings;
+import org.apache.airavata.commons.gfac.type.ApplicationDescription;
+import org.apache.airavata.commons.gfac.type.HostDescription;
+import org.apache.airavata.commons.gfac.type.ServiceDescription;
+import org.apache.airavata.gfac.Constants;
+import org.apache.airavata.gfac.GFacConfiguration;
+import org.apache.airavata.gfac.GFacException;
+import org.apache.airavata.gfac.Scheduler;
+import org.apache.airavata.gfac.core.context.ApplicationContext;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.context.MessageContext;
+import org.apache.airavata.gfac.core.monitor.*;
+import org.apache.airavata.gfac.core.monitor.state.ExperimentStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
+import org.apache.airavata.gfac.core.monitor.state.TaskStatusChangeRequest;
+import org.apache.airavata.gfac.core.notification.MonitorPublisher;
+import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
+import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
+import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
+import org.apache.airavata.gfac.core.handler.GFacHandler;
+import org.apache.airavata.gfac.core.provider.GFacProvider;
+import org.apache.airavata.gfac.core.scheduler.HostScheduler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
+import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
+import org.apache.airavata.model.workspace.experiment.*;
+import org.apache.airavata.registry.api.AiravataRegistry2;
+import org.apache.airavata.registry.cpi.RegistryModelType;
+import org.apache.airavata.registry.cpi.Registry;
+import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.xpath.XPathExpressionException;
+
+/**
+ * This is the GFac CPI class for external usage; it simply has a single method to submit a job to
+ * the resource. The data required for the job has to be stored in the registry prior to invoking this object.
+ */
+public class BetterGfacImpl implements GFac {
+    // Bound to this class so that log output is attributed to BetterGfacImpl and not to GFacImpl.
+    private static final Logger log = LoggerFactory.getLogger(BetterGfacImpl.class);
+    // Property key set to "true" on a job execution context once its failure has been published.
+    public static final String ERROR_SENT = "ErrorSent";
+
+    // Registry used to load task/experiment data; also handed to each job execution context.
+    private Registry registry;
+
+    // API handle passed to GFacConfiguration when the configuration is created.
+    private AiravataAPI airavataAPI;
+
+    // Registry used to look up service, host and application descriptors.
+    private AiravataRegistry2 airavataRegistry2;
+
+    // Kept on this instance only: the zk instance is not stored in the job execution context.
+    private ZooKeeper zk;
+
+    // Shared by all instances; filled and started by startDaemonHandlers().
+    private static List<ThreadedHandler> daemonHandlers = new ArrayList<ThreadedHandler>();
+
+    // Location of the gfac configuration file resolved by startDaemonHandlers().
+    private static File gfacConfigFile;
+
+    // Listeners created by startStatusUpdators().
+    private static List<AbstractActivityListener> activityListeners =  new ArrayList<AbstractActivityListener>();
+
+    // Shared by all instances; replaced by the five-argument constructor.
+    private static MonitorPublisher monitorPublisher;
+
+    /**
+     * Creates a GFac instance backed by the given registries and ZooKeeper handle.
+     *
+     * @param registry          registry used to load task and experiment data
+     * @param airavataAPI       API handle used when the GFac configuration is created
+     * @param airavataRegistry2 registry used to look up service, host and application descriptors
+     * @param zooKeeper         ZooKeeper handle kept on this instance
+     * @param publisher         publisher stored in a static field, i.e. shared by every instance
+     */
+    public BetterGfacImpl(Registry registry, AiravataAPI airavataAPI, AiravataRegistry2 airavataRegistry2, ZooKeeper zooKeeper,
+                          MonitorPublisher publisher) {
+        // The publisher is a static field: this is an EventBus common for gfac.
+        BetterGfacImpl.monitorPublisher = publisher;
+        this.zk = zooKeeper;
+        this.airavataRegistry2 = airavataRegistry2;
+        this.airavataAPI = airavataAPI;
+        this.registry = registry;
+    }
+
+    public static void startStatusUpdators(Registry registry,ZooKeeper zk,MonitorPublisher publisher) {
+        try {
+            String[] listenerClassList = ServerSettings.getActivityListeners();
+            for (String listenerClass : listenerClassList) {
+                Class<? extends AbstractActivityListener> aClass = Class.forName(listenerClass).asSubclass(AbstractActivityListener.class);
+                AbstractActivityListener abstractActivityListener = aClass.newInstance();
+                activityListeners.add(abstractActivityListener);
+                abstractActivityListener.setup(publisher, registry,zk);
+                log.info("Registering listener: " + listenerClass);
+                publisher.registerListener(abstractActivityListener);
+            }
+        }catch (ClassNotFoundException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (InstantiationException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (IllegalAccessException e) {
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        } catch (ApplicationSettingsException e){
+            log.error("Error loading the listener classes configured in airavata-server.properties",e);
+        }
+    }
+    public static void startDaemonHandlers()  {
+        List<GFacHandlerConfig> daemonHandlerConfig = null;
+        URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+        gfacConfigFile = new File(resource.getPath());
+        try {
+            daemonHandlerConfig = GFacConfiguration.getDaemonHandlers(gfacConfigFile);
+        } catch (ParserConfigurationException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration",e);
+        } catch (IOException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+        } catch (SAXException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+        } catch (XPathExpressionException e) {
+            log.error("Error parsing gfac-config.xml, double check the xml configuration", e);
+        }
+
+        for(GFacHandlerConfig handlerConfig:daemonHandlerConfig){
+            String className = handlerConfig.getClassName();
+            try {
+                Class<?> aClass = Class.forName(className).asSubclass(ThreadedHandler.class);
+                ThreadedHandler threadedHandler = (ThreadedHandler) aClass.newInstance();
+                threadedHandler.initProperties(handlerConfig.getProperties());
+                daemonHandlers.add(threadedHandler);
+            }catch (ClassNotFoundException e){
+                log.error("Error initializing the handler: " + className);
+                log.error(className + " class has to implement " + ThreadedHandler.class);
+            } catch (InstantiationException e) {
+                log.error("Error initializing the handler: " + className);
+                log.error(className + " class has to implement " + ThreadedHandler.class);
+            } catch (IllegalAccessException e) {
+                log.error("Error initializing the handler: " + className);
+                log.error(className + " class has to implement " + ThreadedHandler.class);
+            } catch (GFacHandlerException e) {
+                log.error("Error initializing the handler " + className);
+            } catch (GFacException e) {
+                e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+            }
+        }
+        for(ThreadedHandler tHandler:daemonHandlers){
+            (new Thread(tHandler)).start();
+        }
+    }
+
+    /**
+     * Constructor for testing purposes: jobs can be submitted just by filling the parameters of a
+     * JobExecutionContext by hand. Replaces the shared daemon handler list with an empty one and
+     * starts the daemon handlers.
+     */
+    public BetterGfacImpl() {
+        BetterGfacImpl.daemonHandlers = new ArrayList<ThreadedHandler>();
+        BetterGfacImpl.startDaemonHandlers();
+    }
+
+    /**
+     * This is the job launching method outsiders of GFac can use, this will invoke the GFac handler chain and providers
+     * And update the registry occordingly, so the users can query the database to retrieve status and output from Registry
+     *
+     * @param experimentID
+     * @return
+     * @throws GFacException
+     */
+    public boolean submitJob(String experimentID,String taskID) throws GFacException {
+        JobExecutionContext jobExecutionContext = null;
+        try {
+            jobExecutionContext = createJEC(experimentID, taskID);
+            return submitJob(jobExecutionContext);
+        } catch (Exception e) {
+            log.error("Error inovoking the job with experiment ID: " + experimentID);
+            throw new GFacException(e);
+        }
+    }
+
+    /**
+     * Builds the job execution context for a task: loads the task and experiment from the registry, resolves
+     * the service, host and application descriptors, creates the GFac configuration from the configuration
+     * file on the classpath and fills in the input/output message contexts.
+     *
+     * @param experimentID id of the experiment, also used as the value of the topic property
+     * @param taskID       id of the task to load from the registry
+     * @return the populated job execution context
+     * @throws Exception if the task, service, host or configuration file cannot be resolved, or if any
+     *                   registry/configuration call fails
+     */
+    private JobExecutionContext createJEC(String experimentID, String taskID) throws Exception {
+        TaskDetails taskData = (TaskDetails) registry.get(RegistryModelType.TASK_DETAIL, taskID);
+        if (taskData == null) {
+            throw new GFacException("Error executing the job because there is no task registered with ID: " + taskID);
+        }
+
+        // This is where the new model and the old model are mapped, so the application id of the task and the
+        // service name in the ServiceDescriptor have to be the same.
+
+        // 1. Get the Task from the task ID and construct the Job object and save it in to registry
+        // 2. Add another property to jobExecutionContext and read them inside the provider and use it.
+        String serviceName = taskData.getApplicationId();
+        if (serviceName == null) {
+            throw new GFacException("Error executing the job because there is not Application Name in this Experiment:  " + serviceName );
+        }
+
+        ServiceDescription serviceDescription = airavataRegistry2.getServiceDescriptor(serviceName);
+        if (serviceDescription == null ) {
+            throw new GFacException("Error executing the job because there is no service description registered for:  " + serviceName );
+        }
+
+        String hostName;
+        HostDescription hostDescription;
+        if (taskData.getTaskScheduling().getResourceHostId() != null) {
+            // The task names its host explicitly.
+            hostName = taskData.getTaskScheduling().getResourceHostId();
+            hostDescription = airavataRegistry2.getHostDescriptor(hostName);
+            if (hostDescription == null) {
+                throw new GFacException("Error executing the job as the host is not registered " + hostName);
+            }
+        } else {
+            // No host requested: let the configured scheduler pick one of the hosts the application is deployed on.
+            List<HostDescription> registeredHosts = new ArrayList<HostDescription>();
+            Map<String, ApplicationDescription> applicationDescriptors = airavataRegistry2.getApplicationDescriptors(serviceName);
+            for (String hostDescName : applicationDescriptors.keySet()) {
+                registeredHosts.add(airavataRegistry2.getHostDescriptor(hostDescName));
+            }
+            Class<? extends HostScheduler> aClass = Class.forName(ServerSettings.getHostScheduler()).asSubclass(HostScheduler.class);
+            HostScheduler hostScheduler = aClass.newInstance();
+            hostDescription = hostScheduler.schedule(registeredHosts);
+            if (hostDescription == null) {
+                // Check before dereferencing; the host name is only known once a host has been chosen.
+                throw new GFacException("Error executing the job as no registered host could be scheduled for " + serviceName);
+            }
+            hostName = hostDescription.getType().getHostName();
+        }
+
+        ApplicationDescription applicationDescription = airavataRegistry2.getApplicationDescriptors(serviceName, hostName);
+        URL resource = GFacImpl.class.getClassLoader().getResource(org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML);
+        if (resource == null) {
+            // getResource returns null when the file is not on the classpath
+            throw new GFacException("Cannot find " + org.apache.airavata.common.utils.Constants.GFAC_CONFIG_XML + " in the classpath");
+        }
+        Properties configurationProperties = ServerSettings.getProperties();
+        GFacConfiguration gFacConfiguration = GFacConfiguration.create(new File(resource.getPath()), airavataAPI, configurationProperties);
+
+        // start constructing jobexecutioncontext
+        JobExecutionContext jobExecutionContext = new JobExecutionContext(gFacConfiguration, serviceName);
+
+        // setting experiment/task/workflownode related information
+        Experiment experiment = (Experiment) registry.get(RegistryModelType.EXPERIMENT, experimentID);
+        jobExecutionContext.setExperiment(experiment);
+        jobExecutionContext.setExperimentID(experimentID);
+        jobExecutionContext.setWorkflowNodeDetails(experiment.getWorkflowNodeDetailsList().get(0));
+        jobExecutionContext.setTaskData(taskData);
+
+        // setting the registry
+        jobExecutionContext.setRegistry(registry);
+
+        ApplicationContext applicationContext = new ApplicationContext();
+        applicationContext.setApplicationDeploymentDescription(applicationDescription);
+        applicationContext.setHostDescription(hostDescription);
+        applicationContext.setServiceDescription(serviceDescription);
+        jobExecutionContext.setApplicationContext(applicationContext);
+
+        List<DataObjectType> experimentInputs = taskData.getApplicationInputs();
+        jobExecutionContext.setInMessageContext(new MessageContext(GFacUtils.getMessageContext(experimentInputs,
+                serviceDescription.getType().getInputParametersArray())));
+
+        List<DataObjectType> outputData = taskData.getApplicationOutputs();
+        jobExecutionContext.setOutMessageContext(new MessageContext(GFacUtils.getMessageContext(outputData,
+                serviceDescription.getType().getOutputParametersArray())));
+
+        jobExecutionContext.setProperty(Constants.PROP_TOPIC, experimentID);
+
+        return jobExecutionContext;
+    }
+
+    /**
+     * Launches the job described by an already populated job execution context.
+     *
+     * @param jobExecutionContext context describing the job to run
+     * @return true when scheduling returned without throwing
+     * @throws GFacException if scheduling the job fails
+     */
+    public boolean submitJob(JobExecutionContext jobExecutionContext) throws GFacException {
+        // A job submitted as part of a larger workflow carries the workflow instance id as a property;
+        // for such a job a workflow tracking listener has to be registered as well.
+        //todo implement WorkflowTrackingListener properly
+        String workflowInstanceID = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_INSTANCE_ID);
+        if (workflowInstanceID != null) {
+            registerWorkflowTrackingListener(workflowInstanceID, jobExecutionContext);
+        }
+
+        // The log event listener is required in all scenarios.
+        jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
+
+        schedule(jobExecutionContext);
+        return true;
+    }
+
+    private void schedule(JobExecutionContext jobExecutionContext) throws GFacException {
+        // Scheduler will decide the execution flow of handlers and provider which handles
+        // the job.
+        String experimentID = jobExecutionContext.getExperimentID();
+        try {
+            Scheduler.schedule(jobExecutionContext);
+
+            // Executing in handlers in the order as they have configured in GFac configuration
+            invokeInFlowHandlers(jobExecutionContext);
+//            if (experimentID != null){
+//                registry2.changeStatus(jobExecutionContext.getExperimentID(),AiravataJobState.State.INHANDLERSDONE);
+//            }
+
+            // After executing the in handlers provider instance should be set to job execution context.
+            // We get the provider instance and execute it.
+            GFacProvider provider = jobExecutionContext.getProvider();
+            if (provider != null) {
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKING));
+                initProvider(provider, jobExecutionContext);
+                executeProvider(provider, jobExecutionContext);
+                disposeProvider(provider, jobExecutionContext);
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.PROVIDERINVOKED));
+            }
+            if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+                invokeOutFlowHandlers(jobExecutionContext);
+            }
+        } catch (Exception e) {
+            try {
+                // we make the experiment as failed due to exception scenario
+                monitorPublisher.publish(new
+                        ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                        ExperimentState.FAILED));
+                // Updating the task status if there's any task associated
+                monitorPublisher.publish(new TaskStatusChangeRequest(
+                        new TaskIdentity(jobExecutionContext.getExperimentID(),
+                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                jobExecutionContext.getTaskData().getTaskID()), TaskState.FAILED
+                ));
+                monitorPublisher.publish(new JobStatusChangeRequest(new MonitorID(jobExecutionContext),
+                        new JobIdentity(jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getJobDetails().getJobID()), JobState.FAILED));
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.FAILED));
+            } catch (NullPointerException e1) {
+                log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, " +
+                        "NullPointerException occurred because at this point there might not have Job Created", e1, e);
+            }
+            jobExecutionContext.setProperty(ERROR_SENT, "true");
+            jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
+            throw new GFacException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Initializes the provider for the given context.
+     *
+     * @throws GFacException wrapping any exception raised by the provider's initialize method
+     */
+    private void initProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+        try {
+            provider.initialize(jobExecutionContext);
+        } catch (Exception initFailure) {
+            String message = "Error while initializing provider " + provider.getClass().getName() + ".";
+            throw new GFacException(message, initFailure);
+        }
+    }
+
+    /**
+     * Executes the provider for the given context.
+     *
+     * @throws GFacException wrapping any exception raised by the provider's execute method
+     */
+    private void executeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+        try {
+            provider.execute(jobExecutionContext);
+        } catch (Exception executeFailure) {
+            String message = "Error while executing provider " + provider.getClass().getName() + " functionality.";
+            throw new GFacException(message, executeFailure);
+        }
+    }
+
+    /**
+     * Disposes the provider for the given context.
+     *
+     * @throws GFacException wrapping any exception raised by the provider's dispose method
+     */
+    private void disposeProvider(GFacProvider provider, JobExecutionContext jobExecutionContext) throws GFacException {
+        try {
+            provider.dispose(jobExecutionContext);
+        } catch (Exception disposeFailure) {
+            String message = "Error while invoking provider " + provider.getClass().getName() + " dispose method.";
+            throw new GFacException(message, disposeFailure);
+        }
+    }
+
+    /**
+     * Registers a WorkflowTrackingListener on the context's notification service, built from the given
+     * workflow instance id and the workflow node id, topic and broker URL properties of the context.
+     */
+    private void registerWorkflowTrackingListener(String workflowInstanceID, JobExecutionContext jobExecutionContext) {
+        String nodeId = (String) jobExecutionContext.getProperty(Constants.PROP_WORKFLOW_NODE_ID);
+        String trackingTopic = (String) jobExecutionContext.getProperty(Constants.PROP_TOPIC);
+        String broker = (String) jobExecutionContext.getProperty(Constants.PROP_BROKER_URL);
+
+        jobExecutionContext.getNotificationService().registerListener(
+                new WorkflowTrackingListener(workflowInstanceID, nodeId, broker, trackingTopic));
+    }
+
+    /**
+     * Invokes the in-flow handlers of the context's GFac configuration in their configured order. The
+     * INHANDLERSINVOKING state is published before the first handler and INHANDLERSINVOKED after the last.
+     *
+     * @param jobExecutionContext context passed to every handler
+     * @throws GFacException if a handler class cannot be loaded, is not a GFacHandler, cannot be
+     *                       instantiated, or fails while being invoked
+     */
+    private void invokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                           ,GfacExperimentState.INHANDLERSINVOKING));
+        for (GFacHandlerConfig handlerConfig : handlers) {
+            // Report the configured class name in error messages, not the config object itself.
+            String handlerClassName = handlerConfig.getClassName().trim();
+            Class<? extends GFacHandler> handlerClass;
+            try {
+                handlerClass = Class.forName(handlerClassName).asSubclass(GFacHandler.class);
+            } catch (ClassNotFoundException e) {
+                throw new GFacException("Cannot load handler class " + handlerClassName, e);
+            } catch (ClassCastException e) {
+                // asSubclass throws this when the configured class is not a GFacHandler
+                throw new GFacException("Handler class " + handlerClassName + " has to implement " + GFacHandler.class.getName(), e);
+            }
+            GFacHandler handler;
+            try {
+                handler = handlerClass.newInstance();
+                handler.initProperties(handlerConfig.getProperties());
+            } catch (InstantiationException e) {
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (IllegalAccessException e) {
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            }
+            try {
+                handler.invoke(jobExecutionContext);
+            } catch (GFacHandlerException e) {
+                // Keep the handler exception itself as the cause; its own cause may be null.
+                throw new GFacException("Error Executing a InFlow Handler", e);
+            }
+        }
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                           ,GfacExperimentState.INHANDLERSINVOKED));
+    }
+
+    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+        List<GFacHandlerConfig> handlers = null;
+        if(gFacConfiguration != null){
+         handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+        }else {
+            try {
+                jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e) {
+                log.error("Error constructing job execution context during outhandler invocation");
+                throw new GFacException(e);
+            }
+            schedule(jobExecutionContext);
+        }
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKING));
+        for (GFacHandlerConfig handlerClassName : handlers) {
+            Class<? extends GFacHandler> handlerClass;
+            GFacHandler handler;
+            try {
+                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                handler = handlerClass.newInstance();
+                handler.initProperties(handlerClassName.getProperties());
+            } catch (ClassNotFoundException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot load handler class " + handlerClassName, e);
+            } catch (InstantiationException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (IllegalAccessException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            }
+            try {
+                handler.invoke(jobExecutionContext);
+            } catch (Exception e) {
+                // TODO: Better error reporting.
+                throw new GFacException("Error Executing a OutFlow Handler", e);
+            }
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.OUTHANDLERSINVOKED));
+        }
+
+        // At this point all the execution is finished so we update the task and experiment statuses.
+        // Handler authors does not have to worry about updating experiment or task statuses.
+        monitorPublisher.publish(new
+                ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                ExperimentState.COMPLETED));
+        // Updating the task status if there's any task associated
+        monitorPublisher.publish(new TaskStatusChangeRequest(
+                new TaskIdentity(jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+        ));
+
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext),GfacExperimentState.COMPLETED));
+    }
+
+
+    /** Returns the Airavata API handle held by this instance. */
+    public AiravataAPI getAiravataAPI() {
+        return airavataAPI;
+    }
+
+    /** Returns the descriptor registry held by this instance. */
+    public AiravataRegistry2 getAiravataRegistry2() {
+        return airavataRegistry2;
+    }
+
+    /** Returns the static daemon handler list itself, not a copy. */
+    public static List<ThreadedHandler> getDaemonHandlers() {
+        return daemonHandlers;
+    }
+
+    /** Returns the ERROR_SENT property key. */
+    public static String getErrorSent() {
+        return ERROR_SENT;
+    }
+
+    /** Returns the static gfac configuration file reference; it is assigned in startDaemonHandlers(). */
+    public File getGfacConfigFile() {
+        return gfacConfigFile;
+    }
+
+    /** Returns the static monitor publisher. */
+    public static MonitorPublisher getMonitorPublisher() {
+        return monitorPublisher;
+    }
+
+    /** Returns the registry held by this instance. */
+    public Registry getRegistry() {
+        return registry;
+    }
+
+    /** Returns the ZooKeeper handle held by this instance. */
+    public ZooKeeper getZk() {
+        return zk;
+    }
+
+    /** Replaces the ZooKeeper handle held by this instance. */
+    public void setZk(ZooKeeper zk) {
+        this.zk = zk;
+    }
+}

http://git-wip-us.apache.org/repos/asf/airavata/blob/362da4e8/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
index f1fa244..a6908ba 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/GFacImpl.java
@@ -58,11 +58,13 @@ import org.apache.airavata.gfac.core.handler.GFacHandlerConfig;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.gfac.workspace.experiment.GfacExperimentState;
 import org.apache.airavata.model.workspace.experiment.*;
 import org.apache.airavata.registry.api.AiravataRegistry2;
 import org.apache.airavata.registry.cpi.RegistryModelType;
 import org.apache.airavata.registry.cpi.Registry;
 import org.apache.tools.ant.taskdefs.optional.j2ee.HotDeploymentTool;
+import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.xml.sax.SAXException;
@@ -83,6 +85,8 @@ public class GFacImpl implements GFac {
     private AiravataAPI airavataAPI;
 
     private AiravataRegistry2 airavataRegistry2;
+
+    private ZooKeeper zk;
     
     private static List<ThreadedHandler> daemonHandlers;
 
@@ -436,6 +440,8 @@ public class GFacImpl implements GFac {
                 throw new GFacException("Error Executing a OutFlow Handler", e);
             }
         }
+
+        monitorPublisher.publish(GfacExperimentState.COMPLETED);
         // At this point all the execution is finished so we update the task and experiment statuses.
         // Handler authors does not have to worry about updating experiment or task statuses.
         monitorPublisher.publish(new