You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/04/29 17:37:37 UTC

airavata git commit: fixing zookeeper connection closing issues

Repository: airavata
Updated Branches:
  refs/heads/master bcc5f583e -> b90498f15


fixing zookeeper connection closing issues


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b90498f1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b90498f1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b90498f1

Branch: refs/heads/master
Commit: b90498f1559012c0587de3c9125d756e56bbef3c
Parents: bcc5f58
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Apr 29 11:37:29 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Apr 29 11:37:29 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java |  18 +--
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 121 ++++++++++---------
 .../core/monitor/GfacInternalStatusUpdator.java |   2 +-
 .../server/OrchestratorServerHandler.java       |   6 +-
 .../util/OrchestratorRecoveryHandler.java       |   2 +-
 .../core/impl/GFACEmbeddedJobSubmitter.java     |  13 +-
 .../core/impl/GFACPassiveJobSubmitter.java      |   4 +-
 .../engine/concurrent/PredicatedTaskRunner.java |   2 -
 8 files changed, 93 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 0d1c8d3..9b282db 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -201,7 +201,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                         logger.error(e.getMessage(), e);
                     }
 //                    synchronized (mutex) {
-//                        mutex.wait();  // waiting for the syncConnected event
+//                        mutex.wait(5000);  // waiting for the syncConnected event
 //                    }
             }
         }
@@ -302,13 +302,17 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
 
     private GFac getGfac() throws TException {
         try {
-            if (zk == null || !zk.getState().isConnected()) {
-                zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-            }
-            return new BetterGfacImpl(registry, appCatalog, zk, publisher);
-        } catch (Exception e) {
-            throw new TException("Error initializing gfac instance", e);
+            return new BetterGfacImpl(registry, appCatalog,null , publisher);
+
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        } catch (ApplicationSettingsException e) {
+            logger.error(e.getMessage(), e);
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage(), e);
         }
+        return null;
+
     }
 
     private static  class TestHandler implements MessageHandler{

http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index c080312..318d5bd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -109,10 +109,14 @@ public class BetterGfacImpl implements GFac,Watcher {
      * @param zooKeeper
      */
     public BetterGfacImpl(Registry registry,  AppCatalog appCatalog, ZooKeeper zooKeeper,
-                          MonitorPublisher publisher) {
+                          MonitorPublisher publisher) throws ApplicationSettingsException, IOException, InterruptedException {
         this.registry = registry;
         monitorPublisher = publisher;     // This is a EventBus common for gfac
-        this.zk = zooKeeper;
+        this.zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
+        log.info("Waiting until zookeeper client connect to the server...");
+        synchronized (mutex) {
+            mutex.wait(5000);  // waiting for the syncConnected event
+        }
         this.appCatalog = appCatalog;
     }
 
@@ -215,9 +219,10 @@ public class BetterGfacImpl implements GFac,Watcher {
             log.error("Error inovoking the job with experiment ID: " + experimentID);
             StringWriter errors = new StringWriter();
             e.printStackTrace(new PrintWriter(errors));
-            GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
-            closeZK(jobExecutionContext);
+            GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             throw new GFacException(e);
+        }finally {
+            closeZK(jobExecutionContext);
         }
     }
 
@@ -512,15 +517,12 @@ public class BetterGfacImpl implements GFac,Watcher {
             return true;
         } catch (ApplicationSettingsException e) {
             GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            closeZK(jobExecutionContext);
             throw new GFacException("Error launching the Job", e);
         } catch (KeeperException e) {
             GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-            closeZK(jobExecutionContext);
             throw new GFacException("Error launching the Job", e);
         } catch (InterruptedException e) {
-            GFacUtils.saveErrorDetails(jobExecutionContext,  e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
-            closeZK(jobExecutionContext);
+            GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             throw new GFacException("Error launching the Job",e);
         }
     }
@@ -531,8 +533,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
             return cancel(jobExecutionContext);
         } catch (Exception e) {
-            GFacUtils.saveErrorDetails(jobExecutionContext,  e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
-            closeZK(jobExecutionContext);
+            GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
             log.error("Error inovoking the job with experiment ID: " + experimentID);
             throw new GFacException(e);
         }
@@ -613,7 +614,6 @@ public class BetterGfacImpl implements GFac,Watcher {
                     }
                     jobExecutionContext.setProperty(ERROR_SENT, "true");
                     jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
-                    closeZK(jobExecutionContext);
                     throw new GFacException(e.getMessage(), e);
                 }
 //            }
