You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by sh...@apache.org on 2015/05/13 21:57:49 UTC

[1/2] airavata git commit: Implemented JobSubmitted recover steps.

Repository: airavata
Updated Branches:
  refs/heads/master 449eb7576 -> 420a51ad6


Implemented JobSubmitted recover steps.


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/c29aa94c
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/c29aa94c
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/c29aa94c

Branch: refs/heads/master
Commit: c29aa94cd498c5f384b12f3dc655df80f0bc77e6
Parents: 4b048e1
Author: shamrath <sh...@gmail.com>
Authored: Wed May 13 15:57:33 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Wed May 13 15:57:33 2015 -0400

----------------------------------------------------------------------
 .../airavata/gfac/core/cpi/BetterGfacImpl.java  | 32 ++++++++++++--------
 .../gfac/ssh/provider/impl/SSHProvider.java     |  7 +++++
 2 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/airavata/blob/c29aa94c/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
index da55a2d..455c0d4 100644
--- a/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
+++ b/modules/gfac/gfac-core/src/main/java/org/apache/airavata/gfac/core/cpi/BetterGfacImpl.java
@@ -616,8 +616,10 @@ public class BetterGfacImpl implements GFac,Watcher {
                     invokeProviderExecute(jobExecutionContext);
                     break;
                 case PROVIDERINVOKING:
-                    reInvokeProviderExecute(jobExecutionContext);
+                    reInvokeProviderExecute(jobExecutionContext, true);
                     break;
+                case JOBSUBMITTED:
+                    reInvokeProviderExecute(jobExecutionContext, false);
                 case PROVIDERINVOKED:
                     // no need to re-run the job
                     log.info("Provider does not have to be recovered because it ran successfully for experiment: " + experimentID);
@@ -772,21 +774,27 @@ public class BetterGfacImpl implements GFac,Watcher {
         }
     }
 
-    private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
+    private void reInvokeProviderExecute(JobExecutionContext jobExecutionContext, boolean submit) throws GFacException, GFacProviderException, ApplicationSettingsException, InterruptedException, KeeperException {
         GFacProvider provider = jobExecutionContext.getProvider();
         if (provider != null) {
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
-            GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
-            GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
-            if (plState != null && plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
-                initProvider(provider, jobExecutionContext);
-                executeProvider(provider, jobExecutionContext);
-                disposeProvider(provider, jobExecutionContext);
+            if (submit) {
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKING));
+                GfacHandlerState plState = GFacUtils.getHandlerState(zk, jobExecutionContext, provider.getClass().getName());
+                GFacUtils.createHandlerZnode(zk, jobExecutionContext, provider.getClass().getName());
+                if (plState != null && plState == GfacHandlerState.INVOKING) {    // this will make sure if a plugin crashes it will not launch from the scratch, but plugins have to save their invoked state
+                    initProvider(provider, jobExecutionContext);
+                    executeProvider(provider, jobExecutionContext);
+                    disposeProvider(provider, jobExecutionContext);
+                } else {
+                    provider.recover(jobExecutionContext);
+                }
+                GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
             } else {
-                provider.recover(jobExecutionContext);
+                disposeProvider(provider, jobExecutionContext);
+                GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
+                monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
             }
-            GFacUtils.updateHandlerState(zk, jobExecutionContext, provider.getClass().getName(), GfacHandlerState.COMPLETED);
-            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext), GfacExperimentState.PROVIDERINVOKED));
         }
 
         if (GFacUtils.isSynchronousMode(jobExecutionContext))

http://git-wip-us.apache.org/repos/asf/airavata/blob/c29aa94c/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
----------------------------------------------------------------------
diff --git a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
index 1807339..9bc68bd 100644
--- a/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
+++ b/modules/gfac/gfac-ssh/src/main/java/org/apache/airavata/gfac/ssh/provider/impl/SSHProvider.java
@@ -32,9 +32,12 @@ import org.apache.airavata.gfac.core.context.MessageContext;
 import org.apache.airavata.gfac.core.cpi.BetterGfacImpl;
 import org.apache.airavata.gfac.core.handler.GFacHandlerException;
 import org.apache.airavata.gfac.core.handler.ThreadedHandler;
+import org.apache.airavata.gfac.core.monitor.MonitorID;
+import org.apache.airavata.gfac.core.monitor.state.GfacExperimentStateChangeRequest;
 import org.apache.airavata.gfac.core.notification.events.StartExecutionEvent;
 import org.apache.airavata.gfac.core.provider.AbstractProvider;
 import org.apache.airavata.gfac.core.provider.GFacProviderException;
+import org.apache.airavata.gfac.core.states.GfacExperimentState;
 import org.apache.airavata.gfac.core.utils.GFacUtils;
 import org.apache.airavata.gfac.monitor.email.EmailBasedMonitor;
 import org.apache.airavata.gfac.monitor.email.EmailMonitorFactory;
@@ -161,6 +164,8 @@ public class SSHProvider extends AbstractProvider {
                     if (jobID != null) {
                         jobDetails.setJobID(jobID);
                         GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.SUBMITTED, monitorPublisher);
+                        monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                , GfacExperimentState.JOBSUBMITTED));
                     }
                     jobExecutionContext.setJobDetails(jobDetails);
                     String verifyJobId = verifyJobSubmission(cluster, jobDetails);
@@ -169,6 +174,8 @@ public class SSHProvider extends AbstractProvider {
                         if (jobID == null) {
                             jobID = verifyJobId;
                             jobDetails.setJobID(jobID);
+                            monitorPublisher.publish(new GfacExperimentStateChangeRequest(new MonitorID(jobExecutionContext)
+                                    , GfacExperimentState.JOBSUBMITTED));
                         }
                         GFacUtils.saveJobStatus(jobExecutionContext, jobDetails, JobState.QUEUED, monitorPublisher);
                     }


[2/2] airavata git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata

Posted by sh...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/airavata


Project: http://git-wip-us.apache.org/repos/asf/airavata/repo
Commit: http://git-wip-us.apache.org/repos/asf/airavata/commit/420a51ad
Tree: http://git-wip-us.apache.org/repos/asf/airavata/tree/420a51ad
Diff: http://git-wip-us.apache.org/repos/asf/airavata/diff/420a51ad

Branch: refs/heads/master
Commit: 420a51ad6a6a53ac9d0492d537325947c195490f
Parents: c29aa94 449eb75
Author: shamrath <sh...@gmail.com>
Authored: Wed May 13 15:57:43 2015 -0400
Committer: shamrath <sh...@gmail.com>
Committed: Wed May 13 15:57:43 2015 -0400

----------------------------------------------------------------------
 modules/distribution/new-dist/pom.xml           |  62 +-------
 .../main/assembly/airavata-common-component.xml | 100 +++++++++++++
 .../src/main/assembly/api-server-assembly.xml   | 147 +------------------
 .../src/main/assembly/api-server-component.xml  |   8 +-
 .../new-dist/src/main/assembly/src-assembly.xml |   2 +-
 modules/distribution/pom.xml                    |   2 +-
 6 files changed, 113 insertions(+), 208 deletions(-)
----------------------------------------------------------------------