You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/04/29 18:24:50 UTC

airavata git commit: fixing more in out handlers

Repository: airavata
Updated Branches:
  refs/heads/master b90498f15 -> 8ef27824c


fixing more in out handlers


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8ef27824
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8ef27824
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8ef27824

Branch: refs/heads/master
Commit: 8ef27824c71ecbb467f767d964bf7fc330c67272
Parents: b90498f
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Apr 29 12:24:34 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Apr 29 12:24:34 2015 -0400

----------------------------------------------------------------------
 .../client/samples/CreateLaunchExperiment.java  |   6 +-
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 148 ++++++++++---------
 .../gfac/core/utils/OutHandlerWorker.java       |   1 +
 3 files changed, 80 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/8ef27824/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 1d5de23..51cc760 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,7 +61,7 @@ public class CreateLaunchExperiment {
     private static String echoAppId = "Echo_a3c69094-2591-47d9-b5c4-e14c9fa5abf5";
     private static String mpiAppId = "HelloMPI_bfd56d58-6085-4b7f-89fc-646576830518";
     private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
-    private static String amberAppId = "Amber_dda3d2bb-e128-4814-9ade-7de9070169be";
+    private static String amberAppId = "Amber_cb54b269-cf79-4276-8dbb-2ec16b759cc6";
     private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b";
     private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1";
     private static String lammpsAppId = "LAMMPS_2472685b-8acf-497e-aafe-cc66fe5f4cb6";
@@ -167,13 +167,13 @@ public class CreateLaunchExperiment {
 //                final String expId = createEchoExperimentForFSD(airavataClient);
 //                final String expId = createMPIExperimentForFSD(airavataClient);
 //               final String expId = createEchoExperimentForStampede(airavataClient);
-                final String expId = createEchoExperimentForTrestles(airavataClient);
+//                final String expId = createEchoExperimentForTrestles(airavataClient);
 //                final String expId = createExperimentEchoForLocalHost(airavataClient);
 //                final String expId = createExperimentWRFTrestles(airavataClient);
 //                final String expId = createExperimentForBR2(airavataClient);
 //                final String expId = createExperimentForBR2Amber(airavataClient);
 //                final String expId = createExperimentWRFStampede(airavataClient);
-//                final String expId = createExperimentForStampedeAmber(airavataClient);
+                final String expId = createExperimentForStampedeAmber(airavataClient);
 //                String expId = createExperimentForTrestlesAmber(airavataClient);
 //                final String expId = createExperimentGROMACSStampede(airavataClient);
 //                final String expId = createExperimentESPRESSOStampede(airavataClient);

http://git-wip-us.apache.org/repos/asf/airavata/blob/8ef27824/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 318d5bd..d64cf42 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -958,86 +958,90 @@ public class BetterGfacImpl implements GFac,Watcher {
     public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
         String experimentPath = null;
         try {
-            jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
-            log.info("Waiting until zookeeper client connect to the server...");
-            synchronized (mutex) {
-                mutex.wait(5000);  // waiting for the syncConnected event
-            }
-            if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
-                log.error("Experiment is already finalized so no output handlers will be invoked");
-                return;
+            try {
+                jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
+                log.info("Waiting until zookeeper client connect to the server...");
+                synchronized (mutex) {
+                    mutex.wait(5000);  // waiting for the syncConnected event
+                }
+                if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
+                    log.error("Experiment is already finalized so no output handlers will be invoked");
+                    return;
+                }
+            } catch (IOException e) {
+                log.error(e.getMessage(), e);
+            } catch (ApplicationSettingsException e) {
+                log.error(e.getMessage(), e);
+            } catch (InterruptedException e) {
+                log.error(e.getMessage(), e);
+            } catch (KeeperException e) {
+                log.error(e.getMessage(), e);
             }
-        } catch (IOException e) {
-            log.error(e.getMessage(), e);
-        } catch (ApplicationSettingsException e) {
-            log.error(e.getMessage(), e);
-        } catch (InterruptedException e) {
-            log.error(e.getMessage(), e);
-        } catch (KeeperException e) {
-            log.error(e.getMessage(), e);
-        }
 
-        GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
-        List<GFacHandlerConfig> handlers = null;
-        if (gFacConfiguration != null) {
-            handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
-        } else {
-            try {
-                jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
-                        jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
-            } catch (Exception e) {
-                log.error("Error constructing job execution context during outhandler invocation");
-                throw new GFacException(e);
+            GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+            List<GFacHandlerConfig> handlers = null;
+            if (gFacConfiguration != null) {
+                handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+            } else {
+                try {
+                    jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
+                            jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
+                } catch (Exception e) {
+                    log.error("Error constructing job execution context during outhandler invocation");
+                    throw new GFacException(e);
+                }
             }
-        }
-        try {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
-            for (GFacHandlerConfig handlerClassName : handlers) {
-                if (!isCancelled()) {
-                    Class<? extends GFacHandler> handlerClass;
-                    GFacHandler handler;
-                    try {
-                        GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
-                        handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
-                        handler = handlerClass.newInstance();
-                        handler.initProperties(handlerClassName.getProperties());
-                    } catch (ClassNotFoundException e) {
-                        log.error(e.getMessage());
-                        throw new GFacException("Cannot load handler class " + handlerClassName, e);
-                    } catch (InstantiationException e) {
-                        log.error(e.getMessage());
-                        throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                    } catch (IllegalAccessException e) {
-                        log.error(e.getMessage());
-                        throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
-                    }
-                    try {
-                        handler.invoke(jobExecutionContext);
-                        GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
-                    } catch (Exception e) {
-                        TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
-                                jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
-                                jobExecutionContext.getExperimentID(),
-                                jobExecutionContext.getGatewayID());
-                        monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
+            try {
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+                for (GFacHandlerConfig handlerClassName : handlers) {
+                    if (!isCancelled()) {
+                        Class<? extends GFacHandler> handlerClass;
+                        GFacHandler handler;
+                        try {
+                            GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+                            handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+                            handler = handlerClass.newInstance();
+                            handler.initProperties(handlerClassName.getProperties());
+                        } catch (ClassNotFoundException e) {
+                            log.error(e.getMessage());
+                            throw new GFacException("Cannot load handler class " + handlerClassName, e);
+                        } catch (InstantiationException e) {
+                            log.error(e.getMessage());
+                            throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                        } catch (IllegalAccessException e) {
+                            log.error(e.getMessage());
+                            throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+                        }
                         try {
-                            StringWriter errors = new StringWriter();
-                            e.printStackTrace(new PrintWriter(errors));
-                            GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
-                        } catch (GFacException e1) {
-                            log.error(e1.getLocalizedMessage());
+                            handler.invoke(jobExecutionContext);
+                            GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+                        } catch (Exception e) {
+                            TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+                                    jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+                                    jobExecutionContext.getExperimentID(),
+                                    jobExecutionContext.getGatewayID());
+                            monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
+                            try {
+                                StringWriter errors = new StringWriter();
+                                e.printStackTrace(new PrintWriter(errors));
+                                GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+                            } catch (GFacException e1) {
+                                log.error(e1.getLocalizedMessage());
+                            }
+                            throw new GFacException(e);
                         }
-                        throw new GFacException(e);
+                    } else {
+                        log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
+                        break;
                     }
-                } else {
-                    log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
-                    break;
+                    monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
                 }
-                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+            } catch (Exception e) {
+                throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
             }
-        }catch (Exception e) {
-            throw new GFacException("Cannot invoke OutHandlers\n"+e.getMessage(), e);
-        }finally {
+        }catch (Exception e){
+            throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
+        } finally{
             closeZK(jobExecutionContext);
         }
 

http://git-wip-us.apache.org/repos/asf/airavata/blob/8ef27824/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
index 25905e4..f992f10 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
@@ -65,6 +65,7 @@ public class OutHandlerWorker implements Runnable {
 //            gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
             gfac.invokeOutFlowHandlers(jEC);
         } catch (Exception e) {
+            logger.error(e.getMessage(),e);
             TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID());
             //FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
             monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));