@@ -626,8 +626,9 @@ public class BetterGfacImpl implements GFac,Watcher {
 //            throw new GFacException(e.getMessage(), e);
         } catch (Exception e) {
             log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
-            closeZK(jobExecutionContext);
             throw new GFacException(e.getMessage(), e);
+        }finally {
+            closeZK(jobExecutionContext);
         }
     }
 
@@ -708,9 +709,8 @@ public class BetterGfacImpl implements GFac,Watcher {
 			}
 			jobExecutionContext.setProperty(ERROR_SENT, "true");
 			jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
-            closeZK(jobExecutionContext);
             throw new GFacException(e.getMessage(), e);
-		}
+        }
     }
 
 	private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
@@ -781,7 +781,6 @@ public class BetterGfacImpl implements GFac,Watcher {
 			}
 			jobExecutionContext.setProperty(ERROR_SENT, "true");
 			jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
-            closeZK(jobExecutionContext);
             throw new GFacException(e.getMessage(), e);
 		}
     }
@@ -991,53 +990,55 @@ public class BetterGfacImpl implements GFac,Watcher {
                 throw new GFacException(e);
             }
         }
-        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
-        for (GFacHandlerConfig handlerClassName : handlers) {
-            if (!isCancelled()) {
-                Class<? extends GFacHandler> handlerClass;
-                GFacHandler handler;
-                try {
-                    GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
-                    handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
-                    handler = handlerClass.newInstance();
-                    handler.initProperties(handlerClassName.getProperties());
-                } catch (ClassNotFoundException e) {
-                    log.error(e.getMessage());
-                    throw new GFacException("Cannot load handler class " + handlerClassName, e);
-                } catch (InstantiationException e) {
-                    log.error(e.getMessage());
-                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                } catch (IllegalAccessException e) {
-                    log.error(e.getMessage());
-                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                } catch (Exception e) {
-                    throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                }
-                try {
-                    handler.invoke(jobExecutionContext);
-                    GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
-                } catch (Exception e) {
-                    TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                            jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                            jobExecutionContext.getExperimentID(),
-                            jobExecutionContext.getGatewayID());
-                    monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
+        try {
+            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+            for (GFacHandlerConfig handlerClassName : handlers) {
+                if (!isCancelled()) {
+                    Class<? extends GFacHandler> handlerClass;
+                    GFacHandler handler;
                     try {
-                        StringWriter errors = new StringWriter();
-                        e.printStackTrace(new PrintWriter(errors));
-                        GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                    } catch (GFacException e1) {
-                        log.error(e1.getLocalizedMessage());
+                        GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+                        handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                        handler = handlerClass.newInstance();
+                        handler.initProperties(handlerClassName.getProperties());
+                    } catch (ClassNotFoundException e) {
+                        log.error(e.getMessage());
+                        throw new GFacException("Cannot load handler class " + handlerClassName, e);
+                    } catch (InstantiationException e) {
+                        log.error(e.getMessage());
+                        throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                    } catch (IllegalAccessException e) {
+                        log.error(e.getMessage());
+                        throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
                     }
-                    throw new GFacException(e);
-                } finally {
-                    closeZK(jobExecutionContext);
+                    try {
+                        handler.invoke(jobExecutionContext);
+                        GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+                    } catch (Exception e) {
+                        TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                jobExecutionContext.getExperimentID(),
+                                jobExecutionContext.getGatewayID());
+                        monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
+                        try {
+                            StringWriter errors = new StringWriter();
+                            e.printStackTrace(new PrintWriter(errors));
+                            GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                        } catch (GFacException e1) {
+                            log.error(e1.getLocalizedMessage());
+                        }
+                        throw new GFacException(e);
+                    }
+                } else {
+                    log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
+                    break;
                 }
-            } else {
-                log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
-                break;
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
             }
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+        }catch (Exception e) {
+            throw new GFacException("Cannot invoke OutHandlers\n"+e.getMessage(), e);
+        }finally {
+            closeZK(jobExecutionContext);
         }
 
         // At this point all the execution is finished so we update the task and experiment statuses.
@@ -1138,7 +1139,7 @@ public class BetterGfacImpl implements GFac,Watcher {
             jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
             log.info("Waiting for zookeeper to connect to the server");
             synchronized (mutex) {
-                mutex.wait();  // waiting for the syncConnected event
+                mutex.wait(5000);  // waiting for the syncConnected event
             }
             if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
                 log.error("Experiment is already finalized so no output handlers will be invoked");
@@ -1322,7 +1323,7 @@ public class BetterGfacImpl implements GFac,Watcher {
                         log.error(e.getMessage(), e);
                     }
 //                    synchronized (mutex) {
-//                        mutex.wait();  // waiting for the syncConnected event
+//                        mutex.wait(5000);  // waiting for the syncConnected event
 //                    }
             }
         }

