You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/04/29 20:01:07 UTC
airavata git commit: fixing zk client connection closing
Repository: airavata
Updated Branches:
refs/heads/master fc1cffb22 -> 1e040e42b
Fixing ZooKeeper (zk) client connection closing
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1e040e42
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1e040e42
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1e040e42
Branch: refs/heads/master
Commit: 1e040e42bc5c62d76e77ecd2babf0f0353ba0231
Parents: fc1cffb
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Apr 29 14:01:00 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Apr 29 14:01:00 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/server/GfacServerHandler.java | 16 +++++-----
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 24 +++++++++------
.../core/impl/GFACPassiveJobSubmitter.java | 31 +++++++-------------
3 files changed, 34 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/1e040e42/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 9b282db..799bff0 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -192,14 +192,14 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
mutex.notify();
break;
case Expired:case Disconnected:
- logger.info("ZK Connection is "+ state.toString());
- try {
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- } catch (ApplicationSettingsException e) {
- logger.error(e.getMessage(), e);
- }
+// logger.info("ZK Connection is "+ state.toString());
+// try {
+// zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
+// } catch (IOException e) {
+// logger.error(e.getMessage(), e);
+// } catch (ApplicationSettingsException e) {
+// logger.error(e.getMessage(), e);
+// }
// synchronized (mutex) {
// mutex.wait(5000); // waiting for the syncConnected event
// }
http://git-wip-us.apache.org/repos/asf/airavata/blob/1e040e42/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index df7b840..515b51d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -216,10 +216,13 @@ public class BetterGfacImpl implements GFac,Watcher {
jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
return submitJob(jobExecutionContext);
} catch (Exception e) {
- log.error("Error inovoking the job with experiment ID: " + experimentID);
+ log.error("Error inovoking the job with experiment ID: " + experimentID + ":"+e.getMessage());
StringWriter errors = new StringWriter();
e.printStackTrace(new PrintWriter(errors));
GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ if(jobExecutionContext!=null){
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+ }
throw new GFacException(e);
}finally {
closeZK(jobExecutionContext);
@@ -959,6 +962,9 @@ public class BetterGfacImpl implements GFac,Watcher {
String experimentPath = null;
try {
try {
+ if(jobExecutionContext.getZk()!=null){
+ closeZK(jobExecutionContext);
+ }
jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
log.info("Waiting until zookeeper client connect to the server...");
synchronized (mutex) {
@@ -1063,7 +1069,7 @@ public class BetterGfacImpl implements GFac,Watcher {
private void closeZK(JobExecutionContext jobExecutionContext) {
try {
- if(jobExecutionContext.getZk()!=null) {
+ if(jobExecutionContext!=null && jobExecutionContext.getZk()!=null) {
jobExecutionContext.getZk().close();
}
} catch (InterruptedException e) {
@@ -1320,13 +1326,13 @@ public class BetterGfacImpl implements GFac,Watcher {
case Expired:
case Disconnected:
log.info("ZK Connection is " + state.toString());
- try {
- zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage(), e);
- }
+// try {
+// zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
+// } catch (IOException e) {
+// log.error(e.getMessage(), e);
+// } catch (ApplicationSettingsException e) {
+// log.error(e.getMessage(), e);
+// }
// synchronized (mutex) {
// mutex.wait(5000); // waiting for the syncConnected event
// }
http://git-wip-us.apache.org/repos/asf/airavata/blob/1e040e42/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 88b9633..cd5b45b 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -29,6 +29,7 @@ import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.credential.store.store.CredentialReader;
import org.apache.airavata.gfac.client.GFACInstance;
import org.apache.airavata.gfac.client.GFacClientFactory;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.cpi.GfacService;
import org.apache.airavata.messaging.core.MessageContext;
@@ -101,17 +102,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
* @throws OrchestratorException
*/
public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException {
-
- ZooKeeper zk = orchestratorContext.getZk();
try {
- if (zk == null || !zk.getState().isConnected()) {
- String zkhostPort = AiravataZKUtils.getZKhostPort();
- zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
- logger.info("Waiting for zookeeper to connect to the server");
- synchronized (mutex) {
- mutex.wait(5000);
- }
- }
String gatewayId = null;
CredentialReader credentialReader = GFacUtils.getCredentialReader();
if (credentialReader != null) {
@@ -128,15 +119,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
publisher.publish(messageContext);
- } catch (InterruptedException e) {
- logger.error(e.getMessage(), e);
- throw new OrchestratorException(e);
- } catch (ApplicationSettingsException e) {
- logger.error(e.getMessage(), e);
- throw new OrchestratorException(e);
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- throw new OrchestratorException(e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new OrchestratorException(e);
@@ -211,12 +193,21 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
logger.error(e.getMessage(), e);
throw new OrchestratorException(e);
}finally {
-
+ closeZK(orchestratorContext);
}
return false;
}
+ private void closeZK(OrchestratorContext orchestratorContext) {
+ try {
+ if(orchestratorContext!=null && orchestratorContext.getZk()!=null) {
+ orchestratorContext.getZk().close();
+ }
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
synchronized public void process(WatchedEvent event) {
logger.info(getClass().getName() + event.getPath());
logger.info(getClass().getName()+event.getType());