You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by di...@apache.org on 2022/11/10 09:09:23 UTC
[oozie] branch master updated: OOZIE-3254 [coordinator] LAST_ONLY and NONE execution modes: possible OutOfMemoryError when there are too many coordinator actions to materialize (jmakai via dionusos)
This is an automated email from the ASF dual-hosted git repository.
dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git
The following commit(s) were added to refs/heads/master by this push:
new 91d95bb2a OOZIE-3254 [coordinator] LAST_ONLY and NONE execution modes: possible OutOfMemoryError when there are too many coordinator actions to materialize (jmakai via dionusos)
91d95bb2a is described below
commit 91d95bb2a295751c99490ab4ec7b5cb544d74778
Author: Denes Bodo <di...@apache.org>
AuthorDate: Thu Nov 10 09:58:58 2022 +0100
OOZIE-3254 [coordinator] LAST_ONLY and NONE execution modes: possible OutOfMemoryError when there are too many coordinator actions to materialize (jmakai via dionusos)
---
.../command/MaterializeTransitionXCommand.java | 1 +
.../coord/CoordMaterializeTransitionXCommand.java | 84 +++++--
.../service/CoordMaterializeTriggerService.java | 5 +
core/src/main/resources/oozie-default.xml | 10 +
.../TestCoordMaterializeTransitionXCommand.java | 265 +++++++++++++++++----
...alizeTransitionXCommandWithRunningServices.java | 2 +-
.../java/org/apache/oozie/test/XDataTestCase.java | 6 +-
.../src/site/markdown/CoordinatorFunctionalSpec.md | 12 +
release-log.txt | 1 +
9 files changed, 315 insertions(+), 71 deletions(-)
diff --git a/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java
index d8c406edd..fcbb8ba4b 100644
--- a/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java
@@ -26,6 +26,7 @@ package org.apache.oozie.command;
* StartChildren() : submit or queue commands to start children
* notifyParent() : update the status to upstream if any
*/
+@Deprecated
public abstract class MaterializeTransitionXCommand extends TransitionXCommand<Void> {
/**
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index fd178dd76..4cce7f5f1 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -30,7 +30,7 @@ import org.apache.oozie.client.Job;
import org.apache.oozie.client.SLAEvent.SlaAppType;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.MaterializeTransitionXCommand;
+import org.apache.oozie.command.TransitionXCommand;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
import org.apache.oozie.coord.CoordUtils;
@@ -68,9 +68,32 @@ import java.util.TimeZone;
/**
* Materialize actions for specified start and end time for coordinator job.
+ *
+ *
+ * - Mechanism when the execution's mode is LAST_ONLY or NONE:
+ *
+ * In case the Coordinator job's execution mode is LAST_ONLY or NONE, then the Coordinator action
+ * number to be materialized can be huge.
+ * This can be too much for only one CoordMaterializeTransitionXCommand to handle, as it would lead to
+ * OOM as described in OOZIE-3254.
+ * The cause of this would not only because the insertList object inside CoordMaterializeTransitionXCommand
+ * would get filled with a lot of Coordinator job objects, but the other XCommands invoked inside
+ * CoordMaterializeTransitionXCommand would store Coordinator job objects (or field(s) of them) as well until
+ * CoordMaterializeTransitionXCommand run its course.
+ * Now in order to prevent this situation to happen, the current approach only lets a certain amount of actions
+ * to be materialized within a CoordMaterializeTransitionXCommand with the default value of 10000,
+ * which can be configured through Oozie configuration defined in either oozie-default.xml or oozie-site.xml
+ * using the property name `oozie.service.CoordMaterializeTriggerService.action.batch.size`. NOTE: this "batch mode"
+ * can be turned off by setting its value to -1.
+ * Once a CoordMaterializeTransitionXCommand is finished, the CoordMaterializeTriggerService is responsible for
+ * materializing the potential remaining Coordinator actions. NOTE: the CoordMaterializeTriggerService gets
+ * triggered in every 5 minutes by default. This means if the Coordinator job's execution mode is LAST_ONLY or NONE,
+ * a maximum number of `oozie.service.CoordMaterializeTriggerService.action.batch.size` will be materialized in every
+ * 5 minutes.
+ *
*/
@SuppressWarnings("deprecation")
-public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
+public class CoordMaterializeTransitionXCommand extends TransitionXCommand<Void> {
private JPAService jpaService = null;
private CoordinatorJobBean coordJob = null;
@@ -120,6 +143,20 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
public void transitToNext() throws CommandException {
}
+ @Override
+ protected Void execute() throws CommandException {
+ try {
+ materialize();
+ updateJob();
+ performWrites();
+ insertList.clear();
+ calcMatdTime();
+ } finally {
+ notifyParent();
+ }
+ return null;
+ }
+
@Override
public void updateJob() throws CommandException {
updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE,coordJob));
@@ -137,7 +174,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
}
- // TODO: time 100s should be configurable
+ // TODO: time 100ms should be configurable
queue(new CoordActionNotificationXCommand(coordAction), 100);
//Delay for input check = (nominal time - now)
@@ -336,7 +373,6 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
}
- @Override
protected void materialize() throws CommandException {
Instrumentation.Cron cron = new Instrumentation.Cron();
cron.start();
@@ -376,7 +412,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
*/
protected String materializeActions(boolean dryrun) throws Exception {
- Configuration jobConf = null;
+ Configuration jobConf;
try {
jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
}
@@ -418,14 +454,30 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
String action = null;
int numWaitingActions = dryrun ? 0 : jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
- int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
- // If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated
- boolean ignoreMaxActions =
- (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) ||
- coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE))
- && endMatdTime.before(new Date());
- LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
- + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
+
+ boolean ignoreMaxActionsToBeCreated = false;
+ int maxActionsToBeCreated = 0;
+ boolean isCoordJobIsLastOnlyOrNone = CoordinatorJob.Execution.LAST_ONLY == coordJob.getExecutionOrder()
+ || CoordinatorJob.Execution.NONE == coordJob.getExecutionOrder();
+
+ if (isCoordJobIsLastOnlyOrNone && endMatdTime.before(new Date())) {
+ int actionBatchSize = ConfigurationService.getInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE);
+ if (actionBatchSize == -1) {
+ // materialization will be done without upper limit. NOTE: can be lead to OOM.
+ ignoreMaxActionsToBeCreated = true;
+ } else {
+ // only let just a certain amount of actions to be materialized, the remaining actions to be
+ // materialized will be materialized via CoordMaterializeTriggerService.
+ maxActionsToBeCreated = actionBatchSize;
+ }
+ } else {
+ maxActionsToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
+ }
+
+ LOG.info("Coordinator job: " + coordJob.getId() + ", maxActionsToBeCreated: "
+ + (ignoreMaxActionsToBeCreated ? " unlimited" : maxActionsToBeCreated)
+ + ", Mat_Throttle: " + (ignoreMaxActionsToBeCreated ? " ignored" : coordJob.getMatThrottling())
+ + ", numWaitingActions: " + numWaitingActions);
boolean isCronFrequency = false;
@@ -441,7 +493,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
boolean firstMater = true;
- while (effStart.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) {
+ while (effStart.compareTo(end) < 0 && (ignoreMaxActionsToBeCreated || maxActionsToBeCreated-- > 0)) {
if (pause != null && effStart.compareTo(pause) >= 0) {
break;
}
@@ -494,8 +546,8 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
}
if (isCronFrequency) {
- if (effStart.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) {
- //Since we exceed the throttle, we need to move the nextMadtime forward
+ if (effStart.compareTo(end) < 0 && !(ignoreMaxActionsToBeCreated || maxActionsToBeCreated-- > 0)) {
+ //Since we exceed the throttle, we need to move the nextMatdtime forward
//to avoid creating duplicate actions
if (!firstMater) {
effStart.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob));
diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index 8f9bc2064..d8c2eff18 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -58,6 +58,11 @@ public class CoordMaterializeTriggerService implements Service, Instrumentable {
* The number of callables to be queued in a batch.
*/
public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
+ /**
+ * The maximum number of actions in a LAST_ONLY/NONE past batch.
+ */
+ public static final String CONF_ACTION_BATCH_SIZE = CONF_PREFIX + "action.batch.size";
+
/**
* The number of coordinator jobs to be picked for materialization at a given time.
*/
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index ab7b8d358..062e13c9b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -616,6 +616,16 @@
</description>
</property>
+ <property>
+ <name>oozie.service.CoordMaterializeTriggerService.action.batch.size</name>
+ <value>10000</value>
+ <description>
+ Maximum number of actions to be materialized in a batch in case of LAST_ONLY or NONE materialization
+ in the past. If the value is set to -1, then it will not be a maximum number of actions to be
+ materialized.
+ </description>
+ </property>
+
<property>
<name>oozie.service.CoordMaterializeTriggerService.materialization.system.limit</name>
<value>50</value>
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index fcb47562b..c1a923fbb 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -51,6 +51,7 @@ import org.apache.oozie.executor.jpa.SLAEventsGetForSeqIdJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.CoordMaterializeTriggerService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XConfiguration;
@@ -60,9 +61,14 @@ import org.jdom2.Element;
@SuppressWarnings("deprecation")
public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
+ private static final int FOUR_HOURS_IN_SECONDS = 4 * 60 * 60;
+ private static final int NUMBER_OF_ACTIONS_MATERIALIZED_3420 = 57 * 60;
+ private static final int SIXTY_HOURS_IN_MS = 60 * 60 * 60 * 1000;
+ private static final int THREE_HOURS_IN_MS = 3 * 60 * 60 * 1000;
private static final int TIME_IN_MIN = 60 * 1000;
private static final int TIME_IN_HOURS = TIME_IN_MIN * 60;
private static final int TIME_IN_DAY = TIME_IN_HOURS * 24;
+
private JPAService jpaService;
@Override
@@ -84,7 +90,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T010:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
checkCoordAction(job.getId() + "@1");
}
@@ -96,7 +102,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-hcat.xml",
CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
CoordinatorActionBean actionBean = getCoordAction(job.getId() + "@1");
assertEquals("file://dummyhdfs/2009/05/_SUCCESS" + CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+ "${coord:latestRange(-1,0)}", actionBean.getMissingDependencies());
@@ -115,7 +121,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-neg-hcat.xml",
CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
try {
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
fail("Expected Command exception but didn't catch any");
}
catch (CommandException e) {
@@ -134,7 +140,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-relative.xml",
CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
}
public void testActionMaterWithCronFrequency1() throws Exception {
@@ -142,7 +148,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"10,20 * * * *");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:20Z")};
final int expectedNominalTimeCount = 2;
@@ -165,7 +171,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"10-20 * * * *");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:11Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:12Z"),
@@ -197,7 +203,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"0/15 2 * 5-7 4,5");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
final int expectedNominalTimeCount = 0;
checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, new Date[]{});
@@ -218,7 +224,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"0/15 * * 5-7 4,5");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:15Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:30Z"),
@@ -243,7 +249,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"20/15 * * 5-7 4,5");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:35Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:50Z"),};
@@ -267,7 +273,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"20");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:40Z"),};
@@ -291,7 +297,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"20/15 * * 7,10 THU");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:35Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:50Z"),};
@@ -318,7 +324,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setMatThrottling(3);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
DateUtils.parseDateOozieTZ("2013-07-18T00:20Z")};
@@ -350,7 +356,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setTimeUnit(Timeunit.CRON);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
final String startPlusOneHour = "2013-03-10T09:00Z";
final String startPlusTwoHours = "2013-03-10T10:00Z";
final Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ(startInThePast),
@@ -391,7 +397,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setTimeUnit(Timeunit.CRON);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
final String startPlusOneHour = "2013-03-10T09:00Z";
final String startPlusTwoHours = "2013-03-10T10:00Z";
@@ -433,7 +439,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setTimeUnit(Timeunit.CRON);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
final String startPlusOneHour = "2013-03-10T09:00Z";
final Date[] nominalTimesWithoutDSTChange = new Date[] {DateUtils.parseDateOozieTZ(startInThePast),
@@ -449,7 +455,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2013-03-10T12:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"0 * * * *");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(4)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), FOUR_HOURS_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"),
DateUtils.parseDateOozieTZ("2013-03-10T09:00Z"),
DateUtils.parseDateOozieTZ("2013-03-10T10:00Z"),
@@ -475,7 +481,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2012-11-04T11:00Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
"0 * * * *");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(4)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), FOUR_HOURS_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2012-11-04T07:00Z"),
DateUtils.parseDateOozieTZ("2012-11-04T08:00Z"),
DateUtils.parseDateOozieTZ("2012-11-04T09:00Z"),
@@ -501,7 +507,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:04Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, pauseTime, "5");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2009-03-06T10:00Z")};
checkCoordActionsNominalTime(job.getId(), 1, nominalTimes);
}
@@ -511,7 +517,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:08Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, pauseTime, "5");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2009-03-06T10:00Z"),
DateUtils.parseDateOozieTZ("2009-03-06T10:05Z")};
checkCoordActionsNominalTime(job.getId(), 2, nominalTimes);
@@ -524,7 +530,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setFrequency("5");
job.setTimeUnit(Timeunit.MINUTE);
job.setMatThrottling(20);
- String dryRunOutput = new CoordMaterializeTransitionXCommand(job, hoursToSeconds(1), startTime, endTime)
+ String dryRunOutput = new CoordMaterializeTransitionXCommand(job, ONE_HOUR_IN_SECONDS, startTime, endTime)
.materializeActions(true);
String[] actions = dryRunOutput.split("action for new instance");
assertEquals(3, actions.length -1);
@@ -539,7 +545,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date pauseTime = null;
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
pauseTime, 300, "5", Timeunit.MINUTE);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
checkCoordActionsTimeout(job.getId() + "@1", 300);
}
@@ -547,7 +553,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-02-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
}
@@ -555,7 +561,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-02-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
checkCoordWaiting(job.getId(), job.getMatThrottling());
}
@@ -569,7 +575,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date startTime = DateUtils.parseDateOozieTZ("2099-02-01T01:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2099-02-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.PREP);
}
@@ -583,7 +589,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date startTime = DateUtils.toDate(new Timestamp(System.currentTimeMillis() + 180 * 1000));
Date endTime = DateUtils.parseDateOozieTZ("2099-02-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
}
@@ -606,7 +612,8 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
+ "<workflow>" + "<app-path>hdfs:///tmp/workflows/</app-path>" + "</workflow>" + "</action>"
+ "</coordinator-app>";
CoordinatorJobBean job = addRecordToCoordJobTable(coordXml);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
+ JPAService jpaService = Services.get().get(JPAService.class);
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertEquals(CoordinatorJob.Status.FAILED, job.getStatus());
// GetActions for coord job, should be none
@@ -625,7 +632,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date startTime = DateUtils.toDate(new Timestamp(System.currentTimeMillis() + 360 * 1000));
Date endTime = DateUtils.parseDateOozieTZ("2099-02-03T23:59Z");
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.PREP);
}
@@ -645,7 +652,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertEquals(new Date(startTime.getTime() + TIME_IN_DAY * 3), job.getNextMaterializedTime());
}
@@ -659,7 +666,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setFrequency("1");
job.setTimeUnitStr("HOUR");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
assertEquals(new Date(startTime.getTime() + TIME_IN_HOURS * 10), job.getNextMaterializedTime());
}
@@ -674,7 +681,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_DAY * 3);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -690,7 +697,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_DAY);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -706,7 +713,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setFrequency("5");
job.setTimeUnitStr("MINUTE");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_HOURS);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -721,7 +728,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_DAY);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -737,7 +744,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_DAY * 3);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -755,7 +762,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setFrequency("1");
job.setTimeUnitStr("DAY");
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job, hoursToSeconds(1), startTime, endTime).call();
+ new CoordMaterializeTransitionXCommand(job, ONE_HOUR_IN_SECONDS, startTime, endTime).call();
job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
Date next = new Date(startTime.getTime() + TIME_IN_DAY * 4);
adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -1022,8 +1029,11 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
CoordinatorJobBean cronJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
cronFrequency);
- new CoordMaterializeTransitionXCommand(elJob.getId(), TIME_IN_HOURS).call();
- new CoordMaterializeTransitionXCommand(cronJob.getId(), TIME_IN_HOURS).call();
+ new CoordMaterializeTransitionXCommand(elJob.getId(), ONE_HOUR_IN_SECONDS).call();
+ new CoordMaterializeTransitionXCommand(cronJob.getId(), ONE_HOUR_IN_SECONDS).call();
+
+
+ JPAService jpaService = Services.get().get(JPAService.class);
elJob = jpaService.execute(new CoordJobGetJPAExecutor(elJob.getId()));
cronJob = jpaService.execute(new CoordJobGetJPAExecutor(cronJob.getId()));
@@ -1038,7 +1048,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
private void testCronNominalTimes (Date startTime, Date endTime, Date[] nominalTimes, String cronFrequency) throws Exception {
CoordinatorJobBean cronJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
cronFrequency);
- new CoordMaterializeTransitionXCommand(cronJob.getId(), TIME_IN_HOURS).call();
+ new CoordMaterializeTransitionXCommand(cronJob.getId(), ONE_HOUR_IN_SECONDS).call();
cronJob = jpaService.execute(new CoordJobGetJPAExecutor(cronJob.getId()));
checkCoordActionsNominalTime(cronJob.getId(), nominalTimes.length, nominalTimes);
@@ -1049,7 +1059,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
throws Exception {
CoordinatorJobBean elJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
elFrequency, elTimeUnit);
- new CoordMaterializeTransitionXCommand(elJob.getId(), TIME_IN_HOURS).call();
+ new CoordMaterializeTransitionXCommand(elJob.getId(), ONE_HOUR_IN_SECONDS).call();
elJob = jpaService.execute(new CoordJobGetJPAExecutor(elJob.getId()));
checkCoordActionsNominalTime(elJob.getId(), nominalTimes.length, nominalTimes);
@@ -1061,10 +1071,10 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
Date startTime = DateUtils.toDate(new Timestamp(now - 180 * 60 * 1000)); // 3 hours ago
Date endTime = DateUtils.toDate(new Timestamp(now + 180 * 60 * 1000)); // 3 hours from now
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "10",
- CoordinatorJob.Execution.LAST_ONLY);
+ Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
// This would normally materialize the throttle amount and within a 1 hour window; however, with LAST_ONLY this should
// ignore those parameters and materialize everything in the past
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[19];
Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
@@ -1072,27 +1082,184 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
startTime = DateUtils.toDate(new Timestamp(now)); // now
job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "10",
- CoordinatorJob.Execution.LAST_ONLY);
+ Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
// We're starting from "now" this time (i.e. present/future), so it should materialize things normally
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
expectedStatuses = new CoordinatorActionBean.Status[6];
Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
checkCoordActionsStatus(job.getId(), expectedStatuses);
}
+ /**
+ * Tests a Coordinator job's materialization process with LAST_ONLY mode and with 5000 action batch size.
+ * According to the expectations, this size is satisfactory for the (57 * 60) actions to be materialized using a
+ * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+ **/
+ public void testTooManyLastOnlyMaterializationNonCronSingleBatch() throws Exception {
+ long now = System.currentTimeMillis();
+ Date startTime = DateUtils.toDate(new Timestamp(now - SIXTY_HOURS_IN_MS));
+ Date endTime = DateUtils.toDate(new Timestamp(now - THREE_HOURS_IN_MS));
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "1",
+ Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+ // setting the batch size
+ int actionBatchSize = 5000;
+ Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+ CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+ coordMaterializeTransitionXCommand.call();
+
+ checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+
+ CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[NUMBER_OF_ACTIONS_MATERIALIZED_3420];
+ Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+ checkCoordActionsStatus(job.getId(), expectedStatuses);
+ }
+
+ /**
+ * Tests a Coordinator job's materialization process with LAST_ONLY mode and with 1000 action batch size.
+ * According to the expectations, this size is NOT satisfactory for the (57 * 60) actions to be materialized using a
+ * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+ **/
+ public void testTooManyLastOnlyMaterializationNonCronMultiBatch() throws Exception {
+ long now = System.currentTimeMillis();
+ Date startTime = DateUtils.toDate(new Timestamp(now - SIXTY_HOURS_IN_MS));
+ Date endTime = DateUtils.toDate(new Timestamp(now - THREE_HOURS_IN_MS));
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "1",
+ Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+ // setting the batch size
+ int actionBatchSize = 1000;
+ Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+ CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+ coordMaterializeTransitionXCommand.call();
+
+ checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+
+ CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[actionBatchSize];
+ Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+ checkCoordActionsStatus(job.getId(), expectedStatuses);
+ }
+
+ /**
+ * Tests a Coordinator job's materialization process with LAST_ONLY mode and with unlimited action batch size.
+ * According to the expectations, this size is satisfactory for the (57 * 60) actions to be materialized using a
+ * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+ **/
+ public void testTooManyLastOnlyMaterializationNonCronWithoutBatch() throws Exception {
+ long now = System.currentTimeMillis();
+ Date startTime = DateUtils.toDate(new Timestamp(now - SIXTY_HOURS_IN_MS));
+ Date endTime = DateUtils.toDate(new Timestamp(now - THREE_HOURS_IN_MS));
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "1",
+ Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+ // setting the batch size
+ int actionBatchSize = -1;
+ Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+ CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+ coordMaterializeTransitionXCommand.call();
+
+ checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+
+ CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[NUMBER_OF_ACTIONS_MATERIALIZED_3420];
+ Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+ checkCoordActionsStatus(job.getId(), expectedStatuses);
+ }
+
+ /**
+ * Tests a Coordinator job's materialization process with LAST_ONLY mode and with 5000 action batch size.
+ * According to the expectations, this size is satisfactory for the (57 * 60) actions to be materialized using a
+ * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+ **/
+ public void testTooManyLastOnlyMaterializationCronSingleBatch() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"); // past
+ Date endTime = DateUtils.parseDateOozieTZ("2013-03-12T17:00Z"); // +57 hours
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "* * * * *",
+ Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+ // setting the batch size
+ int actionBatchSize = 5000;
+ Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+ CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+ coordMaterializeTransitionXCommand.call();
+
+ checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+
+ CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[NUMBER_OF_ACTIONS_MATERIALIZED_3420];
+ Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+ checkCoordActionsStatus(job.getId(), expectedStatuses);
+ }
+
+ /**
+ * Tests a Coordinator job's materialization process with LAST_ONLY mode and with 5000 action batch size.
+ * According to the expectations, this size is NOT satisfactory for the (57 * 60) actions to be materialized using a
+ * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+ **/
+ public void testTooManyLastOnlyMaterializationCronMultiBatch() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"); // past
+ Date endTime = DateUtils.parseDateOozieTZ("2013-03-12T17:00Z"); // +57 hours
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "* * * * *",
+ Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+ // setting the batch size
+ int actionBatchSize = 1000;
+ Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+ CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+ coordMaterializeTransitionXCommand.call();
+
+ checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+
+ CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[actionBatchSize];
+ Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+ checkCoordActionsStatus(job.getId(), expectedStatuses);
+ }
+
+ /**
+ * Tests a Coordinator job's materialization process with LAST_ONLY mode and with unlimited action batch size.
+ * According to the expectations, this size is satisfactory for the (57 * 60) actions to be materialized using a
+ * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+ **/
+ public void testTooManyLastOnlyMaterializationCronWithoutBatch() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"); // past
+ Date endTime = DateUtils.parseDateOozieTZ("2013-03-12T17:00Z"); // +57 hours
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "* * * * *",
+ Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+ // setting the batch size
+ int actionBatchSize = -1;
+ Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+ CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+ coordMaterializeTransitionXCommand.call();
+ checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+ CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[NUMBER_OF_ACTIONS_MATERIALIZED_3420];
+ Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+ checkCoordActionsStatus(job.getId(), expectedStatuses);
+ }
+
public void testCurrentTimeCheck() throws Exception {
long now = System.currentTimeMillis();
Date startTime = DateUtils.toDate(new Timestamp(now)); // now
- Date endTime = DateUtils.toDate(new Timestamp(now + 3 * 60 * 60 * 1000)); // 3 secondsFromHours from now
+ Date endTime = DateUtils.toDate(new Timestamp(now + THREE_HOURS_IN_MS));
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, "5",
20);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
assertEquals(job.getLastActionNumber(), 12);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
// unfortunatily XCommand doesn't throw exception on precondition
// assertEquals(e.getErrorCode(), ErrorCode.E1100);
// assertTrue(e.getMessage().contains("Request is for future time. Lookup time is"));
@@ -1146,7 +1313,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setEndTime(endTime);
job.setMatThrottling(10);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
assertEquals(job.getLastActionNumber(), 3);
@@ -1193,7 +1360,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
job.setMatThrottling(10);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
assertEquals(4, job.getLastActionNumber());
@@ -1278,7 +1445,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
private void checkCoordActionsStatus(String jobId, CoordinatorActionBean.Status[] statuses) {
try {
List<CoordinatorActionBean> actions = jpaService.execute(new CoordJobGetActionsSubsetJPAExecutor(jobId,
- null, 1, 1000, false));
+ null, 1, 100000, false));
if (actions.size() != statuses.length) {
fail("Should have " + statuses.length + " actions created for job " + jobId + ", but has " + actions.size()
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommandWithRunningServices.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommandWithRunningServices.java
index 0a4c8e722..5615a2406 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommandWithRunningServices.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommandWithRunningServices.java
@@ -55,7 +55,7 @@ public class TestCoordMaterializeTransitionXCommandWithRunningServices extends X
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T09:58Z");
final CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, pauseTime, "5");
- new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+ new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
waitFor(60_000, new Predicate() {
public boolean evaluate() throws Exception {
return (getStatus(job.getId()) == CoordinatorJob.Status.PAUSED);
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index ed4f9695c..db22103eb 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -119,6 +119,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
+ " <sla:dev-contact>abc@example.com</sla:dev-contact>"
+ " <sla:qa-contact>abc@example.com</sla:qa-contact>" + " <sla:se-contact>abc@example.com</sla:se-contact>"
+ "</sla:info>";
+ protected static final int ONE_HOUR_IN_SECONDS = 60 * 60;
protected String bundleName;
protected String bundleId;
@@ -165,11 +166,6 @@ public abstract class XDataTestCase extends XHCatTestCase {
tearDown();
}
- protected int hoursToSeconds(final int hours) {
- return new Long(java.util.concurrent.TimeUnit.HOURS.toSeconds(hours)).intValue();
- }
-
-
/**
* Inserts the passed coord job
* @param coord job bean
diff --git a/docs/src/site/markdown/CoordinatorFunctionalSpec.md b/docs/src/site/markdown/CoordinatorFunctionalSpec.md
index 5afc97eb7..eda0a18b7 100644
--- a/docs/src/site/markdown/CoordinatorFunctionalSpec.md
+++ b/docs/src/site/markdown/CoordinatorFunctionalSpec.md
@@ -1147,6 +1147,18 @@ or `READY` will be `SKIPPED` when the current time is more than the configured n
nominal time. For example, suppose action 1 and 2 are both `READY`, the current time is 5:20pm, and both actions' nominal times are
before 5:19pm. Both actions will become `SKIPPED`, assuming they don't transition to `SUBMITTED` (or a terminal state) before then.
+In case the Coordinator job's execution mode is LAST_ONLY or NONE, then the Coordinator action number to be
+materialized can be huge. This can be too much for only one CoordMaterializeTransitionXCommand to handle,
+as it would lead to OOM as described in OOZIE-3254. In order to prevent this situation to happen, the current
+approach only lets a certain amount of actions to be materialized within a CoordMaterializeTransitionXCommand
+with the default value of 10000, which can be configured through Oozie configuration defined in either oozie-default.xml
+or oozie-site.xml using the property name `oozie.service.CoordMaterializeTriggerService.action.batch.size`.
+NOTE: this "batch mode" can be turned off by setting its value to -1. Once a CoordMaterializeTransitionXCommand
+is finished, the CoordMaterializeTriggerService is responsible for materializing the potential remaining
+Coordinator actions. NOTE: the CoordMaterializeTriggerService gets triggered in every 5 minutes by default.
+This means if the Coordinator job's execution mode is LAST_ONLY or NONE, a maximum number of
+`oozie.service.CoordMaterializeTriggerService.action.batch.size` will be materialized in every 5 minutes.
+
**<font color="#800080">Syntax: </font>**
diff --git a/release-log.txt b/release-log.txt
index 2122ccfab..56ecdb9cb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)
+OOZIE-3254 [coordinator] LAST_ONLY and NONE execution modes: possible OutOfMemoryError when there are too many coordinator actions to materialize (jmakai via dionusos)
OOZIE-3666 Oozie log streaming bug when log timestamps are the same on multiple Oozie servers (jmakai via dionusos)
OOZIE-3661 Oozie cannot handle environment variables with key=value content (dionusos via asalamon74)
OOZIE-3659 oozieUrl ambiguous port number in TestOozieCLI.java (AlexaD via dionusos)