You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/08/14 00:26:54 UTC

oozie git commit: OOZIE-2228 Statustransit service doesn't pick bundle with suspend status

Repository: oozie
Updated Branches:
  refs/heads/master f1f3c64dd -> 0bb5e1369


OOZIE-2228 Statustransit service doesn't pick bundle with suspend status


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0bb5e136
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0bb5e136
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0bb5e136

Branch: refs/heads/master
Commit: 0bb5e13694c2b17c8231113c98569b8fcd871c58
Parents: f1f3c64
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Thu Aug 13 15:27:46 2015 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Thu Aug 13 15:27:46 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/oozie/BundleJobBean.java    |  4 +--
 .../bundle/BundleStatusTransitXCommand.java     | 16 +++++++---
 .../executor/jpa/BundleJobQueryExecutor.java    |  9 ++++--
 .../jpa/TestBundleJobQueryExecutor.java         |  2 +-
 .../oozie/service/TestStatusTransitService.java | 33 +++++++++++++++++++-
 release-log.txt                                 |  1 +
 6 files changed, 53 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/main/java/org/apache/oozie/BundleJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java b/core/src/main/java/org/apache/oozie/BundleJobBean.java
index 9b31bd3..5868412 100644
--- a/core/src/main/java/org/apache/oozie/BundleJobBean.java
+++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java
@@ -75,7 +75,7 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_BUNDLE_JOB_STATUS", query = "select w.statusStr from BundleJobBean w where w.id = :id"),
 
-        @NamedQuery(name = "GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME", query = "select w.id, w.statusStr, w.pending, w.lastModifiedTimestamp from BundleJobBean w where w.id = :id"),
+        @NamedQuery(name = "GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME", query = "select w.id, w.statusStr, w.pending, w.lastModifiedTimestamp, w.pauseTimestamp, w.suspendedTimestamp from BundleJobBean w where w.id = :id"),
 
         @NamedQuery(name = "GET_BUNDLE_JOB_ID_JOBXML_CONF", query = "select w.id, w.jobXml, w.conf from BundleJobBean w where w.id = :id"),
 
@@ -107,7 +107,7 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "BULK_MONITOR_COUNT_QUERY", query = "SELECT COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c"),
 
-        @NamedQuery(name = "GET_BUNDLE_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from BundleActionBean a , BundleJobBean w where a.lastModifiedTimestamp >= :lastModifiedTime and w.id = a.bundleId and (w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.pending = 1)"),
+        @NamedQuery(name = "GET_BUNDLE_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from BundleActionBean a , BundleJobBean w where a.lastModifiedTimestamp >= :lastModifiedTime and w.id = a.bundleId and (w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.statusStr = 'SUSPENDED' OR w.statusStr = 'SUSPENDEDWITHERROR' OR w.pending = 1)"),
 
 
         @NamedQuery(name = "GET_BUNDLE_JOB_FOR_USER", query = "select w.user from BundleJobBean w where w.id = :id") })

