You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/04/29 17:37:37 UTC
airavata git commit: fixing zookeeper connection closing issues
Repository: airavata
Updated Branches:
refs/heads/master bcc5f583e -> b90498f15
fixing zookeeper connection closing issues
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/b90498f1
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/b90498f1
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/b90498f1
Branch: refs/heads/master
Commit: b90498f1559012c0587de3c9125d756e56bbef3c
Parents: bcc5f58
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Apr 29 11:37:29 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Apr 29 11:37:29 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/server/GfacServerHandler.java | 18 +--
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 121 ++++++++++---------
.../core/monitor/GfacInternalStatusUpdator.java | 2 +-
.../server/OrchestratorServerHandler.java | 6 +-
.../util/OrchestratorRecoveryHandler.java | 2 +-
.../core/impl/GFACEmbeddedJobSubmitter.java | 13 +-
.../core/impl/GFACPassiveJobSubmitter.java | 4 +-
.../engine/concurrent/PredicatedTaskRunner.java | 2 -
8 files changed, 93 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 0d1c8d3..9b282db 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -201,7 +201,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
logger.error(e.getMessage(), e);
}
// synchronized (mutex) {
-// mutex.wait(); // waiting for the syncConnected event
+// mutex.wait(5000); // waiting for the syncConnected event
// }
}
}
@@ -302,13 +302,17 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
private GFac getGfac() throws TException {
try {
- if (zk == null || !zk.getState().isConnected()) {
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
- }
- return new BetterGfacImpl(registry, appCatalog, zk, publisher);
- } catch (Exception e) {
- throw new TException("Error initializing gfac instance", e);
+ return new BetterGfacImpl(registry, appCatalog,null , publisher);
+
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ } catch (ApplicationSettingsException e) {
+ logger.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
}
+ return null;
+
}
private static class TestHandler implements MessageHandler{
http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index c080312..318d5bd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -109,10 +109,14 @@ public class BetterGfacImpl implements GFac,Watcher {
* @param zooKeeper
*/
public BetterGfacImpl(Registry registry, AppCatalog appCatalog, ZooKeeper zooKeeper,
- MonitorPublisher publisher) {
+ MonitorPublisher publisher) throws ApplicationSettingsException, IOException, InterruptedException {
this.registry = registry;
monitorPublisher = publisher; // This is a EventBus common for gfac
- this.zk = zooKeeper;
+ this.zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
+ log.info("Waiting until zookeeper client connect to the server...");
+ synchronized (mutex) {
+ mutex.wait(5000); // waiting for the syncConnected event
+ }
this.appCatalog = appCatalog;
}
@@ -215,9 +219,10 @@ public class BetterGfacImpl implements GFac,Watcher {
log.error("Error inovoking the job with experiment ID: " + experimentID);
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
- closeZK(jobExecutionContext);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacException(e);
+ }finally {
+ closeZK(jobExecutionContext);
}
}
@@ -512,15 +517,12 @@ public class BetterGfacImpl implements GFac,Watcher {
return true;
} catch (ApplicationSettingsException e) {
GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- closeZK(jobExecutionContext);
throw new GFacException("Error launching the Job", e);
} catch (KeeperException e) {
GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- closeZK(jobExecutionContext);
throw new GFacException("Error launching the Job", e);
} catch (InterruptedException e) {
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
- closeZK(jobExecutionContext);
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacException("Error launching the Job",e);
}
}
@@ -531,8 +533,7 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
return cancel(jobExecutionContext);
} catch (Exception e) {
- GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR );
- closeZK(jobExecutionContext);
+ GFacUtils.saveErrorDetails(jobExecutionContext, e.getCause().toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
log.error("Error inovoking the job with experiment ID: " + experimentID);
throw new GFacException(e);
}
@@ -613,7 +614,6 @@ public class BetterGfacImpl implements GFac,Watcher {
}
jobExecutionContext.setProperty(ERROR_SENT, "true");
jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
- closeZK(jobExecutionContext);
throw new GFacException(e.getMessage(), e);
}
// }
@@ -626,8 +626,9 @@ public class BetterGfacImpl implements GFac,Watcher {
// throw new GFacException(e.getMessage(), e);
} catch (Exception e) {
log.error("Error occured while cancelling job for experiment : " + jobExecutionContext.getExperimentID(), e);
- closeZK(jobExecutionContext);
throw new GFacException(e.getMessage(), e);
+ }finally {
+ closeZK(jobExecutionContext);
}
}
@@ -708,9 +709,8 @@ public class BetterGfacImpl implements GFac,Watcher {
}
jobExecutionContext.setProperty(ERROR_SENT, "true");
jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
- closeZK(jobExecutionContext);
throw new GFacException(e.getMessage(), e);
- }
+ }
}
private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
@@ -781,7 +781,6 @@ public class BetterGfacImpl implements GFac,Watcher {
}
jobExecutionContext.setProperty(ERROR_SENT, "true");
jobExecutionContext.getNotifier().publish(new ExecutionFailEvent(e.getCause()));
- closeZK(jobExecutionContext);
throw new GFacException(e.getMessage(), e);
}
}
@@ -991,53 +990,55 @@ public class BetterGfacImpl implements GFac,Watcher {
throw new GFacException(e);
}
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
- for (GFacHandlerConfig handlerClassName : handlers) {
- if (!isCancelled()) {
- Class<? extends GFacHandler> handlerClass;
- GFacHandler handler;
- try {
- GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
- handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
- handler = handlerClass.newInstance();
- handler.initProperties(handlerClassName.getProperties());
- } catch (ClassNotFoundException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot load handler class " + handlerClassName, e);
- } catch (InstantiationException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (IllegalAccessException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (Exception e) {
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- }
- try {
- handler.invoke(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
- } catch (Exception e) {
- TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
+ try {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ if (!isCancelled()) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
}
- throw new GFacException(e);
- } finally {
- closeZK(jobExecutionContext);
+ try {
+ handler.invoke(jobExecutionContext);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+ } catch (Exception e) {
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacException(e);
+ }
+ } else {
+ log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
+ break;
}
- } else {
- log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
- break;
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+ }catch (Exception e) {
+ throw new GFacException("Cannot invoke OutHandlers\n"+e.getMessage(), e);
+ }finally {
+ closeZK(jobExecutionContext);
}
// At this point all the execution is finished so we update the task and experiment statuses.
@@ -1138,7 +1139,7 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
log.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
- mutex.wait(); // waiting for the syncConnected event
+ mutex.wait(5000); // waiting for the syncConnected event
}
if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
log.error("Experiment is already finalized so no output handlers will be invoked");
@@ -1322,7 +1323,7 @@ public class BetterGfacImpl implements GFac,Watcher {
log.error(e.getMessage(), e);
}
// synchronized (mutex) {
-// mutex.wait(); // waiting for the syncConnected event
+// mutex.wait(5000); // waiting for the syncConnected event
// }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index beb5b7a..836e2c6 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -63,7 +63,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
logger.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
- mutex.wait();
+ mutex.wait(5000);
}
}
exists = zk.exists(experimentPath, false);
http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 16070e9..18443c1 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -145,7 +145,7 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
String OrchServer = ServerSettings
.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
synchronized (mutex) {
- mutex.wait(); // waiting for the syncConnected event
+ mutex.wait(5000); // waiting for the syncConnected event
}
registerOrchestratorService(airavataServerHostPort, OrchServer);
// creating a watch in orchestrator to monitor the gfac
@@ -517,6 +517,10 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
if (zk == null || !zk.getState().isConnected()){
zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this);
}
+ log.info("Waiting for zookeeper to connect to the server");
+ synchronized (mutex){
+ mutex.wait(5000);
+ }
if (experiment == null) {
log.errorId(experimentId, "Error retrieving the Experiment by the given experimentID: {}.", experimentId);
throw new OrchestratorException("Error retrieving the Experiment by the given experimentID: " + experimentId);
http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index f90d452..9d40557 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -73,7 +73,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
log.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
- mutex.wait();
+ mutex.wait(5000);
}
List<String> children = zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE)
+ File.separator + gfacId, false);
http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
index 3008c63..9f4d919 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACEmbeddedJobSubmitter.java
@@ -21,6 +21,7 @@
package org.apache.airavata.orchestrator.core.impl;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.store.CredentialReader;
@@ -36,6 +37,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.eventbus.EventBus;
+import java.io.IOException;
+
/**
* This is the simplest implementation for JobSubmitter,
* This is calling gfac invocation methods to invoke the gfac embedded mode,so this does not really implement
@@ -51,7 +54,15 @@ public class GFACEmbeddedJobSubmitter implements JobSubmitter {
public void initialize(OrchestratorContext orchestratorContext) throws OrchestratorException {
this.orchestratorContext = orchestratorContext;
- gfac = new BetterGfacImpl(orchestratorContext.getNewRegistry(),null, null, new MonitorPublisher(new EventBus()));
+ try {
+ gfac = new BetterGfacImpl(orchestratorContext.getNewRegistry(),null, null, new MonitorPublisher(new EventBus()));
+ } catch (ApplicationSettingsException e) {
+ logger.error(e.getMessage(), e);
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
}
public GFACInstance selectGFACInstance() throws OrchestratorException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index ac76618..88b9633 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -109,7 +109,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
logger.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
- mutex.wait();
+ mutex.wait(5000);
}
}
String gatewayId = null;
@@ -160,7 +160,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
logger.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
- mutex.wait();
+ mutex.wait(5000);
}
}
String gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
http://git-wip-us.apache.org/repos/asf/airavata/blob/b90498f1/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/concurrent/PredicatedTaskRunner.java
----------------------------------------------------------------------
diff --git a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/concurrent/PredicatedTaskRunner.java b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/concurrent/PredicatedTaskRunner.java
index 6912113..5ce7b1b 100644
--- a/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/concurrent/PredicatedTaskRunner.java
+++ b/modules/workflow-model/workflow-engine/src/main/java/org/apache/airavata/workflow/engine/concurrent/PredicatedTaskRunner.java
@@ -100,8 +100,6 @@ public class PredicatedTaskRunner {
while (!stop) {
try {
-
-
synchronized (jobQueue) {
while(jobQueue.size() == 0 || allTasksAreWaiting(jobQueue)){
jobQueue.wait(50);