http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index beb5b7a..836e2c6 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -63,7 +63,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
                     zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
                     logger.info("Waiting for zookeeper to connect to the server");
                     synchronized (mutex) {
-                        mutex.wait();
+                        mutex.wait(5000);
                     }
                 }
                 exists = zk.exists(experimentPath, false);

http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 16070e9..18443c1 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -145,7 +145,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
 					String OrchServer = ServerSettings
 							.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
 					synchronized (mutex) {
-						mutex.wait(); // waiting for the syncConnected event
+						mutex.wait(5000); // waiting for the syncConnected event
 					}
 					registerOrchestratorService(airavataServerHostPort, OrchServer);
 					// creating a watch in orchestrator to monitor the gfac
@@ -517,6 +517,10 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
             if (zk == null || !zk.getState().isConnected()){
                 zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this);
             }
+			log.info("Waiting for zookeeper to connect to the server");
+			synchronized (mutex){
+				mutex.wait(5000);
+			}
             if (experiment == null) {
                 log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}.", experimentId);
                 throw new OrchestratorException("Error retrieving the Experiment by the given experimentID: " + experimentId);

http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index f90d452..9d40557 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -73,7 +73,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
         zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
         log.info("Waiting for zookeeper to connect to the server");
         synchronized (mutex) {
-            mutex.wait();
+            mutex.wait(5000);
         }
         List<String> children = zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE)
                 + File.separator + gfacId, false);

http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
index 3008c63..9f4d919 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
@@ -21,6 +21,7 @@
 package org.apache.airavata.orchestrator.core.impl;
 
 
+import org.apache.airavata.common.exception.ApplicationSettingsException;
 import org.apache.airavata.common.utils.MonitorPublisher;
 import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.credential.store.store.CredentialReader;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.eventbus.EventBus;
 
+import java.io.IOException;
+
 /**
  * This is the simplest implementation for JobSubmitter,
  * This is calling gfac invocation methods to invoke the gfac embedded mode,so this does not really implement
@@ -51,7 +54,15 @@ public class GFACEmbeddedJobSubmitter implements JobSubmitter {
 
     public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
         this.orchestratorContext = orchestratorContext;
-        gfac = new BetterGfacImpl(orchestratorContext.getNewRegistry(),null, null, new MonitorPublisher(new EventBus()));
+        try {
+            gfac = new BetterGfacImpl(orchestratorContext.getNewRegistry(),null, null, new MonitorPublisher(new EventBus()));
+        } catch (ApplicationSettingsException e) {
+            logger.error(e.getMessage(), e);
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage(), e);
+        }
     }
 
     public GFACInstance selectGFACInstance() throws OrchestratorException {

http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index ac76618..88b9633 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -109,7 +109,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
                 zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
                 logger.info("Waiting for zookeeper to connect to the server");
                 synchronized (mutex) {
-                    mutex.wait();
+                    mutex.wait(5000);
                 }
             }
             String gatewayId = null;
@@ -160,7 +160,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
                 zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
                 logger.info("Waiting for zookeeper to connect to the server");
                 synchronized (mutex) {
-                    mutex.wait();
+                    mutex.wait(5000);
                 }
             }
             String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");

http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/concurrent/PredicatedTaskRunner.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/concurrent/PredicatedTaskRunner.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/concurrent/PredicatedTaskRunner.java
index 6912113..5ce7b1b 100644
--- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/concurrent/PredicatedTaskRunner.java
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/concurrent/PredicatedTaskRunner.java
@@ -100,8 +100,6 @@ public class PredicatedTaskRunner {
 
 				while (!stop) {
 					try {
-
-
 						synchronized (jobQueue) {
 							while(jobQueue.size() == 0 || allTasksAreWaiting(jobQueue)){
 								jobQueue.wait(50);