You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/04/29 16:51:35 UTC

[1/2] airavata git commit: fixing zookeeper issues with max connections

Repository: airavata
Updated Branches:
  refs/heads/master e5f16f2f5 -> bcc5f583e


fixing zookeeper issues with max connections


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2f319c14
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2f319c14
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2f319c14

Branch: refs/heads/master
Commit: 2f319c143194dfabbe76378b1ad41f0f85ac4dd3
Parents: 851aa62
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Sun Apr 26 01:15:09 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Sun Apr 26 01:15:09 2015 -0400

----------------------------------------------------------------------
 .../AiravataExperimentStatusUpdator.java        |  1 -
 .../airavata/gfac/server/GfacServerHandler.java |  1 +
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 55 ++++++++++++++++----
 .../core/monitor/GfacInternalStatusUpdator.java |  1 +
 .../server/OrchestratorServerHandler.java       |  2 +
 .../util/OrchestratorRecoveryHandler.java       |  1 +
 .../core/impl/GFACPassiveJobSubmitter.java      |  2 +
 7 files changed, 53 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index a516eab..0d779b4 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -88,7 +88,6 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
 	            	}else{
 	                state = ExperimentState.EXECUTING; updateExperimentStatus = true;
 	                }
-
                     cleanup(nodeStatus, experimentNode, experimentPath);
 	                break;
 	            case INVOKED:

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index befda78..836f04d 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -102,6 +102,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
             zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);   // no watcher is required, this will only use to store some data
             gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
             gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+            logger.info("Waiting for zookeeper to connect to the server");
             synchronized (mutex) {
                 mutex.wait();  // waiting for the syncConnected event
             }

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 12533bb..69ed97e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -661,8 +661,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 				// no need to re-run the job
 				log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID);
 			} else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)) {
-				// this is async mode where monitoring of jobs is hapenning, we
-				// have to recover
+				// this is async mode where monitoring of jobs is hapenning, we  have to recover
 				reInvokeProviderExecute(jobExecutionContext);
 			} else if (stateVal == 6) {
 				reInvokeOutFlowHandlers(jobExecutionContext);
@@ -958,18 +957,34 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        String experimentPath = null;
         try {
-            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this));
+             experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+        } catch (ApplicationSettingsException e) {
+            log.error(e.getMessage(), e);
+            return;
+        }
+
+        try {
+            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
+            log.info("Waiting until zookeeper client connect to the server...");
             synchronized (mutex) {
                 mutex.wait();  // waiting for the syncConnected event
             }
+            if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
+                log.error("Experiment is already finalized so no output handlers will be invoked");
+                return;
+            }
         } catch (IOException e) {
             log.error(e.getMessage(), e);
         } catch (ApplicationSettingsException e) {
             log.error(e.getMessage(), e);
         } catch (InterruptedException e) {
             log.error(e.getMessage(), e);
+        } catch (KeeperException e) {
+            log.error(e.getMessage(), e);
         }
