You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2015/08/14 00:26:54 UTC
oozie git commit: OOZIE-2228 Statustransit service doesn't pick
bundle with suspend status
Repository: oozie
Updated Branches:
refs/heads/master f1f3c64dd -> 0bb5e1369
OOZIE-2228 Statustransit service doesn't pick bundle with suspend status
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0bb5e136
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0bb5e136
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0bb5e136
Branch: refs/heads/master
Commit: 0bb5e13694c2b17c8231113c98569b8fcd871c58
Parents: f1f3c64
Author: Purshotam Shah <pu...@yahoo-inc.com>
Authored: Thu Aug 13 15:27:46 2015 -0700
Committer: Purshotam Shah <pu...@yahoo-inc.com>
Committed: Thu Aug 13 15:27:46 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/oozie/BundleJobBean.java | 4 +--
.../bundle/BundleStatusTransitXCommand.java | 16 +++++++---
.../executor/jpa/BundleJobQueryExecutor.java | 9 ++++--
.../jpa/TestBundleJobQueryExecutor.java | 2 +-
.../oozie/service/TestStatusTransitService.java | 33 +++++++++++++++++++-
release-log.txt | 1 +
6 files changed, 53 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/main/java/org/apache/oozie/BundleJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/BundleJobBean.java b/core/src/main/java/org/apache/oozie/BundleJobBean.java
index 9b31bd3..5868412 100644
--- a/core/src/main/java/org/apache/oozie/BundleJobBean.java
+++ b/core/src/main/java/org/apache/oozie/BundleJobBean.java
@@ -75,7 +75,7 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_BUNDLE_JOB_STATUS", query = "select w.statusStr from BundleJobBean w where w.id = :id"),
- @NamedQuery(name = "GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME", query = "select w.id, w.statusStr, w.pending, w.lastModifiedTimestamp from BundleJobBean w where w.id = :id"),
+ @NamedQuery(name = "GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME", query = "select w.id, w.statusStr, w.pending, w.lastModifiedTimestamp, w.pauseTimestamp, w.suspendedTimestamp from BundleJobBean w where w.id = :id"),
@NamedQuery(name = "GET_BUNDLE_JOB_ID_JOBXML_CONF", query = "select w.id, w.jobXml, w.conf from BundleJobBean w where w.id = :id"),
@@ -107,7 +107,7 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "BULK_MONITOR_COUNT_QUERY", query = "SELECT COUNT(a) FROM CoordinatorActionBean a, CoordinatorJobBean c"),
- @NamedQuery(name = "GET_BUNDLE_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from BundleActionBean a , BundleJobBean w where a.lastModifiedTimestamp >= :lastModifiedTime and w.id = a.bundleId and (w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.pending = 1)"),
+ @NamedQuery(name = "GET_BUNDLE_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from BundleActionBean a , BundleJobBean w where a.lastModifiedTimestamp >= :lastModifiedTime and w.id = a.bundleId and (w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.statusStr = 'SUSPENDED' OR w.statusStr = 'SUSPENDEDWITHERROR' OR w.pending = 1)"),
@NamedQuery(name = "GET_BUNDLE_JOB_FOR_USER", query = "select w.user from BundleJobBean w where w.id = :id") })
http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
index fb45eb4..835777c 100644
--- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java
@@ -65,7 +65,7 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand {
protected void loadState() throws CommandException {
try {
bundleJob = BundleJobQueryExecutor.getInstance().get(
- BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, jobId);
+ BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId);
bundleActions = BundleActionQueryExecutor.getInstance().getList(
BundleActionQuery.GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE, jobId);
@@ -82,7 +82,7 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand {
&& (bAction.getStatus() == Job.Status.FAILED || bAction.getStatus() == Job.Status.KILLED) ) {
new BundleKillXCommand(jobId).call();
bundleJob = BundleJobQueryExecutor.getInstance().get(
- BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, jobId);
+ BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, jobId);
bundleJob.setStatus(Job.Status.FAILED);
bundleJob.setLastModifiedTime(new Date());
BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS,
@@ -150,13 +150,16 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand {
@Override
protected boolean isPausedState() {
-
- if (bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR) {
+ //If bundle is paused then timestamp will be set; the status test is parenthesized so the check covers both paused statuses.
+ //If bundleJob.getPauseTime() is not set, that means that status has to be computed from bottom-up.
+ if ((bundleJob.getStatus() == Job.Status.PAUSED || bundleJob.getStatus() == Job.Status.PAUSEDWITHERROR)
+ && bundleJob.getPauseTime() != null) {
return true;
}
else {
return getBottomUpPauseStatus() != null;
}
+
}
@Override
@@ -177,7 +180,10 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand {
@Override
protected boolean isSuspendedState() {
- if (bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR) {
+ //If bundle is suspended then timestamp will be set.
+ //If bundleJob.getSuspendedTimestamp() is not set, that means that status has to be computed from bottom-up.
+ if ((bundleJob.getStatus() == Job.Status.SUSPENDED || bundleJob.getStatus() == Job.Status.SUSPENDEDWITHERROR)
+ && bundleJob.getSuspendedTimestamp() != null) {
return true;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
index a770aad..e07672b 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/BundleJobQueryExecutor.java
@@ -48,7 +48,7 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ
UPDATE_BUNDLE_JOB_PAUSE_KICKOFF,
GET_BUNDLE_JOB,
GET_BUNDLE_JOB_STATUS,
- GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME,
+ GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME,
GET_BUNDLE_JOB_ID_JOBXML_CONF,
GET_BUNDLE_IDS_FOR_STATUS_TRANSIT
};
@@ -131,7 +131,7 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ
Query query = em.createNamedQuery(namedQuery.name());
switch (namedQuery) {
case GET_BUNDLE_JOB:
- case GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME:
+ case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME:
case GET_BUNDLE_JOB_ID_JOBXML_CONF:
case GET_BUNDLE_JOB_STATUS:
query.setParameter("id", parameters[0]);
@@ -196,13 +196,16 @@ public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJ
bean.setId((String) parameters[0]);
bean.setStatus((String) ret);
break;
- case GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME:
+ case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME:
bean = new BundleJobBean();
arr = (Object[]) ret;
bean.setId((String) arr[0]);
bean.setStatus((String) arr[1]);
bean.setPending((Integer) arr[2]);
bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[3]));
+ bean.setPauseTime(DateUtils.toDate((Timestamp) arr[4]));
+ bean.setSuspendedTime(DateUtils.toDate((Timestamp) arr[5]));
+
break;
case GET_BUNDLE_JOB_ID_JOBXML_CONF:
bean = new BundleJobBean();
http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
index 509eedd..97cbb7f 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestBundleJobQueryExecutor.java
@@ -128,7 +128,7 @@ public class TestBundleJobQueryExecutor extends XDataTestCase {
BundleJobBean bean = this.addRecordToBundleJobTable(Job.Status.RUNNING, false);
// GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME
BundleJobBean retBean = BundleJobQueryExecutor.getInstance().get(
- BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MODTIME, bean.getId());
+ BundleJobQuery.GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME, bean.getId());
assertEquals(bean.getId(), retBean.getId());
assertEquals(bean.getStatusStr(), retBean.getStatusStr());
assertEquals(bean.getPending(), retBean.getPending());
http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
index d25a2a4..0bcbce0 100644
--- a/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
@@ -1648,7 +1648,38 @@ public class TestStatusTransitService extends XDataTestCase {
}
- static class JobLock implements Runnable {
+ public void testBundleRunningAfterCoordResume() throws Exception {
+ // Suspends and then resumes the coordinator directly and checks that the bundle status follows it each time.
+ setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false");
+ services = new Services();
+ services.init();
+ CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ // RUNNING bundle with one bundle action for the coordinator; the suspend is issued on the coordinator, not the bundle.
+ BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true);
+ final String bundleId = bundleJob.getId();
+ addRecordToBundleActionTable(bundleId, coord.getId(), "COORD-TEST", 0, Job.Status.RUNNING);
+ new CoordSuspendXCommand(coord.getId()).call();
+ // The coordinator itself must now report SUSPENDED.
+ coord = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coord.getId());
+ assertEquals(Job.Status.SUSPENDED, coord.getStatus());
+ // After one StatusTransitRunnable pass the bundle is expected to be SUSPENDED as well.
+ Runnable runnable = new StatusTransitRunnable();
+ runnable.run();
+ bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId);
+ assertEquals(Job.Status.SUSPENDED, bundleJob.getStatus());
+ // Resume the coordinator directly; it must go back to RUNNING.
+ new CoordResumeXCommand(coord.getId()).call();
+ coord = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coord.getId());
+ assertEquals(Job.Status.RUNNING, coord.getStatus());
+ // A second StatusTransitRunnable pass is expected to bring the bundle back to RUNNING.
+ runnable = new StatusTransitRunnable();
+ runnable.run();
+ bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId);
+ assertEquals(Job.Status.RUNNING, bundleJob.getStatus());
+
+ }
+
+ static class JobLock implements Runnable {
String jobId;
public JobLock(String jobId) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/0bb5e136/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1f50731..729202a 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2228 Statustransit service doesn't pick bundle with suspend status (puru)
OOZIE-2325 Shell action fails if user overrides oozie.launcher.mapreduce.map.env (kailongs via puru)
OOZIE-2324 A syntax error in the kill node causes the workflow to get stuck and other problems (rkanter)
OOZIE-2309 Enable the coord:dateOffset() function in /coordinator-app/datasets/dataset/@initial-instance (kailongs via rohini)