You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by la...@apache.org on 2014/06/25 22:17:27 UTC
[1/2] git commit: Committing the version 1.0 of Zookeeper implementation
Repository: airavata
Updated Branches:
refs/heads/master 441bf7658 -> 109b679ad
Committing the version 1.0 of Zookeeper implementation
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c05fd14b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c05fd14b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c05fd14b
Branch: refs/heads/master
Commit: c05fd14b01589d0d69b90fbf1b2d7a21667f205e
Parents: 236c04a
Author: lahiru <la...@apache.org>
Authored: Wed Jun 25 16:15:49 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Jun 25 16:15:49 2014 -0400
----------------------------------------------------------------------
.../main/resources/airavata-server.properties | 4 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 310 ++++++++++---------
.../core/handler/AppDescriptorCheckHandler.java | 38 +--
.../core/provider/GFacRecoverableProvider.java | 3 +-
.../gfac/core/states/GfacPluginState.java | 4 +-
.../airavata/gfac/core/utils/GFacUtils.java | 136 +++++---
.../gsissh/provider/impl/GSISSHProvider.java | 161 +++++++---
.../util/OrchestratorRecoveryHandler.java | 2 +-
8 files changed, 381 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/configuration/server/src/main/resources/airavata-server.properties
----------------------------------------------------------------------
diff --git a/modules/configuration/server/src/main/resources/airavata-server.properties b/modules/configuration/server/src/main/resources/airavata-server.properties
index 726225a..ecc0932 100644
--- a/modules/configuration/server/src/main/resources/airavata-server.properties
+++ b/modules/configuration/server/src/main/resources/airavata-server.properties
@@ -279,8 +279,8 @@ connection.name=xsede
activity.listeners=org.apache.airavata.gfac.core.monitor.AiravataJobStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataTaskStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataWorkflowNodeStatusUpdator,org.apache.airavata.gfac.core.monitor.AiravataExperimentStatusUpdator,org.apache.airavata.gfac.core.monitor.GfacInternalStatusUpdator
###---------------------------Orchestrator module Configurations---------------------------###
-job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
-#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
+#job.submitter=org.apache.airavata.orchestrator.core.impl.GFACEmbeddedJobSubmitter
+job.submitter=org.apache.airavata.orchestrator.core.impl.GFACServiceJobSubmitter
job.validators=org.apache.airavata.orchestrator.core.validator.impl.SimpleAppDataValidator,org.apache.airavata.orchestrator.core.validator.impl.ExperimentStatusValidator
submitter.interval=10000
threadpool.size=10
http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 4bfbb68..dc635d1 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -53,6 +53,7 @@ import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
import org.apache.airavata.gfac.core.notification.listeners.WorkflowTrackingListener;
import org.apache.airavata.gfac.core.provider.GFacProvider;
+import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.core.provider.GFacRecoverableProvider;
import org.apache.airavata.gfac.core.scheduler.HostScheduler;
import org.apache.airavata.gfac.core.states.GfacPluginState;
@@ -302,9 +303,10 @@ public class BetterGfacImpl implements GFac {
} else if (stateVal >= 8) {
log.info("There is nothing to recover in this job so we do not re-submit");
ZKUtil.deleteRecursive(zk,
- AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(),jobExecutionContext.getTaskData().getTaskID()));
+ AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID()));
} else {
// Now we know this is an old Job, so we have to handle things gracefully
+ log.info("Re-launching the job in GFac because this is re-submitted to GFac");
reLaunch(jobExecutionContext, stateVal);
}
return true;
@@ -318,7 +320,7 @@ public class BetterGfacImpl implements GFac {
return true;
}
- private void reLaunch(JobExecutionContext jobExecutionContext,int stateVal) throws GFacException {
+ private void reLaunch(JobExecutionContext jobExecutionContext, int stateVal) throws GFacException {
// Scheduler will decide the execution flow of handlers and provider which handles
// the job.
String experimentID = jobExecutionContext.getExperimentID();
@@ -332,17 +334,17 @@ public class BetterGfacImpl implements GFac {
// After executing the in handlers provider instance should be set to job execution context.
// We get the provider instance and execute it.
- if(stateVal == 2 || stateVal == 3){
+ if (stateVal == 2 || stateVal == 3) {
invokeProvider(jobExecutionContext); // provider never ran in previous invocation
- }else if(stateVal == 4){ // whether sync or async job have to invoke the recovering because it crashed in the Handler
+ } else if (stateVal == 4) { // whether sync or async job have to invoke the recovering because it crashed in the Handler
reInvokeProvider(jobExecutionContext);
- }else if(stateVal >= 5 && GFacUtils.isSynchronousMode(jobExecutionContext)){
+ } else if (stateVal >= 5 && GFacUtils.isSynchronousMode(jobExecutionContext)) {
// In this case we do nothing because provider ran successfully, no need to re-run the job
log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID);
- } else if(stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)){
+ } else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)) {
// this is async mode where monitoring of jobs is hapenning, we have to recover
reInvokeProvider(jobExecutionContext);
- } else{
+ } else {
log.info("We skip invoking Handler, because the experiment state is beyond the Provider Invocation !!!");
log.info("ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
}
@@ -419,13 +421,15 @@ public class BetterGfacImpl implements GFac {
}
}
- private void invokeProvider(JobExecutionContext jobExecutionContext) throws GFacException {
+ private void invokeProvider(JobExecutionContext jobExecutionContext) throws GFacException, ApplicationSettingsException, InterruptedException, KeeperException {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
initProvider(provider, jobExecutionContext);
executeProvider(provider, jobExecutionContext);
disposeProvider(provider, jobExecutionContext);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
@@ -433,22 +437,33 @@ public class BetterGfacImpl implements GFac {
}
}
- private void reInvokeProvider(JobExecutionContext jobExecutionContext) throws GFacException {
+ private void reInvokeProvider(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
- if (provider instanceof GFacRecoverableProvider) {
- ((GFacRecoverableProvider) provider).recover(jobExecutionContext);
+ String plState = GFacUtils.getPluginState(zk, jobExecutionContext, provider.getClass().getName());
+ if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
+ if (provider instanceof GFacRecoverableProvider) {
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+ ((GFacRecoverableProvider) provider).recover(jobExecutionContext);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
+ }
} else {
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
initProvider(provider, jobExecutionContext);
executeProvider(provider, jobExecutionContext);
disposeProvider(provider, jobExecutionContext);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
}
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
- if (GFacUtils.isSynchronousMode(jobExecutionContext)) {
+
+ if (GFacUtils.isSynchronousMode(jobExecutionContext))
+
+ {
invokeOutFlowHandlers(jobExecutionContext);
}
+
}
@@ -507,7 +522,7 @@ public class BetterGfacImpl implements GFac {
}
try {
handler.invoke(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKED);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
// if exception thrown before that we do not make it finished
} catch (GFacHandlerException e) {
throw new GFacException("Error Executing a InFlow Handler", e.getCause());
@@ -520,9 +535,69 @@ public class BetterGfacImpl implements GFac {
}
}
+ public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
+ GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+ List<GFacHandlerConfig> handlers = null;
+ if (gFacConfiguration != null) {
+ handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+ } else {
+ try {
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e) {
+ log.error("Error constructing job execution context during outhandler invocation");
+ throw new GFacException(e);
+ }
+ launch(jobExecutionContext);
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ handler.initProperties(handlerClassName.getProperties());
+ } catch (ClassNotFoundException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (Exception e) {
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ }
+ try {
+ handler.invoke(jobExecutionContext);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+ } catch (Exception e) {
+ // TODO: Better error reporting.
+ throw new GFacException("Error Executing a OutFlow Handler", e);
+ }
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+ }
+
+ // At this point all the execution is finished so we update the task and experiment statuses.
+ // Handler authors does not have to worry about updating experiment or task statuses.
+ monitorPublisher.publish(new
+ ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+ ExperimentState.COMPLETED));
+ // Updating the task status if there's any task associated
+ monitorPublisher.publish(new TaskStatusChangeRequest(
+ new TaskIdentity(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+ ));
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
+ }
+
/**
* If handlers ran successfully we re-run only recoverable handlers
* If handler never ran we run the normal invoke method
+ *
* @param jobExecutionContext
* @throws GFacException
*/
@@ -537,17 +612,23 @@ public class BetterGfacImpl implements GFac {
try {
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
- if (!GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName())) {
- GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
- handler.initProperties(handlerClassName.getProperties());
- handler.invoke(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKED);
- } else {
+ String plState = GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName());
+ if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) {
if (handler instanceof GFacRecoverableHandler) {
// if these already ran we re-run only recoverable handlers
+ log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
((GFacRecoverableHandler) handler).recover(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKED);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+ } else {
+ log.info(handlerClassName.getClassName() + " is not a recoverable handler so we do not run because it already ran in last-run");
}
+ } else {
+ log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
+ handler.initProperties(handlerClassName.getProperties());
+ handler.invoke(jobExecutionContext);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
}
} catch (GFacHandlerException e) {
throw new GFacException("Error Executing a InFlow Handler", e.getCause());
@@ -567,141 +648,74 @@ public class BetterGfacImpl implements GFac {
}
public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
- int stateVal = -1;
- try {
- stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- if (stateVal >= 0 && stateVal < 6) {
- GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
- List<GFacHandlerConfig> handlers = null;
- if (gFacConfiguration != null) {
- handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
- } else {
- try {
- jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e) {
- log.error("Error constructing job execution context during outhandler invocation");
- throw new GFacException(e);
- }
- launch(jobExecutionContext);
- }
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
- for (GFacHandlerConfig handlerClassName : handlers) {
- Class<? extends GFacHandler> handlerClass;
- GFacHandler handler;
- try {
- handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
- handler = handlerClass.newInstance();
- handler.initProperties(handlerClassName.getProperties());
- } catch (ClassNotFoundException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot load handler class " + handlerClassName, e);
- } catch (InstantiationException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (IllegalAccessException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- }
- try {
- handler.invoke(jobExecutionContext);
- } catch (Exception e) {
- // TODO: Better error reporting.
- throw new GFacException("Error Executing a OutFlow Handler", e);
- }
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+ GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
+ List<GFacHandlerConfig> handlers = null;
+ if (gFacConfiguration != null) {
+ handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
+ } else {
+ try {
+ jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
+ } catch (Exception e) {
+ log.error("Error constructing job execution context during outhandler invocation");
+ throw new GFacException(e);
}
-
- // At this point all the execution is finished so we update the task and experiment statuses.
- // Handler authors does not have to worry about updating experiment or task statuses.
- monitorPublisher.publish(new
- ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
- ExperimentState.COMPLETED));
- // Updating the task status if there's any task associated
- monitorPublisher.publish(new TaskStatusChangeRequest(
- new TaskIdentity(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
- ));
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
- }
- }
-
- public void invokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
- int stateVal = -1;
- try {
- stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext);
- } catch (ApplicationSettingsException e) {
- e.printStackTrace();
- } catch (KeeperException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
+ launch(jobExecutionContext);
}
- if (stateVal >= 0 && stateVal < 6) {
- GFacConfiguration gFacConfiguration = jobExecutionContext.getGFacConfiguration();
- List<GFacHandlerConfig> handlers = null;
- if (gFacConfiguration != null) {
- handlers = jobExecutionContext.getGFacConfiguration().getOutHandlers();
- } else {
- try {
- jobExecutionContext = createJEC(jobExecutionContext.getExperimentID(), jobExecutionContext.getTaskData().getTaskID());
- } catch (Exception e) {
- log.error("Error constructing job execution context during outhandler invocation");
- throw new GFacException(e);
- }
- launch(jobExecutionContext);
- }
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
- for (GFacHandlerConfig handlerClassName : handlers) {
- Class<? extends GFacHandler> handlerClass;
- GFacHandler handler;
- try {
- GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName());
- handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
- handler = handlerClass.newInstance();
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKING));
+ for (GFacHandlerConfig handlerClassName : handlers) {
+ Class<? extends GFacHandler> handlerClass;
+ GFacHandler handler;
+ try {
+ handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
+ handler = handlerClass.newInstance();
+ String plState = GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName());
+ if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) {
+ if (handler instanceof GFacRecoverableHandler) {
+ // if these already ran we re-run only recoverable handlers
+ log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
+ ((GFacRecoverableHandler) handler).recover(jobExecutionContext);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+ } else {
+ log.info(handlerClassName.getClassName() + " is not a recoverable handler so we do not run because it already ran in last-run");
+ }
+ } else {
+ log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
handler.initProperties(handlerClassName.getProperties());
- } catch (ClassNotFoundException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot load handler class " + handlerClassName, e);
- } catch (InstantiationException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (IllegalAccessException e) {
- log.error(e.getMessage());
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- } catch (Exception e) {
- throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
- }
- try {
handler.invoke(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKED);
- } catch (Exception e) {
- // TODO: Better error reporting.
- throw new GFacException("Error Executing a OutFlow Handler", e);
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
}
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+ handler.initProperties(handlerClassName.getProperties());
+ handler.invoke(jobExecutionContext);
+ } catch (ClassNotFoundException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot load handler class " + handlerClassName, e);
+ } catch (InstantiationException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (IllegalAccessException e) {
+ log.error(e.getMessage());
+ throw new GFacException("Cannot instantiate handler class " + handlerClassName, e);
+ } catch (Exception e) {
+ // TODO: Better error reporting.
+ throw new GFacException("Error Executing a OutFlow Handler", e);
}
-
- // At this point all the execution is finished so we update the task and experiment statuses.
- // Handler authors does not have to worry about updating experiment or task statuses.
- monitorPublisher.publish(new
- ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
- ExperimentState.COMPLETED));
- // Updating the task status if there's any task associated
- monitorPublisher.publish(new TaskStatusChangeRequest(
- new TaskIdentity(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
- ));
- monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
}
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.OUTHANDLERSINVOKED));
+
+ // At this point all the execution is finished so we update the task and experiment statuses.
+ // Handler authors does not have to worry about updating experiment or task statuses.
+ monitorPublisher.publish(new
+ ExperimentStatusChangeRequest(new ExperimentIdentity(jobExecutionContext.getExperimentID()),
+ ExperimentState.COMPLETED));
+ // Updating the task status if there's any task associated
+ monitorPublisher.publish(new TaskStatusChangeRequest(
+ new TaskIdentity(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getTaskData().getTaskID()), TaskState.COMPLETED
+ ));
+ monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.COMPLETED));
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
index 86b89c1..2183f59 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
@@ -25,6 +25,7 @@ import org.apache.airavata.common.utils.AiravataZKUtils;
import org.apache.airavata.commons.gfac.type.ApplicationDescription;
import org.apache.airavata.gfac.Constants;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.schemas.gfac.ApplicationDeploymentDescriptionType;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
@@ -91,20 +92,11 @@ public class AppDescriptorCheckHandler implements GFacRecoverableHandler {
logger.info("Recoverable data is saving to zk: " + data.toString());
- try {
- ZooKeeper zk = jobExecutionContext.getZk();
- if (zk != null) {
- String expZnodeHandlerPath = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getTaskData().getTaskID(), this.getClass().getName());
- Stat exists = zk.exists(expZnodeHandlerPath, false);
- jobExecutionContext.getZk().setData(expZnodeHandlerPath, data.toString().getBytes(), exists.getVersion());
- }
- } catch (Exception e) {
- throw new GFacHandlerException(e);
- }
-
+ GFacUtils.savePluginData(jobExecutionContext, data,this.getClass().getName());
}
+
+
public void initProperties(Properties properties) throws GFacHandlerException {
}
@@ -113,20 +105,14 @@ public class AppDescriptorCheckHandler implements GFacRecoverableHandler {
ApplicationDescription app = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription();
ApplicationDeploymentDescriptionType appDesc = app.getType();
try {
- ZooKeeper zk = jobExecutionContext.getZk();
- if (zk != null) {
- String expZnodeHandlerPath = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getTaskData().getTaskID(), this.getClass().getName());
- Stat exists = zk.exists(expZnodeHandlerPath, false);
- String s = new String(jobExecutionContext.getZk().getData(expZnodeHandlerPath, false, exists));
- String[] split = s.split(",");
- appDesc.setScratchWorkingDirectory(split[0]);
- appDesc.setStaticWorkingDirectory(split[1]);
- appDesc.setInputDataDirectory(split[2]);
- appDesc.setOutputDataDirectory(split[3]);
- appDesc.setStandardOutput(split[4]);
- appDesc.setStandardError(split[5]);
- }
+ String s = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName());
+ String[] split = s.split(",");
+ appDesc.setScratchWorkingDirectory(split[0]);
+ appDesc.setStaticWorkingDirectory(split[1]);
+ appDesc.setInputDataDirectory(split[2]);
+ appDesc.setOutputDataDirectory(split[3]);
+ appDesc.setStandardOutput(split[4]);
+ appDesc.setStandardError(split[5]);
} catch (Exception e) {
throw new GFacHandlerException(e);
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
index 6f98cf6..6cc9820 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.gfac.core.provider;
+import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
/**
@@ -39,5 +40,5 @@ public interface GFacRecoverableProvider extends GFacProvider {
*
* @param jobExecutionContext
*/
- public void recover(JobExecutionContext jobExecutionContext);
+ public void recover(JobExecutionContext jobExecutionContext)throws GFacProviderException,GFacException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
index 53df1d4..b934303 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
@@ -22,8 +22,8 @@ package org.apache.airavata.gfac.core.states;
public enum GfacPluginState {
INVOKING(0),
-
- INVOKED(1);
+ INVOKED(1),
+ COMPLETED(2);
private final int value;
http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index b3c6211..e2594bf 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -37,6 +37,7 @@ import org.apache.airavata.gfac.ExecutionMode;
import org.apache.airavata.gfac.GFacConfiguration;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.gfac.core.states.GfacPluginState;
import org.apache.airavata.model.workspace.experiment.*;
@@ -702,41 +703,42 @@ public class GFacUtils {
exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
if (exists != null) {
- zk.setData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, GfacPluginState.INVOKING.toString().getBytes(),
+ zk.setData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(GfacPluginState.INVOKING.getValue()).getBytes(),
exists.getVersion());
}
return true;
}
public static boolean createPluginZnode(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className,
- GfacPluginState state)throws ApplicationSettingsException, KeeperException, InterruptedException {
- String expState = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
- jobExecutionContext.getTaskData().getTaskID(), className);
- Stat exists = zk.exists(expState, false);
- if (exists == null) {
- zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ GfacPluginState state) throws ApplicationSettingsException, KeeperException, InterruptedException {
+ String expState = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), className);
+ Stat exists = zk.exists(expState, false);
+ if (exists == null) {
+ zk.create(expState, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ zk.create(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
+ new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } else {
+ exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ if (exists == null) {
zk.create(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
- } else {
- exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
- if (exists == null) {
- zk.create(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE,
- new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
}
+ }
- exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
- if (exists != null) {
- zk.setData(expState + File.separator +
- AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes(),
- exists.getVersion());
- }
- return true;
+ exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
+ if (exists != null) {
+ zk.setData(expState + File.separator +
+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes(),
+ exists.getVersion()
+ );
}
+ return true;
+ }
public static boolean updatePluginState(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className,
GfacPluginState state)
@@ -748,25 +750,25 @@ public class GFacUtils {
if (exists != null) {
zk.setData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, String.valueOf(state.getValue()).getBytes(),
exists.getVersion());
- }else {
- createPluginZnode(zk, jobExecutionContext, className,state);
+ } else {
+ createPluginZnode(zk, jobExecutionContext, className, state);
}
return true;
}
- public static boolean getPluginState(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className) {
+ public static String getPluginState(ZooKeeper zk, JobExecutionContext jobExecutionContext, String className) {
try {
String expState = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
jobExecutionContext.getTaskData().getTaskID(), className);
Stat exists = zk.exists(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
if (exists != null) {
- return Boolean.valueOf(new String(zk.getData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false, exists)));
+ return new String(zk.getData(expState + File.separator + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false, exists));
}
- return false; // if the node doesn't exist or any other error we return false
+ return null; // if the node doesn't exist or any other error occurs we return null
} catch (Exception e) {
e.printStackTrace();
- return false;
+ return null;
}
}
@@ -781,42 +783,51 @@ public class GFacUtils {
for (String gfacServerNode : runningGfacNodeNames) {
if (!gfacServerNode.equals(pickedChild)) {
foundExperimentPath = experimentNode + File.separator + gfacServerNode +
- File.separator + experimentID + "+" + taskID;
+ File.separator + experimentID + "+" + taskID;
exists1 = zk.exists(foundExperimentPath, false);
- break;
+ if(exists1 != null) { // when the experiment is found we break the loop
+ break;
+ }
}
}
if (exists1 == null) { // OK this is a pretty new experiment so we are going to create a new node
+ log.info("This is a new Job, so creating all the experiment docs from the scratch");
zk.create(newExpNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
zk.create(newExpNode + File.separator + "state", String.valueOf(GfacExperimentState.LAUNCHED.getValue()).getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }else {
+ } else {
// ohhh this node exists in some other failed gfac folder, we have to move it to this gfac experiment list,safely
- zk.create(newExpNode, zk.getData(foundExperimentPath,false,exists1), ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
+ log.info("This is an old Job, so copying data from old experiment location");
+ zk.create(newExpNode, zk.getData(foundExperimentPath, false, exists1), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
List<String> children = zk.getChildren(foundExperimentPath, false);
- for(String childNode1:children){
- String level1 = foundExperimentPath+File.separator+childNode1;
- Stat exists2 = zk.exists(level1,false); // no need to check exists
- String newLeve1 = newExpNode+File.separator+childNode1;
- zk.create(newLeve1,zk.getData(
- level1,false,exists2),ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- for(String childNode2:zk.getChildren(level1,false)){
- String level2 = level1+File.separator+childNode2;
- Stat exists3 = zk.exists(level2,false); // no need to check exists
- String newLeve2 = newLeve1+File.separator+childNode2;
- zk.create(newLeve2,zk.getData(level2,false,exists3),ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- zk.delete(level2, exists3.getVersion());
+ for (String childNode1 : children) {
+ String level1 = foundExperimentPath + File.separator + childNode1;
+ Stat exists2 = zk.exists(level1, false); // no need to check exists
+ String newLeve1 = newExpNode + File.separator + childNode1;
+ log.info("Creating new znode: " + newLeve1); // these has to be info logs
+ zk.create(newLeve1, zk.getData(
+ level1, false, exists2), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT
+ );
+ for (String childNode2 : zk.getChildren(level1, false)) {
+ String level2 = level1 + File.separator + childNode2;
+ Stat exists3 = zk.exists(level2, false); // no need to check exists
+ String newLeve2 = newLeve1 + File.separator + childNode2;
+ log.info("Creating new znode: " + newLeve2);
+ zk.create(newLeve2, zk.getData(level2, false, exists3), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
}
- zk.delete(level1,exists2.getVersion());
}
- zk.delete(foundExperimentPath,exists1.getVersion());
+ // After all the files are successfully transferred we delete the old experiment, otherwise we do
+ // not delete a single file
+ log.info("After a successful copying of experiment data for an old experiment we delete the old data");
+ log.info("Deleting experiment data: " + foundExperimentPath);
+ ZKUtil.deleteRecursive(zk,foundExperimentPath);
}
} else {
log.error("ExperimentID: " + experimentID + " taskID: " + taskID + " is already running by this Gfac instance");
@@ -828,10 +839,35 @@ public class GFacUtils {
break;
}
}
- ZKUtil.deleteRecursive(zk,foundExperimentPath);
+ ZKUtil.deleteRecursive(zk, foundExperimentPath);
return false;
}
return true;
}
+ public static void savePluginData(JobExecutionContext jobExecutionContext, StringBuffer data, String className) throws GFacHandlerException {
+ try {
+ ZooKeeper zk = jobExecutionContext.getZk();
+ if (zk != null) {
+ String expZnodeHandlerPath = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), className);
+ Stat exists = zk.exists(expZnodeHandlerPath, false);
+ zk.setData(expZnodeHandlerPath, data.toString().getBytes(), exists.getVersion());
+ }
+ } catch (Exception e) {
+ throw new GFacHandlerException(e);
+ }
+ }
+
+ public static String getPluginData(JobExecutionContext jobExecutionContext, String className) throws ApplicationSettingsException, KeeperException, InterruptedException {
+ ZooKeeper zk = jobExecutionContext.getZk();
+ if (zk != null) {
+ String expZnodeHandlerPath = AiravataZKUtils.getExpZnodeHandlerPath(jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getTaskData().getTaskID(), className);
+ Stat exists = zk.exists(expZnodeHandlerPath, false);
+ return new String(jobExecutionContext.getZk().getData(expZnodeHandlerPath, false, exists));
+ }
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 262b832..6b39819 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -20,16 +20,19 @@
*/
package org.apache.airavata.gfac.gsissh.provider.impl;
+import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.gfac.ExecutionMode;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.cpi.GFacImpl;
import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
+import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
import org.apache.airavata.gfac.core.provider.AbstractProvider;
import org.apache.airavata.gfac.core.provider.AbstractRecoverableProvider;
+import org.apache.airavata.gfac.core.provider.GFacProvider;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
@@ -44,6 +47,7 @@ import org.apache.airavata.model.workspace.experiment.JobState;
import org.apache.airavata.schemas.gfac.GsisshHostType;
import org.apache.airavata.schemas.gfac.HostDescriptionType;
import org.apache.airavata.schemas.gfac.HpcApplicationDeploymentType;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,18 +62,18 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
}
public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- super.initialize(jobExecutionContext);
+ super.initialize(jobExecutionContext);
}
public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
log.info("Invoking GSISSH Provider Invoke ...");
+ StringBuffer data = new StringBuffer();
jobExecutionContext.getNotifier().publish(new StartExecutionEvent());
HostDescriptionType host = jobExecutionContext.getApplicationContext().
getHostDescription().getType();
HpcApplicationDeploymentType app = (HpcApplicationDeploymentType) jobExecutionContext.getApplicationContext().
getApplicationDeploymentDescription().getType();
JobDetails jobDetails = new JobDetails();
- String taskID = jobExecutionContext.getTaskData().getTaskID();
try {
Cluster cluster = null;
if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) != null) {
@@ -84,70 +88,78 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
JobDescriptor jobDescriptor = GFACGSISSHUtils.createJobDescriptor(jobExecutionContext, app, cluster);
log.info(jobDescriptor.toXML());
-
+ data.append("jobDesc=").append(jobDescriptor.toXML());
jobDetails.setJobDescription(jobDescriptor.toXML());
-
+
String jobID = cluster.submitBatchJob(jobDescriptor);
jobExecutionContext.setJobDetails(jobDetails);
- if(jobID == null){
+ if (jobID == null) {
jobDetails.setJobID("none");
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
- }else{
+ } else {
jobDetails.setJobID(jobID);
GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED);
}
-
+ data.append(",jobId=").append(jobDetails.getJobID());
// Now job has submitted to the resource, its up to the Provider to parse the information to daemon handler
// to perform monitoring, daemon handlers can be accessed from anywhere
- List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
- if(daemonHandlers == null){
- daemonHandlers = BetterGfacImpl.getDaemonHandlers();
- }
- ThreadedHandler pullMonitorHandler = null;
- ThreadedHandler pushMonitorHandler = null;
- String monitorMode = ((GsisshHostType) host).getMonitorMode();
- for(ThreadedHandler threadedHandler:daemonHandlers){
- if("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())){
- pullMonitorHandler = threadedHandler;
- if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)){
- log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
- pullMonitorHandler.invoke(jobExecutionContext);
- }else{
- log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
- " to handle by the GridPullMonitorHandler");
- }
- }else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())){
- pushMonitorHandler = threadedHandler;
- if("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)){
- log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
- pushMonitorHandler.invoke(jobExecutionContext);
- }else{
- log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
- " to handle by the GridPushMonitorHandler");
- }
- }
- // have to handle the GridPushMonitorHandler logic
- }
- if(pullMonitorHandler == null && pushMonitorHandler==null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())){
- log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
- ", execution is configured as asynchronous, so Outhandler will not be invoked");
- }
+ delegateToMonitorHandlers(jobExecutionContext, (GsisshHostType) host, jobID);
// we know this host is type GsiSSHHostType
} catch (SSHApiException e) {
String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
} catch (Exception e) {
- String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
- log.error(error);
+ String error = "Error submitting the job to host " + host.getHostAddress() + " message: " + e.getMessage();
+ log.error(error);
jobDetails.setJobID("none");
- GFacUtils.saveJobStatus(jobExecutionContext, jobDetails,JobState.FAILED);
- GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
+ GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.FAILED);
+ GFacUtils.saveErrorDetails(jobExecutionContext, error, CorrectiveAction.CONTACT_SUPPORT, ErrorCategory.AIRAVATA_INTERNAL_ERROR);
throw new GFacProviderException(error, e);
+ } finally {
+ log.info("Saving data for future recovery: ");
+ log.info(data.toString());
+ GFacUtils.savePluginData(jobExecutionContext, data, this.getClass().getName());
+ }
+ }
+
+ public void delegateToMonitorHandlers(JobExecutionContext jobExecutionContext, GsisshHostType host, String jobID) throws GFacHandlerException {
+ List<ThreadedHandler> daemonHandlers = GFacImpl.getDaemonHandlers();
+ if (daemonHandlers == null) {
+ daemonHandlers = BetterGfacImpl.getDaemonHandlers();
+ }
+ ThreadedHandler pullMonitorHandler = null;
+ ThreadedHandler pushMonitorHandler = null;
+ String monitorMode = ((GsisshHostType) host).getMonitorMode();
+ for (ThreadedHandler threadedHandler : daemonHandlers) {
+ if ("org.apache.airavata.gfac.monitor.handlers.GridPullMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pullMonitorHandler = threadedHandler;
+ if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PULL.equals(monitorMode)) {
+ log.info("Job is launched successfully now parsing it to monitoring in pull mode, JobID Returned: " + jobID);
+ pullMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PULL" +
+ " to handle by the GridPullMonitorHandler");
+ }
+ } else if ("org.apache.airavata.gfac.monitor.handlers.GridPushMonitorHandler".equals(threadedHandler.getClass().getName())) {
+ pushMonitorHandler = threadedHandler;
+ if ("".equals(monitorMode) || monitorMode == null || org.apache.airavata.common.utils.Constants.PUSH.equals(monitorMode)) {
+ log.info("Job is launched successfully now parsing it to monitoring in push mode, JobID Returned: " + jobID);
+ pushMonitorHandler.invoke(jobExecutionContext);
+ } else {
+ log.error("Currently we only support Pull and Push monitoring and monitorMode should be PUSH" +
+ " to handle by the GridPushMonitorHandler");
+ }
+ }
+ // have to handle the GridPushMonitorHandler logic
+ }
+ if (pullMonitorHandler == null && pushMonitorHandler == null && ExecutionMode.ASYNCHRONOUS.equals(jobExecutionContext.getGFacConfiguration().getExecutionMode())) {
+ log.error("No Daemon handler is configured in gfac-config.xml, either pull or push, so monitoring will not invoked" +
+ ", execution is configured as asynchronous, so Outhandler will not be invoked");
}
}
@@ -159,8 +171,63 @@ public class GSISSHProvider extends AbstractRecoverableProvider {
//To change body of implemented methods use File | Settings | File Templates.
}
- public void recover(JobExecutionContext jobExecutionContext) {
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException,GFacException {
// have to implement the logic to recover a gfac failure
log.info("Invoking Recovering for the Experiment: " + jobExecutionContext.getExperimentID());
+ HostDescriptionType host = jobExecutionContext.getApplicationContext().
+ getHostDescription().getType();
+ String jobId = "";
+ String jobDesc = "";
+ try {
+ String pluginData = GFacUtils.getPluginData(jobExecutionContext, this.getClass().getName());
+ String[] split = pluginData.split(",");
+ if (split.length < 2) {
+ try {
+ this.execute(jobExecutionContext);
+ } catch (GFacException e) {
+ throw new GFacProviderException("Error recovering provider", e);
+ }
+ return;
+ }
+ jobDesc = split[0].substring(7);
+ jobId = split[1].substring(6);
+
+ log.info("Following data have recovered: ");
+ log.info("Job Description: " + jobDesc);
+ log.info("Job Id: " + jobId);
+ if (jobId == null || "none".equals(jobId) ||
+ "".equals(jobId)) {
+ try {
+ this.execute(jobExecutionContext);
+ } catch (GFacException e) {
+ throw new GFacProviderException("Error recovering provider", e);
+ }
+ return;
+ }
+ } catch (ApplicationSettingsException e) {
+ e.printStackTrace();
+ } catch (KeeperException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ try {
+ // Now we have enough data to recover
+ JobDetails jobDetails = new JobDetails();
+ jobDetails.setJobDescription(jobDesc);
+ jobDetails.setJobID(jobId);
+ jobExecutionContext.setJobDetails(jobDetails);
+ if (jobExecutionContext.getSecurityContext(GSISecurityContext.GSI_SECURITY_CONTEXT) == null) {
+ try {
+ GFACGSISSHUtils.addSecurityContext(jobExecutionContext);
+ } catch (ApplicationSettingsException e) {
+ log.error(e.getMessage());
+ throw new GFacHandlerException("Error while creating SSHSecurityContext", e, e.getLocalizedMessage());
+ }
+ }
+ delegateToMonitorHandlers(jobExecutionContext, (GsisshHostType) host, jobId);
+ } catch (GFacHandlerException e) {
+ throw new GFacProviderException("Error delegating already ran job to Monitoring", e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/c05fd14b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
----------------------------------------------------------------------
diff --git a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
index 4c4be1a..4d8b7c4 100644
--- a/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
+++ b/modules/orchestrator/airavata-orchestrator-service/src/main/java/org/apache/airavata/orchestrator/util/OrchestratorRecoveryHandler.java
@@ -75,7 +75,7 @@ public class OrchestratorRecoveryHandler implements Watcher {
}
List<String> children = zk.getChildren(ServerSettings.getSetting(Constants.ZOOKEEPER_GFAC_EXPERIMENT_NODE)
+ File.separator + gfacId, false);
- log.info("------------------ Recovering Experiments started ------------------------ ");
+ log.info("------------------ Recovering Experiments started ----------------------------------");
for (String expId : children) {
log.info("Recovering Experiment: " + expId.split("\\+")[0]);
log.info("------------------------------------------------------------------------------------");
[2/2] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by la...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/109b679a
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/109b679a
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/109b679a
Branch: refs/heads/master
Commit: 109b679ad94775136d29b050578b70eac690fb47
Parents: c05fd14 441bf76
Author: lahiru <la...@apache.org>
Authored: Wed Jun 25 16:16:45 2014 -0400
Committer: lahiru <la...@apache.org>
Committed: Wed Jun 25 16:16:45 2014 -0400
----------------------------------------------------------------------
.../resources/lib/experimentModel_types.cpp | 74 +-
.../main/resources/lib/experimentModel_types.h | 40 +-
.../Model/Workspace/Experiment/Types.php | 90 +-
.../model/workspace/experiment/TaskDetails.java | 314 +++++--
airavata-api/generate-thrift-files.sh | 2 +-
.../airavataAPI.thrift | 1 -
.../applicationCatalogAPI.thrift | 36 +-
.../applicationCatalogDataModel.thrift | 204 -----
.../applicationDeploymentModel.thrift | 204 +++++
.../applicationInterfaceModel.thrift | 204 +++++
.../computeHostModel.thrift | 204 +++++
.../experimentModel.thrift | 22 +-
.../gatewayProfileModel.thrift | 204 +++++
.../orchestrator/cpi/OrchestratorService.java | 846 +++++++++++++++++++
.../server/OrchestratorServerHandler.java | 7 +
.../orchestrator.cpi.service.thrift | 12 +-
tools/gsissh/src/main/java/SSHDemo.java | 2 +-
.../impl/DefaultSSHApiTestWithMyProxyAuth.java | 202 +----
.../gsissh/src/test/resources/log4j.properties | 34 +
19 files changed, 2169 insertions(+), 533 deletions(-)
----------------------------------------------------------------------