+
         GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
         List<GFacHandlerConfig> handlers = null;
         if (gFacConfiguration != null) {
@@ -985,7 +1000,7 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
         monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
         for (GFacHandlerConfig handlerClassName : handlers) {
-            if(!isCancelled()) {
+            if (!isCancelled()) {
                 Class<? extends GFacHandler> handlerClass;
                 GFacHandler handler;
                 try {
@@ -1017,15 +1032,15 @@ public class BetterGfacImpl implements GFac,Watcher {
                     try {
                         StringWriter errors = new StringWriter();
                         e.printStackTrace(new PrintWriter(errors));
-                        GFacUtils.saveErrorDetails(jobExecutionContext,  errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                        GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
                     } catch (GFacException e1) {
                         log.error(e1.getLocalizedMessage());
                     }
                     throw new GFacException(e);
-                }finally {
+                } finally {
                     closeZK(jobExecutionContext);
                 }
-            }else{
+            } else {
                 log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
                 break;
             }
@@ -1044,6 +1059,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                 jobExecutionContext.getGatewayID());
         monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
         monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+
     }
 
     private void closeZK(JobExecutionContext jobExecutionContext) {
@@ -1123,14 +1139,35 @@ public class BetterGfacImpl implements GFac,Watcher {
     }
 
     public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+        String experimentPath = null;
         try {
-            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this));
+            experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+        } catch (ApplicationSettingsException e) {
+            log.error(e.getMessage(), e);
+            return;
+        }
 
+        try {
+            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
+            log.info("Waiting for zookeeper to connect to the server");
+            synchronized (mutex) {
+                mutex.wait();  // waiting for the syncConnected event
+            }
+            if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
+                log.error("Experiment is already finalized so no output handlers will be invoked");
+                return;
+            }
         } catch (IOException e) {
             log.error(e.getMessage(), e);
         } catch (ApplicationSettingsException e) {
             log.error(e.getMessage(), e);
+        } catch (InterruptedException e) {
+            log.error(e.getMessage(), e);
+        } catch (KeeperException e) {
+            log.error(e.getMessage(), e);
         }
+
+
         GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
         List<GFacHandlerConfig> handlers = null;
         if (gFacConfiguration != null) {
@@ -1164,7 +1201,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                         log.info(handlerClassName.getClassName() + " is not a recoverable handler so we do not run because it already ran in last-run");
                     }
                 } else {
-                    log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
+                    log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode");
                     GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
                     handler.initProperties(handlerClassName.getProperties());
                     handler.invoke(jobExecutionContext);

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 4457cac..beb5b7a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -61,6 +61,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
                 if (!zk.getState().isConnected()) {
                     String zkhostPort = AiravataZKUtils.getZKhostPort();
                     zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+                    logger.info("Waiting for zookeeper to connect to the server");
                     synchronized (mutex) {
                         mutex.wait();
                     }

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 06dafb6..dc7d71c 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -140,6 +140,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 					// required, this
 					// will only use to
 					// store some data
+					log.info("Waiting for zookeeper to connect to the server");
+
 					String OrchServer = ServerSettings
 							.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
 					synchronized (mutex) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index c0a8890..f90d452 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -71,6 +71,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
     public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException {
         String zkhostPort = AiravataZKUtils.getZKhostPort();
         zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+        log.info("Waiting for zookeeper to connect to the server");
         synchronized (mutex) {
             mutex.wait();
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 915bddf..ac76618 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -107,6 +107,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
             if (zk == null || !zk.getState().isConnected()) {
                 String zkhostPort = AiravataZKUtils.getZKhostPort();
                 zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+                logger.info("Waiting for zookeeper to connect to the server");
                 synchronized (mutex) {
                     mutex.wait();
                 }
@@ -157,6 +158,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
             if (zk == null || !zk.getState().isConnected()) {
                 String zkhostPort = AiravataZKUtils.getZKhostPort();
                 zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+                logger.info("Waiting for zookeeper to connect to the server");
                 synchronized (mutex) {
                     mutex.wait();
                 }


[2/2] airavata git commit: fixing zookeeper wait with a timeout

Posted by la...@apache.org.
fixing zookeeper wait with a timeout


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/bcc5f583
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/bcc5f583
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/bcc5f583

Branch: refs/heads/master
Commit: bcc5f583e22db7823b30b982b79fe50e80798c8a
Parents: 2f319c1 e5f16f2
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Apr 29 10:51:28 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Apr 29 10:51:28 2015 -0400

----------------------------------------------------------------------
 .../server/src/main/resources/SGETemplate.xslt  |  10 +-
 .../main/resources/airavata-server.properties   |   1 +
 .../airavata/gfac/server/GfacServerHandler.java |   7 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  |  17 +-
 .../gfac/monitor/email/EmailBasedMonitor.java   |  92 +++---
 .../monitor/email/parser/LSFEmailParser.java    |   4 +-
 .../monitor/email/parser/PBSEmailParser.java    |   4 +-
 .../monitor/email/parser/SLURMEmailParser.java  |   6 +-
 .../monitor/email/parser/UGEEmailParser.java    |  99 +++++++
 .../server/OrchestratorServerHandler.java       |   3 +
 .../validator/impl/SimpleAppDataValidator.java  |   2 +-
 .../persistance/registry/jpa/JPAConstants.java  |   1 +
 .../persistance/registry/jpa/ResourceUtils.java |   4 +-
 .../registry/jpa/impl/ExperimentRegistry.java   |  31 +-
 .../registry/jpa/impl/LoggingRegistryImpl.java  |   5 +
 .../registry/jpa/impl/ProjectRegistry.java      |  54 +++-
 .../registry/jpa/impl/RegistryImpl.java         |  34 ++-
 .../jpa/resources/AbstractResource.java         |   7 +-
 .../registry/jpa/resources/Utils.java           |   9 +
 .../registry/jpa/resources/WorkerResource.java  |  95 +++++-
 .../registry/jpa/RegistryUseCaseTest.java       | 290 +++++++++++++++++++
 .../apache/airavata/registry/cpi/Registry.java  |  16 +
 .../airavata/registry/cpi/ResultOrderType.java  |  29 ++
 .../gsissh/src/main/resources/SGETemplate.xslt  |   7 +-
 24 files changed, 729 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/bcc5f583/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --cc modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 836f04d,efd2005..0d1c8d3
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@@ -102,9 -102,8 +102,9 @@@ public class GfacServerHandler implemen
              zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);   // no watcher is required, this will only use to store some data
              gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
              gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
 +            logger.info("Waiting for zookeeper to connect to the server");
              synchronized (mutex) {
--                mutex.wait();  // waiting for the syncConnected event
++                mutex.wait(5000);  // waiting for the syncConnected event
              }
              storeServerConfig();
              logger.info("Finished starting ZK: " + zk);

http://git-wip-us.apache.org/repos/asf/airavata/blob/bcc5f583/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 69ed97e,30889b8..c080312
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@@ -957,23 -958,13 +957,16 @@@ public class BetterGfacImpl implements 
      }
  
      public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
 +        String experimentPath = null;
          try {
-              experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
-         } catch (ApplicationSettingsException e) {
-             log.error(e.getMessage(), e);
-             return;
-         }
- 
-         try {
 -            if (zk == null || !zk.getState().isConnected()){
 -                zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this);
 -            }
 -            jobExecutionContext.setZk(zk);
 +            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
 +            log.info("Waiting until zookeeper client connect to the server...");
              synchronized (mutex) {
--                mutex.wait();  // waiting for the syncConnected event
++                mutex.wait(5000);  // waiting for the syncConnected event
 +            }
 +            if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
 +                log.error("Experiment is already finalized so no output handlers will be invoked");
 +                return;
              }
          } catch (IOException e) {
              log.error(e.getMessage(), e);
@@@ -1139,24 -1127,12 +1133,17 @@@
      }
  
      public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
 +        String experimentPath = null;
          try {
-             experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
-         } catch (ApplicationSettingsException e) {
-             log.error(e.getMessage(), e);
-             return;
-         }
- 
-         try {
 -            if (zk == null || !zk.getState().isConnected()){
 -                zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this);
 +            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
 +            log.info("Waiting for zookeeper to connect to the server");
 +            synchronized (mutex) {
 +                mutex.wait();  // waiting for the syncConnected event
 +            }
 +            if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
 +                log.error("Experiment is already finalized so no output handlers will be invoked");
 +                return;
              }
 -            jobExecutionContext.setZk(zk);
 -
          } catch (IOException e) {
              log.error(e.getMessage(), e);
          } catch (ApplicationSettingsException e) {

http://git-wip-us.apache.org/repos/asf/airavata/blob/bcc5f583/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------