Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/25 22:17:27 UTC

[1/2] git commit: Committing the version 1.0 of Zookeeper implementation

Repository: airavata
Updated Branches:
  refs/heads/master 441bf7658 -> 109b679ad


Committing the version 1.0 of Zookeeper implementation


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c05fd14b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c05fd14b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c05fd14b

Branch: refs/heads/master
Commit: c05fd14b01589d0d69b90fbf1b2d7a21667f205e
Parents: 236c04a
Author: lahiru <la...@apache.org>
Authored: Wed Jun 25 16:15:49 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Jun 25 16:15:49 2014 -0400

----------------------------------------------------------------------
 .../main/resources/airavata-server.properties   |   4 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 310 ++++++++++---------
 .../core/handler/AppDescriptorCheckHandler.java |  38 +--
 .../core/provider/GFacRecoverableProvider.java  |   3 +-
 .../gfac/core/states/GfacPluginState.java       |   4 +-
 .../airavata/gfac/core/utils/GFacUtils.java     | 136 +++++---
 .../gsissh/provider/impl/GSISSHProvider.java    | 161 +++++++---
 .../util/OrchestratorRecoveryHandler.java       |   2 +-
 8 files changed, 381 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 726225a..ecc0932 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -279,8 +279,8 @@ connection.name=xsede
 activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator
 
 ###---------------------------Orchestrator module Configurations---------------------------###
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
 job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
 submitter.interval=10000
 threadpool.size=10
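
This hunk flips the active orchestrator submitter from the embedded GFac to the GFac service submitter. Below is a minimal sketch of how a class named in this properties file can be resolved at startup, assuming only java.util.Properties and the job.submitter key shown above; the Orchestrator's actual wiring may differ.

    import java.io.FileInputStream;
    import java.util.Properties;

    // Illustrative only: reads the job.submitter key and checks that the named
    // class is loadable. The property key and class names come from the hunk
    // above; everything else is an assumption.
    public class SubmitterLookupSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            try (FileInputStream in = new FileInputStream("airavata-server.properties")) {
                props.load(in);
            }
            String submitterClass = props.getProperty("job.submitter");
            System.out.println("Configured job submitter: " + submitterClass);
            // Verify the configured class is on the classpath without instantiating it.
            Class.forName(submitterClass);
            System.out.println("Submitter class resolved successfully");
        }
    }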

http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 4bfbb68..dc635d1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -53,6 +53,7 @@ import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
 import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
 import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
 import org.apache.airavata.gfac.core.provider.GFacProvider;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
 import org.apache.airavata.gfac.core.provider.GFacRecoverableProvider;
 import org.apache.airavata.gfac.core.scheduler.HostScheduler;
 import org.apache.airavata.gfac.core.states.GfacPluginState;
@@ -302,9 +303,10 @@ public class BetterGfacImpl implements GFac {
             } else if (stateVal >= 8) {
                 log.info("There is nothing to recover in this job so we do not re-submit");
                 ZKUtil.deleteRecursive(zk,
-                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(),jobExecutionContext.getTaskData().getTaskID()));
+                        AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID()));
             } else {
                 // Now we know this is an old Job, so we have to handle things gracefully
+                log.info("Re-launching the job in GFac because this is re-submitted to GFac");
                 reLaunch(jobExecutionContext, stateVal);
             }
             return true;
@@ -318,7 +320,7 @@ public class BetterGfacImpl implements GFac {
         return true;
     }
 
-    private void reLaunch(JobExecutionContext jobExecutionContext,int stateVal) throws GFacException {
+    private void reLaunch(JobExecutionContext jobExecutionContext, int stateVal) throws GFacException {
         // Scheduler will decide the execution flow of handlers and provider which handles
         // the job.
         String experimentID = jobExecutionContext.getExperimentID();
@@ -332,17 +334,17 @@ public class BetterGfacImpl implements GFac {
 
             // After executing the in handlers provider instance should be set to job execution context.
             // We get the provider instance and execute it.
-            if(stateVal == 2 || stateVal == 3){
+            if (stateVal == 2 || stateVal == 3) {
                 invokeProvider(jobExecutionContext);     // provider never ran in previous invocation
-            }else if(stateVal == 4){   // whether sync or async job have to invoke the recovering because it crashed in the Handler
+            } else if (stateVal == 4) {   // whether sync or async job have to invoke the recovering because it crashed in the Handler
                 reInvokeProvider(jobExecutionContext);
-            }else if(stateVal >= 5 && GFacUtils.isSynchronousMode(jobExecutionContext)){
+            } else if (stateVal >= 5 && GFacUtils.isSynchronousMode(jobExecutionContext)) {
                 // In this case we do nothing because provider ran successfully, no need to re-run the job
                 log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID);
-            } else if(stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)){
+            } else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)) {
                 // this is async mode where monitoring of jobs is happening, we have to recover
                 reInvokeProvider(jobExecutionContext);
-            } else{
+            } else {
                 log.info("We skip invoking Handler, because the experiment state is beyond the Provider Invocation !!!");
                 log.info("ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
             }
@@ -419,13 +421,15 @@ public class BetterGfacImpl implements GFac {
         }
     }
 
-    private void invokeProvider(JobExecutionContext jobExecutionContext) throws GFacException {
+    private void invokeProvider(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+            GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
             initProvider(provider, jobExecutionContext);
             executeProvider(provider, jobExecutionContext);
             disposeProvider(provider, jobExecutionContext);
+            GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
         if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
@@ -433,22 +437,33 @@ public class BetterGfacImpl implements GFac {
         }
     }
 
-    private void reInvokeProvider(JobExecutionContext jobExecutionContext) throws GFacException {
+    private void reInvokeProvider(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            if (provider instanceof GFacRecoverableProvider) {
-                ((GFacRecoverableProvider) provider).recover(jobExecutionContext);
+            String plState = GFacUtils.getPluginState(zk, jobExecutionContext, provider.getClass().getName());
+            if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) {    // this makes sure that if a plugin crashes it will not relaunch from scratch, but plugins have to save their invoked state
+                if (provider instanceof GFacRecoverableProvider) {
+                    GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+                    ((GFacRecoverableProvider) provider).recover(jobExecutionContext);
+                    GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
+                }
             } else {
+                GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
                 initProvider(provider, jobExecutionContext);
                 executeProvider(provider, jobExecutionContext);
                 disposeProvider(provider, jobExecutionContext);
+                GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
             }
             monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
-        if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+
+        if (GFacUtils.isSynchronousMode(jobExecutionContext))
+
+        {
             invokeOutFlowHandlers(jobExecutionContext);
         }
+
     }
 
 
@@ -507,7 +522,7 @@ public class BetterGfacImpl implements GFac {
                 }
                 try {
                     handler.invoke(jobExecutionContext);
-                    GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKED);
+                    GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
                     // if exception thrown before that we do not make it finished
                 } catch (GFacHandlerException e) {
                     throw new GFacException("Error Executing a InFlow Handler", e.getCause());
@@ -520,9 +535,69 @@ public class BetterGfacImpl implements GFac {
         }
     }
 
+    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+        List<GFacHandlerConfig> handlers = null;
+        if (gFacConfiguration != null) {
+            handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+        } else {
+            try {
+                jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e) {
+                log.error("Error constructing job execution context during outhandler invocation");
+                throw new GFacException(e);
+            }
+            launch(jobExecutionContext);
+        }
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+        for (GFacHandlerConfig handlerClassName : handlers) {
+            Class<? extends GFacHandler> handlerClass;
+            GFacHandler handler;
+            try {
+                GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                handler = handlerClass.newInstance();
+                handler.initProperties(handlerClassName.getProperties());
+            } catch (ClassNotFoundException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot load handler class " + handlerClassName, e);
+            } catch (InstantiationException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (IllegalAccessException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (Exception e) {
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            }
+            try {
+                handler.invoke(jobExecutionContext);
+                GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+            } catch (Exception e) {
+                // TODO: Better error reporting.
+                throw new GFacException("Error Executing a OutFlow Handler", e);
+            }
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+        }
+
+        // At this point all the execution is finished so we update the task and experiment statuses.
+        // Handler authors do not have to worry about updating experiment or task statuses.
+        monitorPublisher.publish(new
+                ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                ExperimentState.COMPLETED));
+        // Updating the task status if there's any task associated
+        monitorPublisher.publish(new TaskStatusChangeRequest(
+                new TaskIdentity(jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+        ));
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+    }
+
     /**
      * If handlers ran successfully we re-run only recoverable handlers
      * If handler never ran we run the normal invoke method
+     *
      * @param jobExecutionContext
      * @throws GFacException
      */
@@ -537,17 +612,23 @@ public class BetterGfacImpl implements GFac {
                 try {
                     handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
                     handler = handlerClass.newInstance();
-                    if (!GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName())) {
-                        GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
-                        handler.initProperties(handlerClassName.getProperties());
-                        handler.invoke(jobExecutionContext);
-                        GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKED);
-                    } else {
+                    String plState = GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName());
+                    if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) {
                         if (handler instanceof GFacRecoverableHandler) {
                             // if these already ran we re-run only recoverable handlers
+                            log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
+                            GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
                             ((GFacRecoverableHandler) handler).recover(jobExecutionContext);
-                            GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKED);
+                            GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+                        } else {
+                            log.info(handlerClassName.getClassName() + " is not a recoverable handler, so we do not re-run it because it already ran in the last run");
                         }
+                    } else {
+                        log.info(handlerClassName.getClassName() + " never ran so we run it in normal mode");
+                        GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
+                        handler.initProperties(handlerClassName.getProperties());
+                        handler.invoke(jobExecutionContext);
+                        GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
                     }
                 } catch (GFacHandlerException e) {
                     throw new GFacException("Error Executing a InFlow Handler", e.getCause());
@@ -567,141 +648,74 @@ public class BetterGfacImpl implements GFac {
     }
 
     public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
-        int stateVal = -1;
-        try {
-            stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
-        } catch (ApplicationSettingsException e) {
-            e.printStackTrace();
-        } catch (KeeperException e) {
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        if (stateVal >= 0 && stateVal < 6) {
-            GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
-            List<GFacHandlerConfig> handlers = null;
-            if (gFacConfiguration != null) {
-                handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
-            } else {
-                try {
-                    jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
-                } catch (Exception e) {
-                    log.error("Error constructing job execution context during outhandler invocation");
-                    throw new GFacException(e);
-                }
-                launch(jobExecutionContext);
-            }
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
-            for (GFacHandlerConfig handlerClassName : handlers) {
-                Class<? extends GFacHandler> handlerClass;
-                GFacHandler handler;
-                try {
-                    handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
-                    handler = handlerClass.newInstance();
-                    handler.initProperties(handlerClassName.getProperties());
-                } catch (ClassNotFoundException e) {
-                    log.error(e.getMessage());
-                    throw new GFacException("Cannot load handler class " + handlerClassName, e);
-                } catch (InstantiationException e) {
-                    log.error(e.getMessage());
-                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                } catch (IllegalAccessException e) {
-                    log.error(e.getMessage());
-                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                }
-                try {
-                    handler.invoke(jobExecutionContext);
-                } catch (Exception e) {
-                    // TODO: Better error reporting.
-                    throw new GFacException("Error Executing a OutFlow Handler", e);
-                }
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+        GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+        List<GFacHandlerConfig> handlers = null;
+        if (gFacConfiguration != null) {
+            handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+        } else {
+            try {
+                jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+            } catch (Exception e) {
+                log.error("Error constructing job execution context during outhandler invocation");
+                throw new GFacException(e);
             }
-
-            // At this point all the execution is finished so we update the task and experiment statuses.
-            // Handler authors does not have to worry about updating experiment or task statuses.
-            monitorPublisher.publish(new
-                    ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
-                    ExperimentState.COMPLETED));
-            // Updating the task status if there's any task associated
-            monitorPublisher.publish(new TaskStatusChangeRequest(
-                    new TaskIdentity(jobExecutionContext.getExperimentID(),
-                            jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                            jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
-            ));
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
-        }
-    }
-
-    public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
-        int stateVal = -1;
-        try {
-            stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
-        } catch (ApplicationSettingsException e) {
-            e.printStackTrace();
-        } catch (KeeperException e) {
-            e.printStackTrace();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
+            launch(jobExecutionContext);
         }
-        if (stateVal >= 0 && stateVal < 6) {
-            GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
-            List<GFacHandlerConfig> handlers = null;
-            if (gFacConfiguration != null) {
-                handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
-            } else {
-                try {
-                    jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
-                } catch (Exception e) {
-                    log.error("Error constructing job execution context during outhandler invocation");
-                    throw new GFacException(e);
-                }
-                launch(jobExecutionContext);
-            }
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
-            for (GFacHandlerConfig handlerClassName : handlers) {
-                Class<? extends GFacHandler> handlerClass;
-                GFacHandler handler;
-                try {
-                    GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
-                    handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
-                    handler = handlerClass.newInstance();
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+        for (GFacHandlerConfig handlerClassName : handlers) {
+            Class<? extends GFacHandler> handlerClass;
+            GFacHandler handler;
+            try {
+                handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                handler = handlerClass.newInstance();
+                String plState = GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName());
+                if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) {
+                    if (handler instanceof GFacRecoverableHandler) {
+                        // if these already ran we re-run only recoverable handlers
+                        log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
+                        GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
+                        ((GFacRecoverableHandler) handler).recover(jobExecutionContext);
+                        GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+                    } else {
+                        log.info(handlerClassName.getClassName() + " is not a recoverable handler, so we do not re-run it because it already ran in the last run");
+                    }
+                } else {
+                    log.info(handlerClassName.getClassName() + " never ran so we run it in normal mode");
+                    GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
                     handler.initProperties(handlerClassName.getProperties());
-                } catch (ClassNotFoundException e) {
-                    log.error(e.getMessage());
-                    throw new GFacException("Cannot load handler class " + handlerClassName, e);
-                } catch (InstantiationException e) {
-                    log.error(e.getMessage());
-                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                } catch (IllegalAccessException e) {
-                    log.error(e.getMessage());
-                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                } catch (Exception e) {
-                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                }
-                try {
                     handler.invoke(jobExecutionContext);
-                    GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKED);
-                } catch (Exception e) {
-                    // TODO: Better error reporting.
-                    throw new GFacException("Error Executing a OutFlow Handler", e);
+                    GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
                 }
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+                handler.initProperties(handlerClassName.getProperties());
+                handler.invoke(jobExecutionContext);
+            } catch (ClassNotFoundException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot load handler class " + handlerClassName, e);
+            } catch (InstantiationException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (IllegalAccessException e) {
+                log.error(e.getMessage());
+                throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+            } catch (Exception e) {
+                // TODO: Better error reporting.
+                throw new GFacException("Error Executing a OutFlow Handler", e);
             }
-
-            // At this point all the execution is finished so we update the task and experiment statuses.
-            // Handler authors does not have to worry about updating experiment or task statuses.
-            monitorPublisher.publish(new
-                    ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
-                    ExperimentState.COMPLETED));
-            // Updating the task status if there's any task associated
-            monitorPublisher.publish(new TaskStatusChangeRequest(
-                    new TaskIdentity(jobExecutionContext.getExperimentID(),
-                            jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                            jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
-            ));
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
         }
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+
+        // At this point all the execution is finished so we update the task and experiment statuses.
+        // Handler authors do not have to worry about updating experiment or task statuses.
+        monitorPublisher.publish(new
+                ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+                ExperimentState.COMPLETED));
+        // Updating the task status if there's any task associated
+        monitorPublisher.publish(new TaskStatusChangeRequest(
+                new TaskIdentity(jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                        jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+        ));
+        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
     }
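
The changes above record per-plugin progress in ZooKeeper (INVOKING before a provider or handler runs, COMPLETED once it finishes) and, on re-submission, branch on the stored value: anything at or beyond INVOKED is only recovered if the plugin is recoverable, otherwise it is run from the start. The following is a minimal, self-contained sketch of that read-and-branch pattern, assuming a ZooKeeper server on localhost:2181 and an invented flat znode path; the real paths are built through AiravataZKUtils and GFacUtils.

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    // Sketch of the plugin-state check used during recovery; the path and the
    // connect string are stand-ins, only the pattern mirrors the code above.
    public class PluginStateCheckSketch {
        private static final int INVOKED = 1;   // mirrors GfacPluginState.INVOKED

        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
            String stateNode = "/gfac-demo-plugin-state";   // hypothetical znode

            Stat stat = zk.exists(stateNode, false);
            if (stat == null) {
                // First run: record INVOKING (0) before the plugin does any work.
                zk.create(stateNode, "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                System.out.println("fresh run: invoke the plugin normally");
            } else {
                int state = Integer.parseInt(new String(zk.getData(stateNode, false, stat)));
                if (state >= INVOKED) {
                    System.out.println("plugin already ran: recover it only if it is recoverable");
                } else {
                    System.out.println("plugin was still invoking when the server died: run it again");
                }
            }
            zk.close();
        }
    }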
 
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
index 86b89c1..2183f59 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.utils.AiravataZKUtils;
 import org.apache.airavata.commons.gfac.type.ApplicationDescription;
 import org.apache.airavata.gfac.Constants;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
@@ -91,20 +92,11 @@ public class AppDescriptorCheckHandler implements GFacRecoverableHandler {
 
 
         logger.info("Recoverable data is saving to zk: " + data.toString());
-        try {
-            ZooKeeper zk = jobExecutionContext.getZk();
-            if (zk != null) {
-                String expZnodeHandlerPath = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getTaskData().getTaskID(), this.getClass().getName());
-                Stat exists = zk.exists(expZnodeHandlerPath, false);
-                jobExecutionContext.getZk().setData(expZnodeHandlerPath, data.toString().getBytes(), exists.getVersion());
-            }
-        } catch (Exception e) {
-            throw new GFacHandlerException(e);
-        }
-
+        GFacUtils.savePluginData(jobExecutionContext, data,this.getClass().getName());
     }
 
+
+
     public void initProperties(Properties properties) throws GFacHandlerException {
 
     }
@@ -113,20 +105,14 @@ public class AppDescriptorCheckHandler implements GFacRecoverableHandler {
         ApplicationDescription app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
         ApplicationDeploymentDescriptionType appDesc = app.getType();
         try {
-            ZooKeeper zk = jobExecutionContext.getZk();
-            if (zk != null) {
-                String expZnodeHandlerPath = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getTaskData().getTaskID(), this.getClass().getName());
-                Stat exists = zk.exists(expZnodeHandlerPath, false);
-                String s = new String(jobExecutionContext.getZk().getData(expZnodeHandlerPath, false, exists));
-                String[] split = s.split(",");
-                appDesc.setScratchWorkingDirectory(split[0]);
-                appDesc.setStaticWorkingDirectory(split[1]);
-                appDesc.setInputDataDirectory(split[2]);
-                appDesc.setOutputDataDirectory(split[3]);
-                appDesc.setStandardOutput(split[4]);
-                appDesc.setStandardError(split[5]);
-            }
+            String s = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName());
+            String[] split = s.split(",");
+            appDesc.setScratchWorkingDirectory(split[0]);
+            appDesc.setStaticWorkingDirectory(split[1]);
+            appDesc.setInputDataDirectory(split[2]);
+            appDesc.setOutputDataDirectory(split[3]);
+            appDesc.setStandardOutput(split[4]);
+            appDesc.setStandardError(split[5]);
         } catch (Exception e) {
             throw new GFacHandlerException(e);
         }
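
The handler now delegates its znode access to GFacUtils.savePluginData()/getPluginData() and keeps serializing its recoverable state as a single comma-joined string of six directory settings. A small sketch of that round trip follows, with invented directory values; only the field order is taken from the recover() code above.

    // Illustrative only: builds the payload the handler would persist and then
    // splits it back into its six fields in the same order recover() expects.
    public class AppDescriptorDataSketch {
        public static void main(String[] args) {
            StringBuffer data = new StringBuffer();
            data.append("/scratch/exp1").append(",")
                .append("/work/exp1").append(",")
                .append("/work/exp1/input").append(",")
                .append("/work/exp1/output").append(",")
                .append("/work/exp1/stdout.txt").append(",")
                .append("/work/exp1/stderr.txt");

            String[] split = data.toString().split(",");
            System.out.println("scratchWorkingDirectory = " + split[0]);
            System.out.println("staticWorkingDirectory  = " + split[1]);
            System.out.println("inputDataDirectory      = " + split[2]);
            System.out.println("outputDataDirectory     = " + split[3]);
            System.out.println("standardOutput          = " + split[4]);
            System.out.println("standardError           = " + split[5]);
        }
    }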

http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
index 6f98cf6..6cc9820 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
@@ -20,6 +20,7 @@
 */
 package org.apache.airavata.gfac.core.provider;
 
+import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 
 /**
@@ -39,5 +40,5 @@ public interface GFacRecoverableProvider extends GFacProvider {
      *
      * @param jobExecutionContext
      */
-    public void recover(JobExecutionContext jobExecutionContext);
+    public void recover(JobExecutionContext jobExecutionContext)throws GFacProviderException,GFacException;
 }
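
recover() can now signal failures through GFacProviderException and GFacException instead of swallowing them. Below is a compile-on-its-own sketch of what an implementation of the widened contract might look like, using stand-in interface and exception types rather than the real gfac-core ones.

    // Stand-in types so the snippet compiles on its own; only the shape of the
    // throws clause is taken from the interface change above.
    public class RecoverContractSketch {
        static class GFacProviderException extends Exception {
            GFacProviderException(String m) { super(m); }
        }
        static class GFacException extends Exception {
            GFacException(String m) { super(m); }
        }

        interface RecoverableProvider {
            void recover(String experimentId) throws GFacProviderException, GFacException;
        }

        public static void main(String[] args) {
            RecoverableProvider provider = experimentId -> {
                // A recovering provider would reload its saved plugin data here and
                // resume from the last recorded step instead of re-submitting the job.
                if (experimentId == null) {
                    throw new GFacProviderException("nothing to recover");
                }
                System.out.println("recovering experiment " + experimentId);
            };
            try {
                provider.recover("exp1");
            } catch (GFacProviderException | GFacException e) {
                e.printStackTrace();
            }
        }
    }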

http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
index 53df1d4..b934303 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
@@ -22,8 +22,8 @@ package org.apache.airavata.gfac.core.states;
 
 public enum GfacPluginState {
     INVOKING(0),
-
-    INVOKED(1);
+    INVOKED(1),
+    COMPLETED(2);
 
     private final int value;
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index b3c6211..e2594bf 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -37,6 +37,7 @@ import org.apache.airavata.gfac.ExecutionMode;
 import org.apache.airavata.gfac.GFacConfiguration;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.states.GfacExperimentState;
 import org.apache.airavata.gfac.core.states.GfacPluginState;
 import org.apache.airavata.model.workspace.experiment.*;
@@ -702,41 +703,42 @@ public class GFacUtils {
 
         exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
         if (exists != null) {
-            zk.setData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, GfacPluginState.INVOKING.toString().getBytes(),
+            zk.setData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(GfacPluginState.INVOKING.getValue()).getBytes(),
                     exists.getVersion());
         }
         return true;
     }
 
     public static boolean createPluginZnode(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className,
-                                            GfacPluginState state)throws ApplicationSettingsException, KeeperException, InterruptedException {
-            String expState = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
-                    jobExecutionContext.getTaskData().getTaskID(), className);
-            Stat exists = zk.exists(expState, false);
-            if (exists == null) {
-                zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                        CreateMode.PERSISTENT);
+                                            GfacPluginState state) throws ApplicationSettingsException, KeeperException, InterruptedException {
+        String expState = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
+                jobExecutionContext.getTaskData().getTaskID(), className);
+        Stat exists = zk.exists(expState, false);
+        if (exists == null) {
+            zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
 
+            zk.create(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+                    new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } else {
+            exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+            if (exists == null) {
                 zk.create(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
                         new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                         CreateMode.PERSISTENT);
-            } else {
-                exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
-                if (exists == null) {
-                    zk.create(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
-                            new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                            CreateMode.PERSISTENT);
-                }
             }
+        }
 
-            exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
-            if (exists != null) {
-                zk.setData(expState + File.separator +
-                                AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes(),
-                        exists.getVersion());
-            }
-            return true;
+        exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+        if (exists != null) {
+            zk.setData(expState + File.separator +
+                            AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes(),
+                    exists.getVersion()
+            );
         }
+        return true;
+    }
 
     public static boolean updatePluginState(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className,
                                             GfacPluginState state)
@@ -748,25 +750,25 @@ public class GFacUtils {
         if (exists != null) {
             zk.setData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes(),
                     exists.getVersion());
-        }else {
-            createPluginZnode(zk, jobExecutionContext, className,state);
+        } else {
+            createPluginZnode(zk, jobExecutionContext, className, state);
         }
         return true;
     }
 
-    public static boolean getPluginState(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className) {
+    public static String getPluginState(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className) {
         try {
             String expState = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
                     jobExecutionContext.getTaskData().getTaskID(), className);
 
             Stat exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
             if (exists != null) {
-                return Boolean.valueOf(new String(zk.getData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false, exists)));
+                return new String(zk.getData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false, exists));
             }
-            return false;        // if the node doesn't exist or any other error we return false
+            return null;        // if the node doesn't exist or any other error we return false
         } catch (Exception e) {
             e.printStackTrace();
-            return false;
+            return null;
         }
     }
 
@@ -781,42 +783,51 @@ public class GFacUtils {
             for (String gfacServerNode : runningGfacNodeNames) {
                 if (!gfacServerNode.equals(pickedChild)) {
                     foundExperimentPath = experimentNode + File.separator + gfacServerNode +
-                                                File.separator + experimentID + "+" + taskID;
+                            File.separator + experimentID + "+" + taskID;
                     exists1 = zk.exists(foundExperimentPath, false);
-                    break;
+                    if(exists1 != null) {               // when the experiment is found we break the loop
+                        break;
+                    }
                 }
             }
             if (exists1 == null) {  // OK this is a pretty new experiment so we are going to create a new node
+                log.info("This is a new Job, so creating all the experiment docs from scratch");
                 zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                         CreateMode.PERSISTENT);
 
                 zk.create(newExpNode + File.separator + "state", String.valueOf(GfacExperimentState.LAUNCHED.getValue()).getBytes(),
                         ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
-            }else {
+            } else {
                 // ohhh this node exists in some other failed gfac folder, we have to move it to this gfac experiment list,safely
-                zk.create(newExpNode, zk.getData(foundExperimentPath,false,exists1), ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                        CreateMode.PERSISTENT);
+                log.info("This is an old Job, so copying data from old experiment location");
+                zk.create(newExpNode, zk.getData(foundExperimentPath, false, exists1), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
 
                 List<String> children = zk.getChildren(foundExperimentPath, false);
-                for(String childNode1:children){
-                    String level1 = foundExperimentPath+File.separator+childNode1;
-                    Stat exists2 = zk.exists(level1,false);             // no need to check exists
-                    String newLeve1 = newExpNode+File.separator+childNode1;
-                    zk.create(newLeve1,zk.getData(
-                                    level1,false,exists2),ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                                            CreateMode.PERSISTENT);
-                    for(String childNode2:zk.getChildren(level1,false)){
-                        String level2 = level1+File.separator+childNode2;
-                        Stat exists3 = zk.exists(level2,false);             // no need to check exists
-                        String newLeve2 = newLeve1+File.separator+childNode2;
-                        zk.create(newLeve2,zk.getData(level2,false,exists3),ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                                                                                    CreateMode.PERSISTENT);
-                        zk.delete(level2, exists3.getVersion());
+                for (String childNode1 : children) {
+                    String level1 = foundExperimentPath + File.separator + childNode1;
+                    Stat exists2 = zk.exists(level1, false);             // no need to check exists
+                    String newLeve1 = newExpNode + File.separator + childNode1;
+                    log.info("Creating new znode: " + newLeve1);       // these have to be info logs
+                    zk.create(newLeve1, zk.getData(
+                                    level1, false, exists2), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                            CreateMode.PERSISTENT
+                    );
+                    for (String childNode2 : zk.getChildren(level1, false)) {
+                        String level2 = level1 + File.separator + childNode2;
+                        Stat exists3 = zk.exists(level2, false);             // no need to check exists
+                        String newLeve2 = newLeve1 + File.separator + childNode2;
+                        log.info("Creating new znode: " + newLeve2);
+                        zk.create(newLeve2, zk.getData(level2, false, exists3), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                                CreateMode.PERSISTENT);
                     }
-                    zk.delete(level1,exists2.getVersion());
                 }
-                zk.delete(foundExperimentPath,exists1.getVersion());
+                // After all the files are successfully transferred we delete the old experiment, otherwise we do
+                // not delete a single file
+                log.info("After a successful copying of experiment data for an old experiment we delete the old data");
+                log.info("Deleting experiment data: " + foundExperimentPath);
+                ZKUtil.deleteRecursive(zk,foundExperimentPath);
             }
         } else {
             log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " is already running by this Gfac instance");
@@ -828,10 +839,35 @@ public class GFacUtils {
                     break;
                 }
             }
-            ZKUtil.deleteRecursive(zk,foundExperimentPath);
+            ZKUtil.deleteRecursive(zk, foundExperimentPath);
             return false;
         }
         return true;
     }
 
+    public static void savePluginData(JobExecutionContext jobExecutionContext, StringBuffer data, String className) throws GFacHandlerException {
+        try {
+            ZooKeeper zk = jobExecutionContext.getZk();
+            if (zk != null) {
+                String expZnodeHandlerPath = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
+                        jobExecutionContext.getTaskData().getTaskID(), className);
+                Stat exists = zk.exists(expZnodeHandlerPath, false);
+                zk.setData(expZnodeHandlerPath, data.toString().getBytes(), exists.getVersion());
+            }
+        } catch (Exception e) {
+            throw new GFacHandlerException(e);
+        }
+    }
+
+    public static String getPluginData(JobExecutionContext jobExecutionContext, String className) throws ApplicationSettingsException, KeeperException, InterruptedException {
+        ZooKeeper zk = jobExecutionContext.getZk();
+        if (zk != null) {
+            String expZnodeHandlerPath =  AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
+                    jobExecutionContext.getTaskData().getTaskID(), className);
+            Stat exists = zk.exists(expZnodeHandlerPath, false);
+            return new String(jobExecutionContext.getZk().getData(expZnodeHandlerPath, false, exists));
+        }
+        return null;
+    }
+
 }
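
The new savePluginData()/getPluginData() helpers centralize the setData/getData round trip against the handler's znode that AppDescriptorCheckHandler and GSISSHProvider now rely on. Below is a self-contained sketch of that round trip using the plain ZooKeeper client, assuming a server on localhost:2181 and a flat demo path instead of the AiravataZKUtils handler path.

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;

    // Minimal save/get sketch; the path, payload, and connect string are
    // stand-ins, only the setData/getData pattern mirrors the helpers above.
    public class PluginDataSketch {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
            String path = "/gfac-demo-plugin-data";   // hypothetical znode

            // savePluginData(): create the node once, then overwrite its payload.
            if (zk.exists(path, false) == null) {
                zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            }
            Stat stat = zk.exists(path, false);
            zk.setData(path, "jobDesc=<xml/>,jobId=12345".getBytes(), stat.getVersion());

            // getPluginData(): read the payload back using the current Stat.
            Stat readStat = zk.exists(path, false);
            String data = new String(zk.getData(path, false, readStat));
            System.out.println("recovered plugin data: " + data);

            zk.close();
        }
    }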

http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 262b832..6b39819 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -20,16 +20,19 @@
 */
 package org.apache.airavata.gfac.gsissh.provider.impl;
 
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.gfac.ExecutionMode;
 import org.apache.airavata.gfac.GFacException;
 import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.cpi.GFacImpl;
 import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
 import org.apache.airavata.gfac.core.provider.AbstractProvider;
 import org.apache.airavata.gfac.core.provider.AbstractRecoverableProvider;
+import org.apache.airavata.gfac.core.provider.GFacProvider;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
@@ -44,6 +47,7 @@ import org.apache.airavata.model.workspace.experiment.JobState;
 import org.apache.airavata.schemas.gfac.GsisshHostType;
 import org.apache.airavata.schemas.gfac.HostDescriptionType;
 import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,18 +62,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
     }
 
     public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
-    	super.initialize(jobExecutionContext);
+        super.initialize(jobExecutionContext);
     }
 
     public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
         log.info("Invoking GSISSH Provider Invoke ...");
+        StringBuffer data = new StringBuffer();
         jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
         HostDescriptionType host = jobExecutionContext.getApplicationContext().
                 getHostDescription().getType();
         HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
                 getApplicationDeploymentDescription().getType();
         JobDetails jobDetails = new JobDetails();
-     	String taskID = jobExecutionContext.getTaskData().getTaskID();
         try {
             Cluster cluster = null;
             if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
@@ -84,70 +88,78 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
             JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
 
             log.info(jobDescriptor.toXML());
-            
+            data.append("jobDesc=").append(jobDescriptor.toXML());
             jobDetails.setJobDescription(jobDescriptor.toXML());
-            
+
             String jobID = cluster.submitBatchJob(jobDescriptor);
             jobExecutionContext.setJobDetails(jobDetails);
-            if(jobID == null){
+            if (jobID == null) {
                 jobDetails.setJobID("none");
                 GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
-            }else{
+            } else {
                 jobDetails.setJobID(jobID);
                 GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
             }
-
+            data.append(",jobId=").append(jobDetails.getJobID());
 
             // Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
             // to perform monitoring, daemon handlers can be accessed from anywhere
-            List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
-            if(daemonHandlers == null){
-                daemonHandlers = BetterGfacImpl.getDaemonHandlers();
-            }
-            ThreadedHandler pullMonitorHandler = null;
-            ThreadedHandler pushMonitorHandler = null;
-            String monitorMode = ((GsisshHostType) host).getMonitorMode();
-            for(ThreadedHandler threadedHandler:daemonHandlers){
-                if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){
-                    pullMonitorHandler = threadedHandler;
-                    if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
-                        log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
-                        pullMonitorHandler.invoke(jobExecutionContext);
-                    }else{
-                        log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
-                                " to handle by the GridPullMonitorHandler");
-                    }
-                }else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())){
-                    pushMonitorHandler = threadedHandler;
-                    if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)){
-                        log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned:  " + jobID);
-                        pushMonitorHandler.invoke(jobExecutionContext);
-                    }else{
-                        log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
-                                " to handle by the GridPushMonitorHandler");
-                    }
-                }
-                // have to handle the GridPushMonitorHandler logic
-            }
-            if(pullMonitorHandler == null && pushMonitorHandler==null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())){
-                log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
-                        ", execution is configured as asynchronous, so Outhandler will not be invoked");
-            }
+            delegateToMonitorHandlers(jobExecutionContext, (GsisshHostType) host, jobID);
             // we know this host is type GsiSSHHostType
         } catch (SSHApiException e) {
             String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
             log.error(error);
             jobDetails.setJobID("none");
-        	GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
-         	GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+            GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             throw new GFacProviderException(error, e);
         } catch (Exception e) {
-        	String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
-         	log.error(error);
+            String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+            log.error(error);
             jobDetails.setJobID("none");
-        	GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
-         	GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+            GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             throw new GFacProviderException(error, e);
+        } finally {
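+            // persist "jobDesc=<descriptor XML>,jobId=<job id>" so recover() can resume monitoring after a GFac failure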
+            log.info("Saving data for future recovery: ");
+            log.info(data.toString());
+            GFacUtils.savePluginData(jobExecutionContext, data, this.getClass().getName());
+        }
+    }
+
+    public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException {
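+        // hand the submitted job over to the pull or push daemon handler that matches the host's monitorMode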
+        List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
+        if (daemonHandlers == null) {
+            daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+        }
+        ThreadedHandler pullMonitorHandler = null;
+        ThreadedHandler pushMonitorHandler = null;
+        String monitorMode = host.getMonitorMode();
+        for (ThreadedHandler threadedHandler : daemonHandlers) {
+            if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pullMonitorHandler = threadedHandler;
+                if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
+                    log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned:  " + jobID);
+                    pullMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+                            " to handle by the GridPullMonitorHandler");
+                }
+            } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+                pushMonitorHandler = threadedHandler;
+                if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
+                    log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned:  " + jobID);
+                    pushMonitorHandler.invoke(jobExecutionContext);
+                } else {
+                    log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+                            " to handle by the GridPushMonitorHandler");
+                }
+            }
+            // have to handle the GridPushMonitorHandler logic
+        }
+        if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+            log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+                    ", execution is configured as asynchronous, so Outhandler will not be invoked");
         }
     }
 
@@ -159,8 +171,63 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
         //To change body of implemented methods use File | Settings | File Templates.
     }
 
-    public void recover(JobExecutionContext jobExecutionContext) {
+    public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
         // have to implement the logic to recover a gfac failure
         log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID());
+        HostDescriptionType host = jobExecutionContext.getApplicationContext().
+                getHostDescription().getType();
+        String jobId = "";
+        String jobDesc = "";
+        try {
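+            // execute() saves recovery data in the form "jobDesc=<descriptor XML>,jobId=<job id>"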
+            String pluginData = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName());
+            String[] split = pluginData.split(",");
+            if (split.length < 2) {
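+                // no prior submission was recorded, so run the provider from the beginning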
+                try {
+                    this.execute(jobExecutionContext);
+                } catch (GFacException e) {
+                    throw new GFacProviderException("Error recovering provider", e);
+                }
+                return;
+            }
+            jobDesc = split[0].substring("jobDesc=".length());
+            jobId = split[1].substring("jobId=".length());
+
+            log.info("Following data have recovered: ");
+            log.info("Job Description: " + jobDesc);
+            log.info("Job Id: " + jobId);
+            if (jobId == null || "none".equals(jobId) ||
+                    "".equals(jobId)) {
+                try {
+                    this.execute(jobExecutionContext);
+                } catch (GFacException e) {
+                    throw new GFacProviderException("Error recovering provider", e);
+                }
+                return;
+            }
+        } catch (ApplicationSettingsException e) {
+            log.error("Error reading saved plugin data: " + e.getMessage(), e);
+        } catch (KeeperException e) {
+            log.error("Error reading saved plugin data: " + e.getMessage(), e);
+        } catch (InterruptedException e) {
+            log.error("Error reading saved plugin data: " + e.getMessage(), e);
+        }
+        try {
+            // Now we have enough data to recover
+            JobDetails jobDetails = new JobDetails();
+            jobDetails.setJobDescription(jobDesc);
+            jobDetails.setJobID(jobId);
+            jobExecutionContext.setJobDetails(jobDetails);
+            if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null) {
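+                // the GSI security context may be missing during recovery, so recreate it before delegating to monitoring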
+                try {
+                    GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+                } catch (ApplicationSettingsException e) {
+                    log.error(e.getMessage());
+                    throw new GFacHandlerException("Error while creating the GSI security context", e, e.getLocalizedMessage());
+                }
+            }
+            delegateToMonitorHandlers(jobExecutionContext, (GsisshHostType) host, jobId);
+        } catch (GFacHandlerException e) {
+            throw new GFacProviderException("Error delegating the already-submitted job to monitoring", e);
+        }
     }
 }
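
The recovery logic above persists a small plugin-data string ("jobDesc=<descriptor XML>,jobId=<job id>") in execute()'s finally block via GFacUtils.savePluginData and parses it back in recover(). A minimal standalone sketch of that round trip, for illustration only (PluginDataCodec and its methods are hypothetical names, not part of the Airavata code base):

    // Hypothetical helper: mirrors the "jobDesc=...,jobId=..." convention used by
    // GSISSHProvider; not an actual Airavata class.
    public final class PluginDataCodec {

        // Build the string the same way execute() does before saving it for recovery.
        public static String encode(String jobDescriptorXml, String jobId) {
            return "jobDesc=" + jobDescriptorXml + ",jobId=" + jobId;
        }

        // Extract the job id; recover() falls back to a fresh execute() when this is missing or "none".
        public static String decodeJobId(String pluginData) {
            String[] split = pluginData.split(",");
            if (split.length < 2) {
                return null; // nothing was submitted before the failure
            }
            return split[1].substring("jobId=".length());
        }

        public static void main(String[] args) {
            String data = encode("<jobDescriptor/>", "1234567");
            System.out.println(decodeJobId(data)); // prints 1234567
        }
    }

Note that if the serialized job descriptor ever contains a comma, the simple split(",") shown here (and used in recover()) would likely mis-parse the job id; locating the last ",jobId=" marker instead would be a more robust variant.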

http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index 4c4be1a..4d8b7c4 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -75,7 +75,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
         }
         List<String> children = zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE)
                 + File.separator + gfacId, false);
-        log.info("------------------ Recovering Experiments started ------------------------ ");
+            log.info("------------------ Recovering Experiments started ----------------------------------");
         for (String expId : children) {
             log.info("Recovering Experiment: " + expId.split("\\+")[0]);
             log.info("------------------------------------------------------------------------------------");


[2/2] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata

Posted by la...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/109b679a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/109b679a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/109b679a

Branch: refs/heads/master
Commit: 109b679ad94775136d29b050578b70eac690fb47
Parents: c05fd14 441bf76
Author: lahiru <la...@apache.org>
Authored: Wed Jun 25 16:16:45 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Jun 25 16:16:45 2014 -0400

----------------------------------------------------------------------
 .../resources/lib/experimentModel_types.cpp     |  74 +-
 .../main/resources/lib/experimentModel_types.h  |  40 +-
 .../Model/Workspace/Experiment/Types.php        |  90 +-
 .../model/workspace/experiment/TaskDetails.java | 314 +++++--
 airavata-api/generate-thrift-files.sh           |   2 +-
 .../airavataAPI.thrift                          |   1 -
 .../applicationCatalogAPI.thrift                |  36 +-
 .../applicationCatalogDataModel.thrift          | 204 -----
 .../applicationDeploymentModel.thrift           | 204 +++++
 .../applicationInterfaceModel.thrift            | 204 +++++
 .../computeHostModel.thrift                     | 204 +++++
 .../experimentModel.thrift                      |  22 +-
 .../gatewayProfileModel.thrift                  | 204 +++++
 .../orchestrator/cpi/OrchestratorService.java   | 846 +++++++++++++++++++
 .../server/OrchestratorServerHandler.java       |   7 +
 .../orchestrator.cpi.service.thrift             |  12 +-
 tools/gsissh/src/main/java/SSHDemo.java         |   2 +-
 .../impl/DefaultSSHApiTestWithMyProxyAuth.java  | 202 +----
 .../gsissh/src/test/resources/log4j.properties  |  34 +
 19 files changed, 2169 insertions(+), 533 deletions(-)
----------------------------------------------------------------------