You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/04/29 20:01:07 UTC

airavata git commit: fixing zk client connection closing

Repository: airavata
Updated Branches:
  refs/heads/master fc1cffb22 -> 1e040e42b


fixing zk client connection closing


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/1e040e42
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/1e040e42
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/1e040e42

Branch: refs/heads/master
Commit: 1e040e42bc5c62d76e77ecd2babf0f0353ba0231
Parents: fc1cffb
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Apr 29 14:01:00 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Apr 29 14:01:00 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/server/GfacServerHandler.java | 16 +++++-----
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 24 +++++++++------
 .../core/impl/GFACPassiveJobSubmitter.java      | 31 +++++++-------------
 3 files changed, 34 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/1e040e42/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
index 9b282db..799bff0 100644
--- a/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
+++ b/modules/gfac/airavata-gfac-service/src/main/java/org/apache/airavata/gfac/server/GfacServerHandler.java
@@ -192,14 +192,14 @@ public class GfacServerHandler implements GfacService.Iface, Watcher {
                     mutex.notify();
                     break;
                 case Expired:case Disconnected:
-                   logger.info("ZK Connection is "+ state.toString());
-                    try {
-                        zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-                    } catch (IOException e) {
-                        logger.error(e.getMessage(), e);
-                    } catch (ApplicationSettingsException e) {
-                        logger.error(e.getMessage(), e);
-                    }
+//                   logger.info("ZK Connection is "+ state.toString());
+//                    try {
+//                        zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
+//                    } catch (IOException e) {
+//                        logger.error(e.getMessage(), e);
+//                    } catch (ApplicationSettingsException e) {
+//                        logger.error(e.getMessage(), e);
+//                    }
 //                    synchronized (mutex) {
 //                        mutex.wait(5000);  // waiting for the syncConnected event
 //                    }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1e040e42/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index df7b840..515b51d 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -216,10 +216,13 @@ public class BetterGfacImpl implements GFac,Watcher {
             jobExecutionContext = createJEC(experimentID, taskID, gatewayID);
             return submitJob(jobExecutionContext);
         } catch (Exception e) {
-            log.error("Error inovoking the job with experiment ID: " + experimentID);
+            log.error("Error inovoking the job with experiment ID: " + experimentID + ":"+e.getMessage());
             StringWriter errors = new StringWriter();
             e.printStackTrace(new PrintWriter(errors));
             GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+            if(jobExecutionContext!=null){
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
+            }
             throw new GFacException(e);
         }finally {
             closeZK(jobExecutionContext);
@@ -959,6 +962,9 @@ public class BetterGfacImpl implements GFac,Watcher {
         String experimentPath = null;
         try {
             try {
+                if(jobExecutionContext.getZk()!=null){
+                    closeZK(jobExecutionContext);
+                }
                 jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
                 log.info("Waiting until zookeeper client connect to the server...");
                 synchronized (mutex) {
@@ -1063,7 +1069,7 @@ public class BetterGfacImpl implements GFac,Watcher {
 
     private void closeZK(JobExecutionContext jobExecutionContext) {
         try {
-            if(jobExecutionContext.getZk()!=null) {
+            if(jobExecutionContext!=null && jobExecutionContext.getZk()!=null) {
                 jobExecutionContext.getZk().close();
             }
         } catch (InterruptedException e) {
@@ -1320,13 +1326,13 @@ public class BetterGfacImpl implements GFac,Watcher {
                 case Expired:
                 case Disconnected:
                     log.info("ZK Connection is " + state.toString());
-                    try {
-                        zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
-                    } catch (IOException e) {
-                        log.error(e.getMessage(), e);
-                    } catch (ApplicationSettingsException e) {
-                        log.error(e.getMessage(), e);
-                    }
+//                    try {
+//                        zk = new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this);
+//                    } catch (IOException e) {
+//                        log.error(e.getMessage(), e);
+//                    } catch (ApplicationSettingsException e) {
+//                        log.error(e.getMessage(), e);
+//                    }
 //                    synchronized (mutex) {
 //                        mutex.wait(5000);  // waiting for the syncConnected event
 //                    }

http://git-wip-us.apache.org/repos/asf/airavata/blob/1e040e42/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
index 88b9633..cd5b45b 100644
--- a/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
+++ b/modules/orchestrator/orchestrator-core/src/main/java/org/apache/airavata/orchestrator/core/impl/GFACPassiveJobSubmitter.java
@@ -29,6 +29,7 @@ import org.apache.airavata.common.utils.ServerSettings;
 import org.apache.airavata.credential.store.store.CredentialReader;
 import org.apache.airavata.gfac.client.GFACInstance;
 import org.apache.airavata.gfac.client.GFacClientFactory;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.cpi.GfacService;
 import org.apache.airavata.messaging.core.MessageContext;
@@ -101,17 +102,7 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
      * @throws OrchestratorException
      */
     public boolean submit(String experimentID, String taskID, String tokenId) throws OrchestratorException {
-
-        ZooKeeper zk = orchestratorContext.getZk();
         try {
-            if (zk == null || !zk.getState().isConnected()) {
-                String zkhostPort = AiravataZKUtils.getZKhostPort();
-                zk = new ZooKeeper(zkhostPort, AiravataZKUtils.getZKTimeout(), this);
-                logger.info("Waiting for zookeeper to connect to the server");
-                synchronized (mutex) {
-                    mutex.wait(5000);
-                }
-            }
             String gatewayId = null;
             CredentialReader credentialReader = GFacUtils.getCredentialReader();
             if (credentialReader != null) {
@@ -128,15 +119,6 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
             MessageContext messageContext = new MessageContext(taskSubmitEvent, MessageType.LAUNCHTASK, "LAUNCH.TASK-" + UUID.randomUUID().toString(), gatewayId);
             messageContext.setUpdatedTime(AiravataUtils.getCurrentTimestamp());
             publisher.publish(messageContext);
-        } catch (InterruptedException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (ApplicationSettingsException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-            throw new OrchestratorException(e);
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             throw new OrchestratorException(e);
@@ -211,12 +193,21 @@ public class GFACPassiveJobSubmitter implements JobSubmitter,Watcher {
             logger.error(e.getMessage(), e);
             throw new OrchestratorException(e);
         }finally {
-
+            closeZK(orchestratorContext);
         }
         return false;
 
     }
 
+    private void closeZK(OrchestratorContext orchestratorContext) {
+        try {
+            if(orchestratorContext!=null && orchestratorContext.getZk()!=null) {
+                orchestratorContext.getZk().close();
+            }
+        } catch (InterruptedException e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
     synchronized public void process(WatchedEvent event) {
         logger.info(getClass().getName() + event.getPath());
         logger.info(getClass().getName()+event.getType());