You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2015/07/07 19:33:07 UTC
oozie git commit: OOZIE-2285 Change in concurrency should trigger
coord action ready command (kailongs via rohini)
Repository: oozie
Updated Branches:
refs/heads/master d0921f691 -> 9959e2ca0
OOZIE-2285 Change in concurrency should trigger coord action ready command (kailongs via rohini)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9959e2ca
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9959e2ca
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9959e2ca
Branch: refs/heads/master
Commit: 9959e2ca0b2aa71f86da7b68b4905f670680b20c
Parents: d0921f6
Author: Rohini Palaniswamy <ro...@apache.org>
Authored: Tue Jul 7 10:31:27 2015 -0700
Committer: Rohini Palaniswamy <ro...@apache.org>
Committed: Tue Jul 7 10:31:27 2015 -0700
----------------------------------------------------------------------
.../command/coord/CoordChangeXCommand.java | 14 +++++---
.../command/coord/TestCoordChangeXCommand.java | 37 +++++++++++++++++---
release-log.txt | 1 +
3 files changed, 44 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/9959e2ca/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
index 00c547d..d060859 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
@@ -39,17 +39,17 @@ import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor;
-import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLARegistrationBean;
@@ -64,6 +64,7 @@ import org.apache.oozie.util.StatusUtils;
public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
private final String jobId;
private Date newEndTime = null;
+ private Integer oldConcurrency = null;
private Integer newConcurrency = null;
private Date newPauseTime = null;
private Date oldPauseTime = null;
@@ -344,6 +345,7 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
LOG.info("STARTED CoordChangeXCommand for jobId=" + jobId);
try {
+ oldConcurrency = this.coordJob.getConcurrency();
if (newEndTime != null) {
// during coord materialization, nextMaterializedTime is set to
// startTime + n(actions materialized) * frequency and this can be AFTER endTime,
@@ -452,6 +454,10 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> {
updateList.add(new UpdateEntry<CoordJobQuery>(CoordJobQuery.UPDATE_COORD_JOB_CHANGE, coordJob));
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, deleteList);
+ if (newConcurrency != null && newConcurrency > oldConcurrency) {
+ queue(new CoordActionReadyXCommand(jobId));
+ }
+
return null;
}
catch (XException ex) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/9959e2ca/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index 7c154c8..3a91aa5 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -29,20 +29,21 @@ import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorJob.Execution;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
-import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.Job;
+import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionByActionNumberJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
-import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
+import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StatusTransitService;
@@ -628,6 +629,34 @@ public class TestCoordChangeXCommand extends XDataTestCase {
assertNotNull(slaSummaryBean1);
}
+ /**
+ * Issues a "concurrency=4" change on a RUNNING coordinator job that has two RUNNING
+ * and two READY actions, then re-reads all four actions and checks that actions 1 and 2
+ * are still RUNNING while actions 3 and 4 are no longer READY.
+ *
+ * @throws Exception if fixture setup, the change command, or an action lookup fails
+ */
+ public void testCoordChangeConcurrency() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-08-01T00:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-08-01T04:59Z");
+ final CoordinatorJobBean job = addRecordToCoordJobTableForPauseTimeTest(CoordinatorJob.Status.RUNNING,
+ startTime, endTime, endTime, true, false, 4);
+ // Fixture: actions 1-2 are RUNNING, actions 3-4 are READY.
+ CoordinatorActionBean ca1 = addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING,
+ "coord-action-get.xml", 0);
+ CoordinatorActionBean ca2 = addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.RUNNING,
+ "coord-action-get.xml", 0);
+ CoordinatorActionBean ca3 = addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.READY,
+ "coord-action-get.xml", 0);
+ CoordinatorActionBean ca4 = addRecordToCoordActionTable(job.getId(), 4, CoordinatorAction.Status.READY,
+ "coord-action-get.xml", 0);
+ new CoordChangeXCommand(job.getId(), "concurrency=4").call();
+ // NOTE(review): presumably this pause lets the queued CoordActionReadyXCommand run before the
+ // actions are re-read; a fixed sleep can be flaky on slow hosts - confirm or replace with polling.
+ Thread.sleep(100);
+ ca1 = CoordActionQueryExecutor.getInstance().get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION,
+ job.getId() + "@1");
+ ca2 = CoordActionQueryExecutor.getInstance().get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION,
+ job.getId() + "@2");
+ ca3 = CoordActionQueryExecutor.getInstance().get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION,
+ job.getId() + "@3");
+ ca4 = CoordActionQueryExecutor.getInstance().get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION,
+ job.getId() + "@4");
+ // JUnit's contract is assertEquals(expected, actual); keeping that order makes failure messages accurate.
+ assertEquals(CoordinatorAction.Status.RUNNING.toString(), ca1.getStatusStr());
+ assertEquals(CoordinatorAction.Status.RUNNING.toString(), ca2.getStatusStr());
+ // The previously READY actions must have moved out of READY.
+ assertFalse(ca3.getStatus().equals(CoordinatorAction.Status.READY));
+ assertFalse(ca4.getStatus().equals(CoordinatorAction.Status.READY));
+ }
// Checks that RUNNING coord action is not deleted
public void testChangeTimeDeleteRunning() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2013-08-01T00:00Z");
http://git-wip-us.apache.org/repos/asf/oozie/blob/9959e2ca/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 521a6ca..73ef9c7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2285 Change in concurrency should trigger coord action ready command (kailongs via rohini)
OOZIE-2284 HBaseCredentials should only add hbase-default.xml and hbase-site.xml to actionConf (rohini)
OOZIE-2286 Update Log4j and Log4j-extras to latest 1.2.x release (rkanter)
OOZIE-2250 Show log for WAITING and TIMEDOUT coord actions (kailongs via rohini)