http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
index fb45eb4..835777c 100644
--- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
@@ -65,7 +65,7 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand {
     protected void loadState() throws CommandException {
         try {
             bundleJob = BundleJobQueryExecutor.getInstance().get(
-                    BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, jobId);
+                    BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId);
 
             bundleActions = BundleActionQueryExecutor.getInstance().getList(
                     BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId);
@@ -82,7 +82,7 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand {
                         && (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED) ) {
                     new BundleKillXCommand(jobId).call();
                     bundleJob = BundleJobQueryExecutor.getInstance().get(
-                            BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, jobId);
+                            BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId);
                     bundleJob.setStatus(Job.Status.FAILED);
                     bundleJob.setLastModifiedTime(new Date());
                     BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS,
@@ -150,13 +150,16 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand {
 
     @Override
     protected boolean isPausedState() {
-
-        if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
+        //If bundle is paused then timestamp will be set.
+        //If bundleJob.getPauseTime() is not set, that means that status has to be computed from bottom-up.
+        if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR
+                && bundleJob.getPauseTime() != null) {
             return true;
         }
         else {
             return getBottomUpPauseStatus() != null;
         }
+
     }
 
     @Override
@@ -177,7 +180,10 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand {
 
     @Override
     protected boolean isSuspendedState() {
-        if (bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
+        //If bundle is suspended then timestamp will be set.
+        //If bundleJob.getSuspendedTimestamp() is not set, that means that status has to be computed from bottom-up.
+        if ((bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR)
+                && bundleJob.getSuspendedTimestamp() != null) {
             return true;
         }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
index a770aad..e07672b 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
@@ -48,7 +48,7 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ
         UPDATE_BUNDLE_JOB_PAUSE_KICKOFF,
         GET_BUNDLE_JOB,
         GET_BUNDLE_JOB_STATUS,
-        GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME,
+        GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME,
         GET_BUNDLE_JOB_ID_JOBXML_CONF,
         GET_BUNDLE_IDS_FOR_STATUS_TRANSIT
     };
@@ -131,7 +131,7 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ
         Query query = em.createNamedQuery(namedQuery.name());
         switch (namedQuery) {
             case GET_BUNDLE_JOB:
-            case GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME:
+            case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME:
             case GET_BUNDLE_JOB_ID_JOBXML_CONF:
             case GET_BUNDLE_JOB_STATUS:
                 query.setParameter("id", parameters[0]);
@@ -196,13 +196,16 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ
                 bean.setId((String) parameters[0]);
                 bean.setStatus((String) ret);
                 break;
-            case GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME:
+            case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME:
                 bean = new BundleJobBean();
                 arr = (Object[]) ret;
                 bean.setId((String) arr[0]);
                 bean.setStatus((String) arr[1]);
                 bean.setPending((Integer) arr[2]);
                 bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[3]));
+                bean.setPauseTime(DateUtils.toDate((Timestamp) arr[4]));
+                bean.setSuspendedTime(DateUtils.toDate((Timestamp) arr[5]));
+
                 break;
             case GET_BUNDLE_JOB_ID_JOBXML_CONF:
                 bean = new BundleJobBean();

http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
index 509eedd..97cbb7f 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
@@ -128,7 +128,7 @@ public class TestBundleJobQueryExecutor extends XDataTestCase {
         BundleJobBean bean = this.addRecordToBundleJobTable(Job.Status.RUNNING, false);
         // GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME
         BundleJobBean retBean = BundleJobQueryExecutor.getInstance().get(
-                BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, bean.getId());
+                BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, bean.getId());
         assertEquals(bean.getId(), retBean.getId());
         assertEquals(bean.getStatusStr(), retBean.getStatusStr());
         assertEquals(bean.getPending(), retBean.getPending());

http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
index d25a2a4..0bcbce0 100644
--- a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
@@ -1648,7 +1648,38 @@ public class TestStatusTransitService extends XDataTestCase {
 
     }
 
-    static class JobLock implements Runnable {
+    public void testBundleRunningAfterCoordResume() throws Exception {
+
+        setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+        services = new Services();
+        services.init();
+        CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+
+        BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true);
+        final String bundleId = bundleJob.getId();
+        addRecordToBundleActionTable(bundleId, coord.getId(), "COORD-TEST", 0, Job.Status.RUNNING);
+        new CoordSuspendXCommand(coord.getId()).call();
+
+        coord = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coord.getId());
+        assertEquals(Job.Status.SUSPENDED, coord.getStatus());
+
+        Runnable runnable = new StatusTransitRunnable();
+        runnable.run();
+        bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId);
+        assertEquals(Job.Status.SUSPENDED, bundleJob.getStatus());
+
+        new CoordResumeXCommand(coord.getId()).call();
+        coord = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coord.getId());
+        assertEquals(Job.Status.RUNNING, coord.getStatus());
+
+        runnable = new StatusTransitRunnable();
+        runnable.run();
+        bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId);
+        assertEquals(Job.Status.RUNNING, bundleJob.getStatus());
+
+    }
+
+  static class JobLock implements Runnable {
         String jobId;
 
         public JobLock(String jobId) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1f50731..729202a 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2228 Statustransit service doesn't pick bundle with suspend status (puru)
 OOZIE-2325 Shell action fails if user overrides oozie.launcher.mapreduce.map.env (kailongs via puru)
 OOZIE-2324 A syntax error in the kill node causes the workflow to get stuck and other problems (rkanter)
 OOZIE-2309 Enable the coord:dateOffset() function in /coordinator-app/datasets/dataset/@initial-instance (kailongs via rohini)