You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/10 22:41:09 UTC
[1/5] airavata git commit: refactored ,
submitJob and relaunch methods logic with states.
Repository: airavata
Updated Branches:
refs/heads/master 34a840108 -> e4be39e81
refactored , submitJob and relaunch methods logic with states.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4819dbb2
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4819dbb2
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4819dbb2
Branch: refs/heads/master
Commit: 4819dbb2c4f05f8ec4790bcbda641556e19944d1
Parents: 7023991
Author: shamrath <sh...@gmail.com>
Authored: Sun May 10 11:10:41 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Sun May 10 11:10:41 2015 -0400
----------------------------------------------------------------------
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 109 +++++++++++--------
1 file changed, 65 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4819dbb2/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index fcb1394..98ba942 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -494,7 +494,7 @@ public class BetterGfacImpl implements GFac,Watcher {
String experimentEntry = GFacUtils.findExperimentEntry(jobExecutionContext.getExperimentID(), zk);
Stat exists = zk.exists(experimentEntry + File.separator + "operation", false);
zk.getData(experimentEntry + File.separator + "operation", this, exists);
- int stateVal = GFacUtils.getZKExperimentStateValue(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
+ GfacExperimentState gfacExpState = GFacUtils.getZKExperimentState(zk, jobExecutionContext); // this is the original state came, if we query again it might be different,so we preserve this state in the environment
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
, GfacExperimentState.ACCEPTED)); // immediately we get the request we update the status
String workflowInstanceID = null;
@@ -505,17 +505,17 @@ public class BetterGfacImpl implements GFac,Watcher {
}
// Register log event listener. This is required in all scenarios.
jobExecutionContext.getNotificationService().registerListener(new LoggingListener());
- if (stateVal < 2) {
+ if (isNewJob(gfacExpState)) {
// In this scenario We do everything from the beginning
launch(jobExecutionContext);
- } else if (stateVal >= 8) {
+ } else if (isCompletedJob(gfacExpState)) {
log.info("There is nothing to recover in this job so we do not re-submit");
ZKUtil.deleteRecursive(zk,
AiravataZKUtils.getExpZnodePath(jobExecutionContext.getExperimentID()));
} else {
// Now we know this is an old Job, so we have to handle things gracefully
log.info("Re-launching the job in GFac because this is re-submitted to GFac");
- reLaunch(jobExecutionContext, stateVal);
+ reLaunch(jobExecutionContext, gfacExpState);
}
return true;
} catch (ApplicationSettingsException e) {
@@ -530,6 +530,27 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
+ private boolean isCompletedJob(GfacExperimentState gfacExpState) {
+ switch (gfacExpState) {
+ case COMPLETED:
+ case FAILED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ private boolean isNewJob(GfacExperimentState stateVal) {
+ switch (stateVal) {
+ case UNKNOWN:
+ case LAUNCHED:
+ case ACCEPTED:
+ return true;
+ default:
+ return false;
+ }
+ }
+
public boolean cancel(String experimentID, String taskID, String gatewayID) throws GFacException {
JobExecutionContext jobExecutionContext = null;
try {
@@ -635,7 +656,7 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- private void reLaunch(JobExecutionContext jobExecutionContext, int stateVal) throws GFacException {
+ private void reLaunch(JobExecutionContext jobExecutionContext, GfacExperimentState state) throws GFacException {
// Scheduler will decide the execution flow of handlers and provider
// which handles
// the job.
@@ -648,47 +669,46 @@ public class BetterGfacImpl implements GFac,Watcher {
// here we do not skip handler if some handler does not have to be
// run again during re-run it can implement
// that logic in to the handler
- reInvokeInFlowHandlers(jobExecutionContext);
- // After executing the in handlers provider instance should be set
- // to job execution context.
- // We get the provider instance and execute it.
- if (stateVal == 2 || stateVal == 3) {
- invokeProviderExecute(jobExecutionContext); // provider never ran in
- // previous invocation
- } else if (stateVal == 4) { // whether sync or async job have to
- // invoke the recovering because it
- // crashed in the Handler
- reInvokeProviderExecute(jobExecutionContext);
- } else if (stateVal >= 5 && GFacUtils.isSynchronousMode(jobExecutionContext)) {
- // In this case we do nothing because provider ran successfully,
- // no need to re-run the job
- log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID);
- } else if (stateVal == 5 && !GFacUtils.isSynchronousMode(jobExecutionContext)) {
- // this is async mode where monitoring of jobs is hapenning, we have to recover
- reInvokeProviderExecute(jobExecutionContext);
- } else if (stateVal == 6) {
- reInvokeOutFlowHandlers(jobExecutionContext);
- } else {
- log.info("We skip invoking Handler, because the experiment:" + stateVal + " state is beyond the Provider Invocation !!!");
- log.info("ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
- }
+ // After executing the in handlers provider instance should be set
+ // to job execution context.
+ // We get the provider instance and execute it.
+ switch (state) {
+ case INHANDLERSINVOKING:
+ reInvokeInFlowHandlers(jobExecutionContext);
+ case INHANDLERSINVOKED:
+ invokeProviderExecute(jobExecutionContext);
+ break;
+ case PROVIDERINVOKING:
+ reInvokeProviderExecute(jobExecutionContext);
+ break;
+ case PROVIDERINVOKED:
+ // no need to re-run the job
+ log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID);
+ if (!GFacUtils.isSynchronousMode(jobExecutionContext)) {
+ monitorJob(jobExecutionContext);
+ } else {
+ // TODO - Need to handle this correctly , for now we will invoke ouput handlers.
+ invokeOutFlowHandlers(jobExecutionContext);
+ }
+ break;
+ case OUTHANDLERSINVOKING:
+ reInvokeOutFlowHandlers(jobExecutionContext);
+ break;
+ case OUTHANDLERSINVOKED:
+ case COMPLETED:
+ case FAILED:
+ case UNKNOWN:
+ log.info("All output handlers are invoked successfully, ExperimentId: " + experimentID + " taskId: " + jobExecutionContext.getTaskData().getTaskID());
+ break;
+ default:
+ throw new GFacException("Un-handled GfacExperimentState : " + state.name());
+ }
} catch (Exception e) {
log.error(e.getMessage(),e);
try {
// we make the experiment as failed due to exception scenario
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.FAILED));
- // monitorPublisher.publish(new
- // ExperimentStatusChangedEvent(new
- // ExperimentIdentity(jobExecutionContext.getExperimentID()),
- // ExperimentState.FAILED));
- // Updating the task status if there's any task associated
- // monitorPublisher.publish(new TaskStatusChangedEvent(
- // new TaskIdentity(jobExecutionContext.getExperimentID(),
- // jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- // jobExecutionContext.getTaskData().getTaskID()),
- // TaskState.FAILED
- // ));
JobIdentifier jobIdentity = new JobIdentifier(
jobExecutionContext.getJobDetails().getJobID(), jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
@@ -699,9 +719,6 @@ public class BetterGfacImpl implements GFac,Watcher {
} catch (NullPointerException e1) {
log.error("Error occured during updating the statuses of Experiments,tasks or Job statuses to failed, "
+ "NullPointerException occurred because at this point there might not have Job Created", e1, e);
-// monitorPublisher
-// .publish(new ExperimentStatusChangedEvent(new ExperimentIdentity(jobExecutionContext.getExperimentID()), ExperimentState.FAILED));
- // Updating the task status if there's any task associated
TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
jobExecutionContext.getExperimentID(),
@@ -716,7 +733,11 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
- private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
+ private void monitorJob(JobExecutionContext jobExecutionContext) {
+ // TODO - Auto generated message.
+ }
+
+ private void launch(JobExecutionContext jobExecutionContext) throws GFacException {
// Scheduler will decide the execution flow of handlers and provider
// which handles
// the job.
[3/5] airavata git commit: Simplified StandardOutReader.
Posted by sh...@apache.org.
Simplified StandardOutReader.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/fd6e0a0b
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/fd6e0a0b
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/fd6e0a0b
Branch: refs/heads/master
Commit: fd6e0a0be55ca20e811a8b6ccd4288d8ce330eae
Parents: a3487e7
Author: shamrath <sh...@gmail.com>
Authored: Sun May 10 16:15:30 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Sun May 10 16:15:30 2015 -0400
----------------------------------------------------------------------
.../apache/airavata/gsi/ssh/impl/StandardOutReader.java | 10 +---------
1 file changed, 1 insertion(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/fd6e0a0b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
index 00cbe01..767fc61 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
@@ -40,22 +40,14 @@ public class StandardOutReader implements CommandOutput {
public void onOutput(Channel channel) {
try {
StringBuffer pbsOutput = new StringBuffer("");
-// try {
-// Thread.sleep(1000);
-// } catch (InterruptedException e) {
-// e.printStackTrace();
-// }
InputStream inputStream = channel.getInputStream();
byte[] tmp = new byte[1024];
- while (true) {
+ while (!channel.isClosed()) {
while (inputStream.available() > 0) {
int i = inputStream.read(tmp, 0, 1024);
if (i < 0) break;
pbsOutput.append(new String(tmp, 0, i));
}
- if (channel.isClosed()) {
- break;
- }
}
String output = pbsOutput.toString();
this.setStdOutputString(output);
[4/5] airavata git commit: Try to read input stream first time and
then read until jsh channel is closed.
Posted by sh...@apache.org.
Try to read input stream first time and then read until jsh channel is closed.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/4de74c60
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/4de74c60
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/4de74c60
Branch: refs/heads/master
Commit: 4de74c6017ad2b6c3554a4956a4fd9db216ad2d3
Parents: fd6e0a0
Author: shamrath <sh...@gmail.com>
Authored: Sun May 10 16:39:57 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Sun May 10 16:39:57 2015 -0400
----------------------------------------------------------------------
.../java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/4de74c60/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
----------------------------------------------------------------------
diff --git a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
index 767fc61..9f5fa4c 100644
--- a/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
+++ b/tools/gsissh/src/main/java/org/apache/airavata/gsi/ssh/impl/StandardOutReader.java
@@ -42,13 +42,13 @@ public class StandardOutReader implements CommandOutput {
StringBuffer pbsOutput = new StringBuffer("");
InputStream inputStream = channel.getInputStream();
byte[] tmp = new byte[1024];
- while (!channel.isClosed()) {
+ do {
while (inputStream.available() > 0) {
int i = inputStream.read(tmp, 0, 1024);
if (i < 0) break;
pbsOutput.append(new String(tmp, 0, i));
}
- }
+ } while (!channel.isClosed()) ;
String output = pbsOutput.toString();
this.setStdOutputString(output);
} catch (IOException e) {
[5/5] airavata git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/airavata
Posted by sh...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/e4be39e8
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/e4be39e8
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/e4be39e8
Branch: refs/heads/master
Commit: e4be39e812058d3dfb50aed87bdefee5d081b74d
Parents: 4de74c6 34a8401
Author: shamrath <sh...@gmail.com>
Authored: Sun May 10 16:40:03 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Sun May 10 16:40:03 2015 -0400
----------------------------------------------------------------------
.../experimentModel.thrift | 2 --
tools/rabbitmq-download-start.sh | 22 ++++++++++++++++++++
2 files changed, 22 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[2/5] airavata git commit: Refactored GFacHandler and GFacProvider
inheritance,
merged Recoverable interface with its correspond normal interface.
Posted by sh...@apache.org.
Refactored GFacHandler and GFacProvider inheritance, merged Recoverable interface with its correspond normal interface.
Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/a3487e77
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/a3487e77
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/a3487e77
Branch: refs/heads/master
Commit: a3487e770a29be9060ee72298fa91f9b0a71e4be
Parents: 4819dbb
Author: shamrath <sh...@gmail.com>
Authored: Sun May 10 15:07:37 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Sun May 10 15:07:37 2015 -0400
----------------------------------------------------------------------
.../gaussian/handler/GaussianHandler.java | 4 +-
.../gfac/bes/handlers/SMSByteIOInHandler.java | 9 +-
.../gfac/bes/handlers/SMSByteIOOutHandler.java | 5 ++
.../gfac/bes/provider/impl/BESProvider.java | 9 +-
.../airavata/gfac/core/cpi/BetterGfacImpl.java | 88 +++++++-------------
.../gfac/core/handler/AbstractHandler.java | 25 +++++-
.../handler/AbstractRecoverableHandler.java | 88 --------------------
.../core/handler/AppDescriptorCheckHandler.java | 2 +-
.../airavata/gfac/core/handler/GFacHandler.java | 8 ++
.../core/handler/GFacRecoverableHandler.java | 45 ----------
.../provider/AbstractRecoverableProvider.java | 75 -----------------
.../gfac/core/provider/GFacProvider.java | 7 ++
.../core/provider/GFacRecoverableProvider.java | 44 ----------
.../gfac/core/states/GfacPluginState.java | 7 +-
.../airavata/gfac/core/utils/GFacUtils.java | 20 +++--
.../apache/airavata/job/TestGlobalHandler.java | 6 ++
.../org/apache/airavata/job/TestInHandler.java | 6 ++
.../org/apache/airavata/job/TestOutHandler.java | 6 ++
.../org/apache/airavata/job/TestProvider.java | 5 ++
.../airavata/job/TestThreadedHandler.java | 6 ++
.../handler/GSISSHDirectorySetupHandler.java | 4 +-
.../gfac/gsissh/handler/GSISSHInputHandler.java | 4 +-
.../gsissh/handler/GSISSHOutputHandler.java | 4 +-
.../gsissh/handler/NewGSISSHOutputHandler.java | 7 +-
.../gsissh/provider/impl/GSISSHProvider.java | 5 +-
.../handler/LocalDirectorySetupHandler.java | 6 ++
.../gfac/local/handler/LocalInputHandler.java | 5 +-
.../gfac/local/provider/impl/LocalProvider.java | 5 ++
.../handlers/GridPullMonitorHandler.java | 6 ++
.../handlers/GridPushMonitorHandler.java | 5 ++
.../ssh/handler/AdvancedSCPInputHandler.java | 4 +-
.../ssh/handler/AdvancedSCPOutputHandler.java | 5 ++
.../gfac/ssh/handler/NewSSHOutputHandler.java | 7 +-
.../ssh/handler/SSHDirectorySetupHandler.java | 8 +-
.../gfac/ssh/handler/SSHInputHandler.java | 5 ++
.../gfac/ssh/handler/SSHOutputHandler.java | 5 ++
.../gfac/ssh/provider/impl/SSHProvider.java | 5 +-
37 files changed, 208 insertions(+), 347 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-application-specific-handlers/src/main/java/org/apache/airavata/application/gaussian/handler/GaussianHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-application-specific-handlers/src/main/java/org/apache/airavata/application/gaussian/handler/GaussianHandler.java b/modules/gfac/gfac-application-specific-handlers/src/main/java/org/apache/airavata/application/gaussian/handler/GaussianHandler.java
index 048ade6..0d21665 100644
--- a/modules/gfac/gfac-application-specific-handlers/src/main/java/org/apache/airavata/application/gaussian/handler/GaussianHandler.java
+++ b/modules/gfac/gfac-application-specific-handlers/src/main/java/org/apache/airavata/application/gaussian/handler/GaussianHandler.java
@@ -22,7 +22,7 @@
package org.apache.airavata.application.gaussian.handler;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
import org.apache.airavata.model.workspace.experiment.ComputationalResourceScheduling;
@@ -39,7 +39,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
-public class GaussianHandler extends AbstractRecoverableHandler {
+public class GaussianHandler extends AbstractHandler {
private static final Logger logger = LoggerFactory.getLogger(GaussianHandler.class);
public static final String LINK_SECTION = "%";
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/SMSByteIOInHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/SMSByteIOInHandler.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/SMSByteIOInHandler.java
index 6bc6183..add8cb1 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/SMSByteIOInHandler.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/SMSByteIOInHandler.java
@@ -45,6 +45,11 @@ public class SMSByteIOInHandler extends AbstractSMSHandler implements GFacHandle
}
}
-
-
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/SMSByteIOOutHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/SMSByteIOOutHandler.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/SMSByteIOOutHandler.java
index 354abf6..1baa2d9 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/SMSByteIOOutHandler.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/handlers/SMSByteIOOutHandler.java
@@ -84,5 +84,10 @@ public class SMSByteIOOutHandler extends AbstractSMSHandler implements GFacHandl
}
}
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
index b301f2b..d505671 100644
--- a/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
+++ b/modules/gfac/gfac-bes/src/main/java/org/apache/airavata/gfac/bes/provider/impl/BESProvider.java
@@ -412,8 +412,13 @@ public class BESProvider extends AbstractProvider implements GFacProvider,
throws GFacProviderException, GFacException {
// TODO Auto-generated method stub
}
-
- protected void waitUntilDone(FactoryClient factory, EndpointReferenceType activityEpr, JobDetails jobDetails) throws Exception {
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ // TODO: Auto generated method body.
+ }
+
+ protected void waitUntilDone(FactoryClient factory, EndpointReferenceType activityEpr, JobDetails jobDetails) throws Exception {
try {
while ((factory.getActivityStatus(activityEpr) != ActivityStateEnumeration.FINISHED)
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index 98ba942..d5e623e 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -43,7 +43,6 @@ import org.apache.airavata.gfac.core.notification.events.ExecutionFailEvent;
import org.apache.airavata.gfac.core.notification.listeners.LoggingListener;
import org.apache.airavata.gfac.core.provider.GFacProvider;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
-import org.apache.airavata.gfac.core.provider.GFacRecoverableProvider;
import org.apache.airavata.gfac.core.states.GfacExperimentState;
import org.apache.airavata.gfac.core.states.GfacPluginState;
import org.apache.airavata.gfac.core.utils.GFacUtils;
@@ -829,20 +828,16 @@ public class BetterGfacImpl implements GFac,Watcher {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
- String plState = GFacUtils.getPluginState(zk, jobExecutionContext, provider.getClass().getName());
- if (plState!=null && Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
- if (provider instanceof GFacRecoverableProvider) {
- GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
- ((GFacRecoverableProvider) provider).recover(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
- }
- } else {
- GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+ GfacPluginState plState = GFacUtils.getPluginState(zk, jobExecutionContext, provider.getClass().getName());
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+ if (plState != null && plState == GfacPluginState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
initProvider(provider, jobExecutionContext);
executeProvider(provider, jobExecutionContext);
disposeProvider(provider, jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
+ } else {
+ provider.recover(jobExecutionContext);
}
+ GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
@@ -868,24 +863,21 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
+ // TODO - Did refactoring, but need to recheck the logic again.
private void reInvokeProviderCancel(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
GFacProvider provider = jobExecutionContext.getProvider();
if (provider != null) {
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
- String plState = GFacUtils.getPluginState(zk, jobExecutionContext, provider.getClass().getName());
- if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
- if (provider instanceof GFacRecoverableProvider) {
- GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
- ((GFacRecoverableProvider) provider).recover(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
- }
- } else {
- GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+ GfacPluginState plState = GFacUtils.getPluginState(zk, jobExecutionContext, provider.getClass().getName());
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, provider.getClass().getName());
+ if (plState == GfacPluginState.UNKNOWN || plState == GfacPluginState.INVOKING) { // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
initProvider(provider, jobExecutionContext);
cancelProvider(provider, jobExecutionContext);
disposeProvider(provider, jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
+ } else {
+ provider.recover(jobExecutionContext);
}
+ GFacUtils.updatePluginState(zk, jobExecutionContext, provider.getClass().getName(), GfacPluginState.COMPLETED);
monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
}
@@ -1106,6 +1098,7 @@ public class BetterGfacImpl implements GFac,Watcher {
* @param jobExecutionContext
* @throws GFacException
*/
+ // TODO - Did refactoring, but need to recheck the logic again.
private void reInvokeInFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
List<GFacHandlerConfig> handlers = jobExecutionContext.getGFacConfiguration().getInHandlers();
try {
@@ -1117,31 +1110,18 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
- String plState = GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName());
- int state = 0;
- try {
- state = Integer.valueOf(plState);
- } catch (NumberFormatException e) {
-
- }
- if (state >= GfacPluginState.INVOKED.getValue()) {
- if (handler instanceof GFacRecoverableHandler) {
- // if these already ran we re-run only recoverable handlers
- log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
- GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
- handler.initProperties(handlerClassName.getProperties());
- ((GFacRecoverableHandler) handler).recover(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
- } else {
- log.info(handlerClassName.getClassName() + " is not a recoverable handler so we do not run because it already ran in last-run");
- }
- } else {
+ GfacPluginState plState = GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
+ handler.initProperties(handlerClassName.getProperties());
+ if (plState == GfacPluginState.UNKNOWN || plState == GfacPluginState.INVOKING) {
log.info(handlerClassName.getClassName() + " never ran so we run this is normal mode");
- GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
- handler.initProperties(handlerClassName.getProperties());
handler.invoke(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+ } else {
+ // if these already ran we re-run only recoverable handlers
+ log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
+ handler.recover(jobExecutionContext);
}
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
} catch (GFacHandlerException e) {
throw new GFacException("Error Executing a InFlow Handler", e.getCause());
} catch (ClassNotFoundException e) {
@@ -1166,6 +1146,7 @@ public class BetterGfacImpl implements GFac,Watcher {
}
}
+ // TODO - Did refactoring, but need to recheck the logic again.
public void reInvokeOutFlowHandlers(JobExecutionContext jobExecutionContext) throws GFacException {
String experimentPath = null;
try {
@@ -1211,24 +1192,17 @@ public class BetterGfacImpl implements GFac,Watcher {
try {
handlerClass = Class.forName(handlerClassName.getClassName().trim()).asSubclass(GFacHandler.class);
handler = handlerClass.newInstance();
- String plState = GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName());
- if (Integer.valueOf(plState) >= GfacPluginState.INVOKED.getValue()) {
- if (handler instanceof GFacRecoverableHandler) {
- // if these already ran we re-run only recoverable handlers
- log.info(handlerClassName.getClassName() + " is a recoverable handler so we recover the handler");
- GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
- ((GFacRecoverableHandler) handler).recover(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
- } else {
- log.info(handlerClassName.getClassName() + " is not a recoverable handler so we do not run because it already ran in last-run");
- }
- } else {
+ GfacPluginState plState = GFacUtils.getPluginState(zk, jobExecutionContext, handlerClassName.getClassName());
+ GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
+ if (plState == GfacPluginState.UNKNOWN || plState == GfacPluginState.INVOKING) {
log.info(handlerClassName.getClassName() + " never ran so we run this in normal mode");
- GFacUtils.createPluginZnode(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.INVOKING);
handler.initProperties(handlerClassName.getProperties());
handler.invoke(jobExecutionContext);
- GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
+ } else {
+ // if these already ran we re-run only recoverable handlers
+ handler.recover(jobExecutionContext);
}
+ GFacUtils.updatePluginState(zk, jobExecutionContext, handlerClassName.getClassName(), GfacPluginState.COMPLETED);
} catch (ClassNotFoundException e) {
try {
StringWriter errors = new StringWriter();
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
index b6cc359..1bdfa97 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractHandler.java
@@ -23,12 +23,22 @@ package org.apache.airavata.gfac.core.handler;
import org.apache.airavata.common.utils.MonitorPublisher;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
+import org.apache.airavata.gfac.core.states.GfacPluginState;
+import org.apache.airavata.gfac.core.utils.GFacUtils;
+import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
+import org.apache.airavata.model.messaging.event.TaskIdentifier;
+import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
import org.apache.airavata.registry.cpi.Registry;
import org.apache.airavata.registry.cpi.RegistryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
public abstract class AbstractHandler implements GFacHandler {
- protected Registry registry = null;
+ private static final Logger logger = LoggerFactory.getLogger(AbstractHandler.class);
+ protected Registry registry = null;
protected MonitorPublisher publisher = null;
@@ -37,6 +47,11 @@ public abstract class AbstractHandler implements GFacHandler {
}
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ try {
+ GFacUtils.updatePluginState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacPluginState.INVOKED);
+ } catch (Exception e) {
+ logger.error("Error saving Recoverable provider state", e);
+ }
registry = jobExecutionContext.getRegistry();
if(registry == null){
try {
@@ -61,4 +76,12 @@ public abstract class AbstractHandler implements GFacHandler {
public void setRegistry(Registry registry) {
this.registry = registry;
}
+
+ protected void fireTaskOutputChangeEvent(JobExecutionContext jobExecutionContext, List<OutputDataObjectType> outputArray) {
+ TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
+ jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
+ jobExecutionContext.getExperimentID(),
+ jobExecutionContext.getGatewayID());
+ publisher.publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
deleted file mode 100644
index 566a6ba..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AbstractRecoverableHandler.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.handler;
-
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
-import org.apache.airavata.gfac.core.states.GfacPluginState;
-import org.apache.airavata.gfac.core.utils.GFacUtils;
-import org.apache.airavata.model.appcatalog.appinterface.OutputDataObjectType;
-import org.apache.airavata.model.messaging.event.TaskIdentifier;
-import org.apache.airavata.model.messaging.event.TaskOutputChangeEvent;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-public abstract class AbstractRecoverableHandler implements GFacRecoverableHandler {
- private static final Logger logger = LoggerFactory.getLogger(AppDescriptorCheckHandler.class);
- protected Registry registry = null;
-
- protected MonitorPublisher publisher = null;
-
- protected AbstractRecoverableHandler() {
- publisher = BetterGfacImpl.getMonitorPublisher(); // This will not be null because this will be initialize in GFacIml
- }
-
- public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
- try {
- GFacUtils.updatePluginState(jobExecutionContext.getZk(), jobExecutionContext, this.getClass().getName(), GfacPluginState.INVOKED);
- } catch (Exception e) {
- logger.error("Error saving Recoverable provider state", e);
- }
- registry = jobExecutionContext.getRegistry();
- if (registry == null) {
- try {
- registry = RegistryFactory.getDefaultRegistry();
- } catch (RegistryException e) {
- throw new GFacHandlerException("unable to create registry instance", e);
- }
- }
- }
-
- public MonitorPublisher getPublisher() {
- return publisher;
- }
-
- public void setPublisher(MonitorPublisher publisher) {
- this.publisher = publisher;
- }
-
- public Registry getRegistry() {
- return registry;
- }
-
- public void setRegistry(Registry registry) {
- this.registry = registry;
- }
-
- protected void fireTaskOutputChangeEvent(JobExecutionContext jobExecutionContext, List<OutputDataObjectType> outputArray) {
- TaskIdentifier taskIdentity = new TaskIdentifier(jobExecutionContext.getTaskData().getTaskID(),
- jobExecutionContext.getWorkflowNodeDetails().getNodeInstanceId(),
- jobExecutionContext.getExperimentID(),
- jobExecutionContext.getGatewayID());
- publisher.publish(new TaskOutputChangeEvent(outputArray, taskIdentity));
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
index 34ea139..7302fcf 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/AppDescriptorCheckHandler.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
import java.util.Properties;
-public class AppDescriptorCheckHandler implements GFacRecoverableHandler {
+public class AppDescriptorCheckHandler implements GFacHandler {
private static final Logger logger = LoggerFactory.getLogger(AppDescriptorCheckHandler.class);
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacHandler.java
index f820e29..15b86dd 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacHandler.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacHandler.java
@@ -49,4 +49,12 @@ public interface GFacHandler {
* @throws GFacHandlerException
*/
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException;
+
+
+ /**
+ * This method can be used to implement recovering part of the stateful handler
+ * If you do not want to recover an already ran handler you leave this recover method empty.
+ * @param jobExecutionContext
+ */
+ public void recover(JobExecutionContext jobExecutionContext)throws GFacHandlerException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java
deleted file mode 100644
index ab778b8..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/handler/GFacRecoverableHandler.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.handler;
-
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-
-/**
- * This handler type can be used to implement stateful Operation and
- * we recommend to use the ZK client to store and retrieve the states
- * of the handler implementation. Framework level we use
- * ZK to decide handler ran successfully or not so each handler
- * execution details can be found in following zk path
- * /gfac-experiment/<gfac-node-name>/experimentId+taskId/full-qualified-handlername/state
- * ex: /gfac-experiment/gfac-node0/echoExperiment_2c6c11b8-dea0-4ec8-9832-f3e69fe2e6bb+IDontNeedaNode_682faa66-6218-4897-9271-656bfb8b2bd1/org.apache.airavata.gfac.handlers.Test/state
- */
-public interface GFacRecoverableHandler extends GFacHandler {
-
-
- /**
- * This method can be used to implement recovering part of the stateful handler
- * If you do not want to recover an already ran handler you can simply implement
- * GfacAbstract Handler or GFacHandler or leave this recover method empty.
- * @param jobExecutionContext
- */
- public void recover(JobExecutionContext jobExecutionContext)throws GFacHandlerException;
-
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractRecoverableProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractRecoverableProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractRecoverableProvider.java
deleted file mode 100644
index c6a105e..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/AbstractRecoverableProvider.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.provider;
-
-import org.apache.airavata.common.utils.MonitorPublisher;
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
-import org.apache.airavata.model.workspace.experiment.JobDetails;
-import org.apache.airavata.model.workspace.experiment.JobStatus;
-import org.apache.airavata.persistance.registry.jpa.impl.RegistryFactory;
-import org.apache.airavata.registry.cpi.Registry;
-import org.apache.airavata.registry.cpi.RegistryException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public abstract class AbstractRecoverableProvider implements GFacRecoverableProvider {
- private final Logger log = LoggerFactory.getLogger(this.getClass());
-
- protected Registry registry = null;
- protected JobDetails details; //todo we need to remove this and add methods to fill Job details, this is not a property of a provider
- protected JobStatus status; //todo we need to remove this and add methods to fill Job details, this is not a property of a provider
- protected JobExecutionContext jobExecutionContext;
-
- protected MonitorPublisher monitorPublisher;
-
- protected AbstractRecoverableProvider() { //todo this has to be fixed
- this.monitorPublisher = BetterGfacImpl.getMonitorPublisher();
- if (this.monitorPublisher == null) {
- this.monitorPublisher = BetterGfacImpl.getMonitorPublisher();
- }
- }
-
- public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
- log.debug("Initializing " + this.getClass().getName());
- if (jobExecutionContext.getRegistry() == null) {
- try {
- registry = RegistryFactory.getDefaultRegistry();
- } catch (RegistryException e) {
- throw new GFacException("Unable to create registry instance", e);
- }
- } else {
- registry = jobExecutionContext.getRegistry();
- }
- details = new JobDetails();
- status = new JobStatus();
- this.jobExecutionContext = jobExecutionContext;
- }
-
- public MonitorPublisher getMonitorPublisher() {
- return monitorPublisher;
- }
-
- public void setMonitorPublisher(MonitorPublisher monitorPublisher) {
- this.monitorPublisher = monitorPublisher;
- }
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
index 031cf77..6006309 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacProvider.java
@@ -59,4 +59,11 @@ public interface GFacProvider{
public void cancelJob(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException;
+ /**
+ * This method can be used to implement recovering part of the stateful handler
+ * If you do not want to recover an already ran handler you leave this recover method empty.
+ *
+ * @param jobExecutionContext
+ */
+ public void recover(JobExecutionContext jobExecutionContext)throws GFacProviderException,GFacException;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
deleted file mode 100644
index 6cc9820..0000000
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/provider/GFacRecoverableProvider.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
-*/
-package org.apache.airavata.gfac.core.provider;
-
-import org.apache.airavata.gfac.GFacException;
-import org.apache.airavata.gfac.core.context.JobExecutionContext;
-
-/**
- * This provider type can be used to implement stateful Operation and
- * we recommend to use the ZK client to store and retrieve the states
- * of the handler implementation. Framework level we use
- * ZK to decide handler ran successfully or not so each handler
- * execution details can be found in following zk path
- * /gfac-experiment/<gfac-node-name>/experimentId+taskId/full-qualified-handlername/state
- * ex: /gfac-experiment/gfac-node0/echoExperiment_2c6c11b8-dea0-4ec8-9832-f3e69fe2e6bb+IDontNeedaNode_682faa66-6218-4897-9271-656bfb8b2bd1/org.apache.airavata.gfac.handlers.Test/state
- */
-public interface GFacRecoverableProvider extends GFacProvider {
- /**
- * This method can be used to implement recovering part of the stateful handler
- * If you do not want to recover an already ran handler you can simply implement
- * GfacAbstract Handler or GFacHandler or leave this recover method empty.
- *
- * @param jobExecutionContext
- */
- public void recover(JobExecutionContext jobExecutionContext)throws GFacProviderException,GFacException;
-}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
index b934303..818a144 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/states/GfacPluginState.java
@@ -23,7 +23,8 @@ package org.apache.airavata.gfac.core.states;
public enum GfacPluginState {
INVOKING(0),
INVOKED(1),
- COMPLETED(2);
+ COMPLETED(2),
+ UNKNOWN(3);
private final int value;
@@ -49,8 +50,10 @@ public enum GfacPluginState {
return INVOKING;
case 1:
return INVOKED;
+ case 2:
+ return COMPLETED;
default:
- return null;
+ return UNKNOWN;
}
}
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
index 2e2d501..434f6d4 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/utils/GFacUtils.java
@@ -900,7 +900,10 @@ public class GFacUtils {
InterruptedException {
String expState = AiravataZKUtils.getExpState(zk, jobExecutionContext
.getExperimentID());
- return GfacExperimentState.findByValue(Integer.parseInt(expState));
+ if (expState == null || expState.isEmpty()) {
+ return GfacExperimentState.UNKNOWN;
+ }
+ return GfacExperimentState.findByValue(Integer.valueOf(expState));
}
public static int getZKExperimentStateValue(ZooKeeper zk,
@@ -1019,8 +1022,8 @@ public class GFacUtils {
return false;
}
- public static String getPluginState(ZooKeeper zk,
- JobExecutionContext jobExecutionContext, String className) {
+ public static GfacPluginState getPluginState(ZooKeeper zk,
+ JobExecutionContext jobExecutionContext, String className) {
try {
String expState = AiravataZKUtils.getExpZnodeHandlerPath(
jobExecutionContext.getExperimentID(), className);
@@ -1028,11 +1031,12 @@ public class GFacUtils {
Stat exists = zk.exists(expState + File.separator
+ AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false);
if (exists != null) {
- return new String(zk.getData(expState + File.separator
- + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false,
- exists));
- }
- return null; // if the node doesn't exist or any other error we
+ String stateVal = new String(zk.getData(expState + File.separator
+ + AiravataZKUtils.ZK_EXPERIMENT_STATE_NODE, false,
+ exists));
+ return GfacPluginState.findByValue(Integer.valueOf(stateVal));
+ }
+ return GfacPluginState.UNKNOWN; // if the node doesn't exist or any other error we
// return false
} catch (Exception e) {
log.error("Error occured while getting zk node status", e);
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestGlobalHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestGlobalHandler.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestGlobalHandler.java
index 05eb735..67ee9b8 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestGlobalHandler.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestGlobalHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.job;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.AbstractHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
@@ -29,4 +30,9 @@ public class TestGlobalHandler extends AbstractHandler {
public void initProperties(Properties properties) throws GFacHandlerException {
//To change body of implemented methods use File | Settings | File Templates.
}
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestInHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestInHandler.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestInHandler.java
index e50bfd6..7307ba5 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestInHandler.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestInHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.job;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.AbstractHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
@@ -29,4 +30,9 @@ public class TestInHandler extends AbstractHandler {
public void initProperties(Properties properties) throws GFacHandlerException {
//To change body of implemented methods use File | Settings | File Templates.
}
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestOutHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestOutHandler.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestOutHandler.java
index 42dd00c..b19d646 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestOutHandler.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestOutHandler.java
@@ -20,6 +20,7 @@
*/
package org.apache.airavata.job;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.AbstractHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
@@ -29,4 +30,9 @@ public class TestOutHandler extends AbstractHandler {
public void initProperties(Properties properties) throws GFacHandlerException {
//To change body of implemented methods use File | Settings | File Templates.
}
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
index 151ee19..3829dae 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestProvider.java
@@ -33,6 +33,11 @@ public class TestProvider extends AbstractProvider {
//To change body of implemented methods use File | Settings | File Templates.
}
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ // TODO: Auto generated method body.
+ }
+
public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
//To change body of implemented methods use File | Settings | File Templates.
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestThreadedHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestThreadedHandler.java b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestThreadedHandler.java
index 7ee54bd..f2788c5 100644
--- a/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestThreadedHandler.java
+++ b/modules/gfac/gfac-core/src/test/java/org/apache/airavata/job/TestThreadedHandler.java
@@ -21,6 +21,7 @@
package org.apache.airavata.job;
import junit.framework.Assert;
+import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
@@ -32,6 +33,11 @@ public class TestThreadedHandler extends ThreadedHandler {
public void initProperties(Properties properties) throws GFacHandlerException {
}
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
public void run() {
System.out.println("Invoking TestThreadedHandler");
Assert.assertTrue(true);
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java
index 26547b5..b4790c7 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHDirectorySetupHandler.java
@@ -22,7 +22,7 @@ package org.apache.airavata.gfac.gsissh.handler;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
@@ -37,7 +37,7 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Properties;
-public class GSISSHDirectorySetupHandler extends AbstractRecoverableHandler {
+public class GSISSHDirectorySetupHandler extends AbstractHandler {
private static final Logger log = LoggerFactory.getLogger(GSISSHDirectorySetupHandler.class);
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
index 70706ae..5fba1c1 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHInputHandler.java
@@ -24,7 +24,7 @@ import org.apache.airavata.common.exception.ApplicationSettingsException;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
@@ -53,7 +53,7 @@ import java.util.Set;
* Recoverability for this handler assumes the same input values will come in the second
* run, and assume nobody is changing registry during the original submission and re-submission
*/
-public class GSISSHInputHandler extends AbstractRecoverableHandler {
+public class GSISSHInputHandler extends AbstractHandler {
private static final Logger log = LoggerFactory.getLogger(GSISSHInputHandler.class);
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
index 1064e1c..cfd5a40 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/GSISSHOutputHandler.java
@@ -24,7 +24,7 @@ package org.apache.airavata.gfac.gsissh.handler;
//import org.apache.airavata.commons.gfac.type.MappingFactory;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.core.utils.GFacUtils;
@@ -52,7 +52,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
-public class GSISSHOutputHandler extends AbstractRecoverableHandler {
+public class GSISSHOutputHandler extends AbstractHandler {
private static final Logger log = LoggerFactory.getLogger(GSISSHOutputHandler.class);
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java
index f4aaf72..ed94312 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/handler/NewGSISSHOutputHandler.java
@@ -69,7 +69,12 @@ public class NewGSISSHOutputHandler extends AbstractHandler{
}
}
- @Override
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
+ @Override
public void initProperties(Properties properties) throws GFacHandlerException {
// TODO Auto-generated method stub
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
index 1b0d283..c35445b 100644
--- a/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
+++ b/modules/gfac/gfac-gsissh/src/main/java/org/apache/airavata/gfac/gsissh/provider/impl/GSISSHProvider.java
@@ -24,7 +24,6 @@ import org.airavata.appcatalog.cpi.AppCatalog;
import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
-import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.gfac.ExecutionMode;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
@@ -32,7 +31,7 @@ import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.core.provider.AbstractRecoverableProvider;
+import org.apache.airavata.gfac.core.provider.AbstractProvider;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.gsissh.security.GSISecurityContext;
@@ -62,7 +61,7 @@ import java.util.Map;
//import org.apache.airavata.schemas.gfac.GsisshHostType;
-public class GSISSHProvider extends AbstractRecoverableProvider {
+public class GSISSHProvider extends AbstractProvider {
private static final Logger log = LoggerFactory.getLogger(GSISSHProvider.class);
public void initProperties(Map<String, String> properties) throws GFacProviderException, GFacException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
index 394cfaa..2f9e3b0 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalDirectorySetupHandler.java
@@ -41,6 +41,12 @@ public class LocalDirectorySetupHandler implements GFacHandler {
makeFileSystemDir(jobExecutionContext.getInputDir());
makeFileSystemDir(jobExecutionContext.getOutputDir());
}
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
private void makeFileSystemDir(String dir) throws GFacHandlerException {
File f = new File(dir);
if (f.isDirectory() && f.exists()) {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
index d085fbf..884ccd5 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/handler/LocalInputHandler.java
@@ -20,9 +20,8 @@
*/
package org.apache.airavata.gfac.local.handler;
-import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
-import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.model.appcatalog.appinterface.DataType;
import org.apache.airavata.model.appcatalog.appinterface.InputDataObjectType;
@@ -36,7 +35,7 @@ import java.util.Map;
import java.util.Properties;
-public class LocalInputHandler extends AbstractRecoverableHandler {
+public class LocalInputHandler extends AbstractHandler {
private static final Logger logger = LoggerFactory.getLogger(LocalInputHandler.class);
@Override
public void invoke(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
index d62d3d7..df932b4 100644
--- a/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
+++ b/modules/gfac/gfac-local/src/main/java/org/apache/airavata/gfac/local/provider/impl/LocalProvider.java
@@ -252,6 +252,11 @@ public class LocalProvider extends AbstractProvider {
throw new NotImplementedException();
}
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacProviderException, GFacException {
+ // TODO: Auto generated method body.
+ }
+
private void buildCommand() {
cmdList.add(jobExecutionContext.getExecutablePath());
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
index 24b300e..6fff840 100644
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPullMonitorHandler.java
@@ -107,6 +107,12 @@ public class GridPullMonitorHandler extends ThreadedHandler implements Watcher{
monitorID.getJobID(), monitorID.getExperimentID());
}
}
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
public AuthenticationInfo getAuthenticationInfo() {
return authenticationInfo;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
index 0eb4526..bb36b28 100644
--- a/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
+++ b/modules/gfac/gfac-monitor/gfac-hpc-monitor/src/main/java/org/apache/airavata/gfac/monitor/handlers/GridPushMonitorHandler.java
@@ -85,6 +85,11 @@ public class GridPushMonitorHandler extends ThreadedHandler {
amqpMonitor.getRunningQueue().add(monitorID);
}
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
public AMQPMonitor getAmqpMonitor() {
return amqpMonitor;
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
index 2fae3d5..a1c5278 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPInputHandler.java
@@ -23,7 +23,7 @@ package org.apache.airavata.gfac.ssh.handler;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.core.context.JobExecutionContext;
import org.apache.airavata.gfac.core.context.MessageContext;
-import org.apache.airavata.gfac.core.handler.AbstractRecoverableHandler;
+import org.apache.airavata.gfac.core.handler.AbstractHandler;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.ssh.security.SSHSecurityContext;
@@ -57,7 +57,7 @@ import java.util.*;
* <property name="hostName" value="gw98.iu.xsede.org"/>
* <property name="inputPath" value="/home/airavata/outputData"/>
*/
-public class AdvancedSCPInputHandler extends AbstractRecoverableHandler {
+public class AdvancedSCPInputHandler extends AbstractHandler {
private static final Logger log = LoggerFactory.getLogger(AdvancedSCPInputHandler.class);
public static final String ADVANCED_SSH_AUTH = "advanced.ssh.auth";
public static final int DEFAULT_SSH_PORT = 22;
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
index fac03e8..efed1bc 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/AdvancedSCPOutputHandler.java
@@ -212,5 +212,10 @@ public class AdvancedSCPOutputHandler extends AbstractHandler {
}
}
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
index f7efb34..93d0ed0 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/NewSSHOutputHandler.java
@@ -64,7 +64,12 @@ public class NewSSHOutputHandler extends AbstractHandler{
}
- @Override
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
+ @Override
public void initProperties(Properties properties) throws GFacHandlerException {
// TODO Auto-generated method stub
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
index 5c0d1f8..a985bd3 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHDirectorySetupHandler.java
@@ -63,7 +63,13 @@ public class SSHDirectorySetupHandler extends AbstractHandler {
makeDirectory(jobExecutionContext);
}
- private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
+ private void makeDirectory(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
Cluster cluster = null;
try{
String hostAddress = jobExecutionContext.getHostName();
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
index 27471d4..160c77d 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHInputHandler.java
@@ -153,6 +153,11 @@ public class SSHInputHandler extends AbstractHandler {
jobExecutionContext.setInMessageContext(inputNew);
}
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
private static String stageInputFiles(Cluster cluster, JobExecutionContext jobExecutionContext, String paramValue) throws IOException, GFacException {
int i = paramValue.lastIndexOf(File.separator);
String substring = paramValue.substring(i + 1);
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
index 9b7cd2e..f9eb1ef 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/handler/SSHOutputHandler.java
@@ -243,6 +243,11 @@ public class SSHOutputHandler extends AbstractHandler {
}
+ @Override
+ public void recover(JobExecutionContext jobExecutionContext) throws GFacHandlerException {
+ // TODO: Auto generated method body.
+ }
+
public void initProperties(Properties properties) throws GFacHandlerException {
}
http://git-wip-us.apache.org/repos/asf/airavata/blob/a3487e77/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 7023e3c..0f88327 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -21,7 +21,6 @@
package org.apache.airavata.gfac.ssh.provider.impl;
-import org.airavata.appcatalog.cpi.AppCatalog;
import org.airavata.appcatalog.cpi.AppCatalogException;
import org.apache.airavata.common.exception.AiravataException;
import org.apache.airavata.common.exception.ApplicationSettingsException;
@@ -34,7 +33,7 @@ import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
import org.apache.airavata.gfac.core.handler.GFacHandlerException;
import org.apache.airavata.gfac.core.handler.ThreadedHandler;
import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
-import org.apache.airavata.gfac.core.provider.AbstractRecoverableProvider;
+import org.apache.airavata.gfac.core.provider.AbstractProvider;
import org.apache.airavata.gfac.core.provider.GFacProviderException;
import org.apache.airavata.gfac.core.utils.GFacUtils;
import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
@@ -71,7 +70,7 @@ import java.util.*;
/**
* Execute application using remote SSH
*/
-public class SSHProvider extends AbstractRecoverableProvider {
+public class SSHProvider extends AbstractProvider {
private static final Logger log = LoggerFactory.getLogger(SSHProvider.class);
private Cluster cluster;
private String jobID = null;