You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/04/29 16:51:35 UTC
[1/2] airavata git commit: fixing zookeeper issues with max connections
Repository: airavata
Updated Branches:
refs/heads/master e5f16f2f5 -> bcc5f583e
fixing zookeeper issues with max connections
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/2f319c14
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/2f319c14
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/2f319c14
Branch: refs/heads/master
Commit: 2f319c143194dfabbe76378b1ad41f0f85ac4dd3
Parents: 851aa62
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Sun Apr 26 01:15:09 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Sun Apr 26 01:15:09 2015 -0400
----------------------------------------------------------------------
.../AiravataExperimentStatusUpdator.java | 1 -
.../airavata/gfac/server/GfacServerHandler.java | 1 +
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 55 ++++++++++++++++----
.../core/monitor/GfacInternalStatusUpdator.java | 1 +
.../server/OrchestratorServerHandler.java | 2 +
.../util/OrchestratorRecoveryHandler.java | 1 +
.../core/impl/GFACPassiveJobSubmitter.java | 2 +
7 files changed, 53 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
index a516eab..0d779b4 100644
--- a/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
+++ b/airavata-api/airavata-api-server/src/main/java/org/apache/airavata/api/server/listener/AiravataExperimentStatusUpdator.java
@@ -88,7 +88,6 @@ public class AiravataExperimentStatusUpdator implements AbstractActivityListener
}else{
state = ExperimentState.EXECUTING; updateExperimentStatus = true;
}
-
cleanup(nodeStatus, experimentNode, experimentPath);
break;
case INVOKED:
http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index befda78..836f04d 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -102,6 +102,7 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is required, this will only use to store some data
gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ logger.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
mutex.wait(); // waiting for the syncConnected event
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 12533bb..69ed97e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -661,8 +661,7 @@ public class BetterGfacImpl implements GFac,Watcher {
// no need to re-run the job
log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID);
} else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)) {
- // this is async mode where monitoring of jobs is hapenning, we
- // have to recover
+ // this is async mode where monitoring of jobs is hapenning, we have to recover
reInvokeProviderExecute(jobExecutionContext);
} else if (stateVal == 6) {
reInvokeOutFlowHandlers(jobExecutionContext);
@@ -958,18 +957,34 @@ public class BetterGfacImpl implements GFac,Watcher {
}
public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ String experimentPath = null;
try {
- jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this));
+ experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage(), e);
+ return;
+ }
+
+ try {
+ jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
+ log.info("Waiting until zookeeper client connect to the server...");
synchronized (mutex) {
mutex.wait(); // waiting for the syncConnected event
}
+ if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
+ log.error("Experiment is already finalized so no output handlers will be invoked");
+ return;
+ }
} catch (IOException e) {
log.error(e.getMessage(), e);
} catch (ApplicationSettingsException e) {
log.error(e.getMessage(), e);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
+ } catch (KeeperException e) {
+ log.error(e.getMessage(), e);
}
+
GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
List<GFacHandlerConfig> handlers = null;
if (gFacConfiguration != null) {
@@ -985,7 +1000,7 @@ public class BetterGfacImpl implements GFac,Watcher {
}
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
for (GFacHandlerConfig handlerClassName : handlers) {
- if(!isCancelled()) {
+ if (!isCancelled()) {
Class<? extends GFacHandler> handlerClass;
GFacHandler handler;
try {
@@ -1017,15 +1032,15 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
} catch (GFacException e1) {
log.error(e1.getLocalizedMessage());
}
throw new GFacException(e);
- }finally {
+ } finally {
closeZK(jobExecutionContext);
}
- }else{
+ } else {
log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
break;
}
@@ -1044,6 +1059,7 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext.getGatewayID());
monitorPublisher.publish(new TaskStatusChangeEvent(TaskState.COMPLETED, taskIdentity));
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+
}
private void closeZK(JobExecutionContext jobExecutionContext) {
@@ -1123,14 +1139,35 @@ public class BetterGfacImpl implements GFac,Watcher {
}
public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ String experimentPath = null;
try {
- jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this));
+ experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage(), e);
+ return;
+ }
+ try {
+ jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
+ log.info("Waiting for zookeeper to connect to the server");
+ synchronized (mutex) {
+ mutex.wait(); // waiting for the syncConnected event
+ }
+ if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
+ log.error("Experiment is already finalized so no output handlers will be invoked");
+ return;
+ }
} catch (IOException e) {
log.error(e.getMessage(), e);
} catch (ApplicationSettingsException e) {
log.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ log.error(e.getMessage(), e);
+ } catch (KeeperException e) {
+ log.error(e.getMessage(), e);
}
+
+
GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
List<GFacHandlerConfig> handlers = null;
if (gFacConfiguration != null) {
@@ -1164,7 +1201,7 @@ public class BetterGfacImpl implements GFac,Watcher {
log.info(handlerClassName.getClassName() + " is not a recoverable handler so we do not run because it already ran in last-run");
}
} else {
- log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
+ log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode");
GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
handler.initProperties(handlerClassName.getProperties());
handler.invoke(jobExecutionContext);
http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
index 4457cac..beb5b7a 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/monitor/GfacInternalStatusUpdator.java
@@ -61,6 +61,7 @@ public class GfacInternalStatusUpdator implements AbstractActivityListener, Watc
if (!zk.getState().isConnected()) {
String zkhostPort = AiravataZKUtils.getZKhostPort();
zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+ logger.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
mutex.wait();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
index 06dafb6..dc7d71c 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
@@ -140,6 +140,8 @@ public class OrchestratorServerHandler implements OrchestratorService.Iface,
// required, this
// will only use to
// store some data
+ log.info("Waiting for zookeeper to connect to the server");
+
String OrchServer = ServerSettings
.getSetting(org.apache.airavata.common.utils.Constants.ZOOKEEPER_ORCHESTRATOR_SERVER_NODE);
synchronized (mutex) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index c0a8890..f90d452 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -71,6 +71,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
public void recover() throws OrchestratorException, ApplicationSettingsException, IOException, KeeperException, InterruptedException {
String zkhostPort = AiravataZKUtils.getZKhostPort();
zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+ log.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
mutex.wait();
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/2f319c14/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 915bddf..ac76618 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -107,6 +107,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
if (zk == null || !zk.getState().isConnected()) {
String zkhostPort = AiravataZKUtils.getZKhostPort();
zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+ logger.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
mutex.wait();
}
@@ -157,6 +158,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
if (zk == null || !zk.getState().isConnected()) {
String zkhostPort = AiravataZKUtils.getZKhostPort();
zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
+ logger.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
mutex.wait();
}
[2/2] airavata git commit: fixing zookeeper wait with a timeout
Posted by la...@apache.org.
fixing zookeeper wait with a timeout
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/bcc5f583
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/bcc5f583
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/bcc5f583
Branch: refs/heads/master
Commit: bcc5f583e22db7823b30b982b79fe50e80798c8a
Parents: 2f319c1 e5f16f2
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Apr 29 10:51:28 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Apr 29 10:51:28 2015 -0400
----------------------------------------------------------------------
.../server/src/main/resources/SGETemplate.xslt | 10 +-
.../main/resources/airavata-server.properties | 1 +
.../airavata/gfac/server/GfacServerHandler.java | 7 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 17 +-
.../gfac/monitor/email/EmailBasedMonitor.java | 92 +++---
.../monitor/email/parser/LSFEmailParser.java | 4 +-
.../monitor/email/parser/PBSEmailParser.java | 4 +-
.../monitor/email/parser/SLURMEmailParser.java | 6 +-
.../monitor/email/parser/UGEEmailParser.java | 99 +++++++
.../server/OrchestratorServerHandler.java | 3 +
.../validator/impl/SimpleAppDataValidator.java | 2 +-
.../persistance/registry/jpa/JPAConstants.java | 1 +
.../persistance/registry/jpa/ResourceUtils.java | 4 +-
.../registry/jpa/impl/ExperimentRegistry.java | 31 +-
.../registry/jpa/impl/LoggingRegistryImpl.java | 5 +
.../registry/jpa/impl/ProjectRegistry.java | 54 +++-
.../registry/jpa/impl/RegistryImpl.java | 34 ++-
.../jpa/resources/AbstractResource.java | 7 +-
.../registry/jpa/resources/Utils.java | 9 +
.../registry/jpa/resources/WorkerResource.java | 95 +++++-
.../registry/jpa/RegistryUseCaseTest.java | 290 +++++++++++++++++++
.../apache/airavata/registry/cpi/Registry.java | 16 +
.../airavata/registry/cpi/ResultOrderType.java | 29 ++
.../gsissh/src/main/resources/SGETemplate.xslt | 7 +-
24 files changed, 729 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/bcc5f583/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --cc modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 836f04d,efd2005..0d1c8d3
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@@ -102,9 -102,8 +102,9 @@@ public class GfacServerHandler implemen
zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this); // no watcher is required, this will only use to store some data
gfacServer = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_SERVER_NODE, "/gfac-server");
gfacExperiments = ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE, "/gfac-experiments");
+ logger.info("Waiting for zookeeper to connect to the server");
synchronized (mutex) {
-- mutex.wait(); // waiting for the syncConnected event
++ mutex.wait(5000); // waiting for the syncConnected event
}
storeServerConfig();
logger.info("Finished starting ZK: " + zk);
http://git-wip-us.apache.org/repos/asf/airavata/blob/bcc5f583/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --cc modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 69ed97e,30889b8..c080312
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@@ -957,23 -958,13 +957,16 @@@ public class BetterGfacImpl implements
}
public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ String experimentPath = null;
try {
- experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage(), e);
- return;
- }
-
- try {
- if (zk == null || !zk.getState().isConnected()){
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this);
- }
- jobExecutionContext.setZk(zk);
+ jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
+ log.info("Waiting until zookeeper client connect to the server...");
synchronized (mutex) {
-- mutex.wait(); // waiting for the syncConnected event
++ mutex.wait(5000); // waiting for the syncConnected event
+ }
+ if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
+ log.error("Experiment is already finalized so no output handlers will be invoked");
+ return;
}
} catch (IOException e) {
log.error(e.getMessage(), e);
@@@ -1139,24 -1127,12 +1133,17 @@@
}
public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ String experimentPath = null;
try {
- experimentPath = AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID());
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage(), e);
- return;
- }
-
- try {
- if (zk == null || !zk.getState().isConnected()){
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(),this);
+ jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
+ log.info("Waiting for zookeeper to connect to the server");
+ synchronized (mutex) {
+ mutex.wait(); // waiting for the syncConnected event
+ }
+ if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
+ log.error("Experiment is already finalized so no output handlers will be invoked");
+ return;
}
- jobExecutionContext.setZk(zk);
-
} catch (IOException e) {
log.error(e.getMessage(), e);
} catch (ApplicationSettingsException e) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/bcc5f583/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/server/OrchestratorServerHandler.java
----------------------------------------------------------------------