You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2015/04/29 18:24:50 UTC
airavata git commit: Fixing more issues in the out-flow handlers
Repository: airavata
Updated Branches:
refs/heads/master b90498f15 -> 8ef27824c
Fixing more issues in the out-flow handlers (invokeOutFlowHandlers error handling and OutHandlerWorker logging)
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/8ef27824
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/8ef27824
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/8ef27824
Branch: refs/heads/master
Commit: 8ef27824c71ecbb467f767d964bf7fc330c67272
Parents: b90498f
Author: Lahiru Gunathilake <gl...@gmail.com>
Authored: Wed Apr 29 12:24:34 2015 -0400
Committer: Lahiru Gunathilake <gl...@gmail.com>
Committed: Wed Apr 29 12:24:34 2015 -0400
----------------------------------------------------------------------
.../client/samples/CreateLaunchExperiment.java | 6 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 148 ++++++++++---------
.../gfac/core/utils/OutHandlerWorker.java | 1 +
3 files changed, 80 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/8ef27824/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
----------------------------------------------------------------------
diff --git a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
index 1d5de23..51cc760 100644
--- a/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
+++ b/airavata-api/airavata-client-sdks/java-client-samples/src/main/java/org/apache/airavata/client/samples/CreateLaunchExperiment.java
@@ -61,7 +61,7 @@ public class CreateLaunchExperiment {
private static String echoAppId = "Echo_a3c69094-2591-47d9-b5c4-e14c9fa5abf5";
private static String mpiAppId = "HelloMPI_bfd56d58-6085-4b7f-89fc-646576830518";
private static String wrfAppId = "WRF_7ad5da38-c08b-417c-a9ea-da9298839762";
- private static String amberAppId = "Amber_dda3d2bb-e128-4814-9ade-7de9070169be";
+ private static String amberAppId = "Amber_cb54b269-cf79-4276-8dbb-2ec16b759cc6";
private static String gromacsAppId = "GROMACS_05622038-9edd-4cb1-824e-0b7cb993364b";
private static String espressoAppId = "ESPRESSO_10cc2820-5d0b-4c63-9546-8a8b595593c1";
private static String lammpsAppId = "LAMMPS_2472685b-8acf-497e-aafe-cc66fe5f4cb6";
@@ -167,13 +167,13 @@ public class CreateLaunchExperiment {
// final String expId = createEchoExperimentForFSD(airavataClient);
// final String expId = createMPIExperimentForFSD(airavataClient);
// final String expId = createEchoExperimentForStampede(airavataClient);
- final String expId = createEchoExperimentForTrestles(airavataClient);
+// final String expId = createEchoExperimentForTrestles(airavataClient);
// final String expId = createExperimentEchoForLocalHost(airavataClient);
// final String expId = createExperimentWRFTrestles(airavataClient);
// final String expId = createExperimentForBR2(airavataClient);
// final String expId = createExperimentForBR2Amber(airavataClient);
// final String expId = createExperimentWRFStampede(airavataClient);
-// final String expId = createExperimentForStampedeAmber(airavataClient);
+ final String expId = createExperimentForStampedeAmber(airavataClient);
// String expId = createExperimentForTrestlesAmber(airavataClient);
// final String expId = createExperimentGROMACSStampede(airavataClient);
// final String expId = createExperimentESPRESSOStampede(airavataClient);
http://git-wip-us.apache.org/repos/asf/airavata/blob/8ef27824/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 318d5bd..d64cf42 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -958,86 +958,90 @@ public class BetterGfacImpl implements GFac,Watcher {
public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
String experimentPath = null;
try {
- jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
- log.info("Waiting until zookeeper client connect to the server...");
- synchronized (mutex) {
- mutex.wait(5000); // waiting for the syncConnected event
- }
- if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
- log.error("Experiment is already finalized so no output handlers will be invoked");
- return;
+ try {
+ jobExecutionContext.setZk(new ZooKeeper(AiravataZKUtils.getZKhostPort(), AiravataZKUtils.getZKTimeout(), this));
+ log.info("Waiting until zookeeper client connect to the server...");
+ synchronized (mutex) {
+ mutex.wait(5000); // waiting for the syncConnected event
+ }
+ if (jobExecutionContext.getZk().exists(experimentPath, false) == null) {
+ log.error("Experiment is already finalized so no output handlers will be invoked");
+ return;
+ }
+ } catch (IOException e) {
+ log.error(e.getMessage(), e);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage(), e);
+ } catch (InterruptedException e) {
+ log.error(e.getMessage(), e);
+ } catch (KeeperException e) {
+ log.error(e.getMessage(), e);
}
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- } catch (ApplicationSettingsException e) {
- log.error(e.getMessage(), e);
- } catch (InterruptedException e) {
- log.error(e.getMessage(), e);
- } catch (KeeperException e) {
- log.error(e.getMessage(), e);
- }
- GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
- List<GFacHandlerConfig> handlers = null;
- if (gFacConfiguration != null) {
- handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
- } else {
- try {
- jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
- } catch (Exception e) {
- log.error("Error constructing job execution context during outhandler invocation");
- throw new GFacException(e);
+ GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+ List<GFacHandlerConfig> handlers = null;
+ if (gFacConfiguration != null) {
+ handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+ } else {
+ try {
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), jobExecutionContext.getGatewayID());
+ } catch (Exception e) {
+ log.error("Error constructing job execution context during outhandler invocation");
+ throw new GFacException(e);
+ }
}
- }
- try {
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
- for (GFacHandlerConfig handlerClassName : handlers) {
- if (!isCancelled()) {
- Class<? extends GFacHandler> handlerClass;
- GFacHandler handler;
- try {
- GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
- handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
- handler = handlerClass.newInstance();
- handler.initProperties(handlerClassName.getProperties());
- } catch (ClassNotFoundException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot load handler class " + handlerClassName, e);
- } catch (InstantiationException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (IllegalAccessException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- }
- try {
- handler.invoke(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
- } catch (Exception e) {
- TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID());
- monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
+ try {
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ if (!isCancelled()) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
try {
- StringWriter errors = new StringWriter();
- e.printStackTrace(new PrintWriter(errors));
- GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
- } catch (GFacException e1) {
- log.error(e1.getLocalizedMessage());
+ handler.invoke(jobExecutionContext);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+ } catch (Exception e) {
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentity));
+ try {
+ StringWriter errors = new StringWriter();
+ e.printStackTrace(new PrintWriter(errors));
+ GFacUtils.saveErrorDetails(jobExecutionContext, errors.toString(), CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ } catch (GFacException e1) {
+ log.error(e1.getLocalizedMessage());
+ }
+ throw new GFacException(e);
}
- throw new GFacException(e);
+ } else {
+ log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
+ break;
}
- } else {
- log.info("Experiment execution is cancelled, so OutHandler invocation is going to stop");
- break;
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+ } catch (Exception e) {
+ throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
}
- }catch (Exception e) {
- throw new GFacException("Cannot invoke OutHandlers\n"+e.getMessage(), e);
- }finally {
+ }catch (Exception e){
+ throw new GFacException("Cannot invoke OutHandlers\n" + e.getMessage(), e);
+ } finally{
closeZK(jobExecutionContext);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/8ef27824/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
index 25905e4..f992f10 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/OutHandlerWorker.java
@@ -65,6 +65,7 @@ public class OutHandlerWorker implements Runnable {
// gfac.invokeOutFlowHandlers(monitorID.getJobExecutionContext());
gfac.invokeOutFlowHandlers(jEC);
} catch (Exception e) {
+ logger.error(e.getMessage(),e);
TaskIdentifier taskIdentifier = new TaskIdentifier(monitorID.getTaskID(), monitorID.getWorkflowNodeID(),monitorID.getExperimentID(), monitorID.getJobExecutionContext().getGatewayID());
//FIXME this is a case where the output retrieving fails even if the job execution was a success. Thus updating the task status
monitorPublisher.publish(new TaskStatusChangeRequestEvent(TaskState.FAILED, taskIdentifier));