Posted to commits@oozie.apache.org by di...@apache.org on 2022/11/10 09:09:23 UTC

[oozie] branch master updated: OOZIE-3254 [coordinator] LAST_ONLY and NONE execution modes: possible OutOfMemoryError when there are too many coordinator actions to materialize (jmakai via dionusos)

This is an automated email from the ASF dual-hosted git repository.

dionusos pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/oozie.git


The following commit(s) were added to refs/heads/master by this push:
     new 91d95bb2a OOZIE-3254 [coordinator] LAST_ONLY and NONE execution modes: possible OutOfMemoryError when there are too many coordinator actions to materialize (jmakai via dionusos)
91d95bb2a is described below

commit 91d95bb2a295751c99490ab4ec7b5cb544d74778
Author: Denes Bodo <di...@apache.org>
AuthorDate: Thu Nov 10 09:58:58 2022 +0100

    OOZIE-3254 [coordinator] LAST_ONLY and NONE execution modes: possible OutOfMemoryError when there are too many coordinator actions to materialize (jmakai via dionusos)
---
 .../command/MaterializeTransitionXCommand.java     |   1 +
 .../coord/CoordMaterializeTransitionXCommand.java  |  84 +++++--
 .../service/CoordMaterializeTriggerService.java    |   5 +
 core/src/main/resources/oozie-default.xml          |  10 +
 .../TestCoordMaterializeTransitionXCommand.java    | 265 +++++++++++++++++----
 ...alizeTransitionXCommandWithRunningServices.java |   2 +-
 .../java/org/apache/oozie/test/XDataTestCase.java  |   6 +-
 .../src/site/markdown/CoordinatorFunctionalSpec.md |  12 +
 release-log.txt                                    |   1 +
 9 files changed, 315 insertions(+), 71 deletions(-)
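
Note on the batching logic: the limit selection this commit adds to materializeActions() can be read as a small pure function. The sketch below is only an illustration under stated assumptions, not the Oozie implementation: the class BatchLimitSketch and the methods maxActionsToCreate and runsNeeded are hypothetical names, and an empty OptionalInt stands in for the "no upper limit" case that the patch tracks with a boolean flag.

```java
import java.util.OptionalInt;

// Hypothetical illustration only; none of these names exist in Oozie.
final class BatchLimitSketch {

    /**
     * Mirrors the decision in the patch: LAST_ONLY/NONE jobs whose materialization
     * window lies in the past use the configured batch size (-1 means no limit);
     * all other jobs use the throttle minus the currently waiting actions.
     * An empty result means "no upper limit".
     */
    static OptionalInt maxActionsToCreate(boolean lastOnlyOrNone, boolean windowInPast,
                                          int actionBatchSize, int matThrottling,
                                          int numWaitingActions) {
        if (lastOnlyOrNone && windowInPast) {
            return actionBatchSize == -1 ? OptionalInt.empty() : OptionalInt.of(actionBatchSize);
        }
        return OptionalInt.of(matThrottling - numWaitingActions);
    }

    /** Number of command runs needed to work through the pending actions under a limit. */
    static long runsNeeded(long pendingActions, OptionalInt limit) {
        if (pendingActions <= 0) {
            return 0;
        }
        if (!limit.isPresent()) {
            return 1; // unlimited: everything is attempted in a single run
        }
        int max = limit.getAsInt();
        if (max <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        return (pendingActions + max - 1) / max; // ceiling division
    }

    public static void main(String[] args) {
        OptionalInt limit = maxActionsToCreate(true, true, 10000, 12, 3);
        System.out.println("limit = " + limit);
        System.out.println("runs for 25000 pending actions = " + runsNeeded(25000L, limit));
    }
}
```

With the default batch size of 10000, 25000 pending actions need three runs in this sketch. Since the Javadoc in the patch states that CoordMaterializeTriggerService fires every 5 minutes by default, a large backlog is worked off over several trigger cycles rather than in one command.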

diff --git a/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java
index d8c406edd..fcbb8ba4b 100644
--- a/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/MaterializeTransitionXCommand.java
@@ -26,6 +26,7 @@ package org.apache.oozie.command;
  * StartChildren() : submit or queue commands to start children
  * notifyParent() : update the status to upstream if any
  */
+@Deprecated
 public abstract class MaterializeTransitionXCommand extends TransitionXCommand<Void> {
 
     /**
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index fd178dd76..4cce7f5f1 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -30,7 +30,7 @@ import org.apache.oozie.client.Job;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.MaterializeTransitionXCommand;
+import org.apache.oozie.command.TransitionXCommand;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand;
 import org.apache.oozie.coord.CoordUtils;
@@ -68,9 +68,32 @@ import java.util.TimeZone;
 
 /**
  * Materialize actions for specified start and end time for coordinator job.
+ *
+ *
+ * - Mechanism when the execution mode is LAST_ONLY or NONE:
+ *
+ * If the Coordinator job's execution mode is LAST_ONLY or NONE, the number of Coordinator actions
+ * to be materialized can be huge.
+ * This can be too much for a single CoordMaterializeTransitionXCommand to handle, as it would lead to
+ * an OOM as described in OOZIE-3254.
+ * The cause is not only that the insertList object inside CoordMaterializeTransitionXCommand
+ * gets filled with a lot of Coordinator job objects, but also that the other XCommands invoked inside
+ * CoordMaterializeTransitionXCommand store Coordinator job objects (or field(s) of them) until
+ * CoordMaterializeTransitionXCommand has run its course.
+ * To prevent this, the current approach lets only a limited number of actions
+ * be materialized within one CoordMaterializeTransitionXCommand. The default limit is 10000, and it
+ * can be configured through the Oozie configuration, in either oozie-default.xml or oozie-site.xml,
+ * using the property name `oozie.service.CoordMaterializeTriggerService.action.batch.size`. NOTE: this "batch mode"
+ * can be turned off by setting its value to -1.
+ * Once a CoordMaterializeTransitionXCommand has finished, the CoordMaterializeTriggerService is responsible for
+ * materializing any remaining Coordinator actions. NOTE: the CoordMaterializeTriggerService is
+ * triggered every 5 minutes by default. This means that if the Coordinator job's execution mode is LAST_ONLY or NONE,
+ * at most `oozie.service.CoordMaterializeTriggerService.action.batch.size` actions will be materialized every
+ * 5 minutes.
+ *
  */
 @SuppressWarnings("deprecation")
-public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCommand {
+public class CoordMaterializeTransitionXCommand extends TransitionXCommand<Void> {
 
     private JPAService jpaService = null;
     private CoordinatorJobBean coordJob = null;
@@ -120,6 +143,20 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
     public void transitToNext() throws CommandException {
     }
 
+    @Override
+    protected Void execute() throws CommandException {
+        try {
+            materialize();
+            updateJob();
+            performWrites();
+            insertList.clear();
+            calcMatdTime();
+        } finally {
+            notifyParent();
+        }
+        return null;
+    }
+
     @Override
     public void updateJob() throws CommandException {
         updateList.add(new UpdateEntry(CoordJobQuery.UPDATE_COORD_JOB_MATERIALIZE,coordJob));
@@ -137,7 +174,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
                         CoordinatorXCommand.generateEvent(coordAction, coordJob.getUser(), coordJob.getAppName(), null);
                     }
 
-                    // TODO: time 100s should be configurable
+                    // TODO: time 100ms should be configurable
                     queue(new CoordActionNotificationXCommand(coordAction), 100);
 
                     //Delay for input check = (nominal time - now)
@@ -336,7 +373,6 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
 
     }
 
-    @Override
     protected void materialize() throws CommandException {
         Instrumentation.Cron cron = new Instrumentation.Cron();
         cron.start();
@@ -376,7 +412,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
      */
     protected String materializeActions(boolean dryrun) throws Exception {
 
-        Configuration jobConf = null;
+        Configuration jobConf;
         try {
             jobConf = new XConfiguration(new StringReader(coordJob.getConf()));
         }
@@ -418,14 +454,30 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
 
         String action = null;
         int numWaitingActions = dryrun ? 0 : jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
-        int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
-        // If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated
-        boolean ignoreMaxActions =
-                (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) ||
-                        coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE))
-                        && endMatdTime.before(new Date());
-        LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
-                + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
+
+        boolean ignoreMaxActionsToBeCreated = false;
+        int maxActionsToBeCreated = 0;
+        boolean isCoordJobIsLastOnlyOrNone = CoordinatorJob.Execution.LAST_ONLY == coordJob.getExecutionOrder()
+                || CoordinatorJob.Execution.NONE == coordJob.getExecutionOrder();
+
+        if (isCoordJobIsLastOnlyOrNone && endMatdTime.before(new Date())) {
+            int actionBatchSize = ConfigurationService.getInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE);
+            if (actionBatchSize == -1) {
+                // materialization will be done without an upper limit. NOTE: this can lead to OOM.
+                ignoreMaxActionsToBeCreated = true;
+            } else {
+                // only let a limited number of actions be materialized here; the remaining actions
+                // will be materialized via CoordMaterializeTriggerService.
+                maxActionsToBeCreated = actionBatchSize;
+            }
+        } else {
+            maxActionsToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
+        }
+
+        LOG.info("Coordinator job: " + coordJob.getId() + ", maxActionsToBeCreated: "
+                + (ignoreMaxActionsToBeCreated ? " unlimited" : maxActionsToBeCreated)
+                + ", Mat_Throttle: " + (ignoreMaxActionsToBeCreated ? " ignored" : coordJob.getMatThrottling())
+                + ", numWaitingActions: " + numWaitingActions);
 
         boolean isCronFrequency = false;
 
@@ -441,7 +493,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
 
         boolean firstMater = true;
 
-        while (effStart.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) {
+        while (effStart.compareTo(end) < 0 && (ignoreMaxActionsToBeCreated || maxActionsToBeCreated-- > 0)) {
             if (pause != null && effStart.compareTo(pause) >= 0) {
                 break;
             }
@@ -494,8 +546,8 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
         }
 
         if (isCronFrequency) {
-            if (effStart.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) {
-                //Since we exceed the throttle, we need to move the nextMadtime forward
+            if (effStart.compareTo(end) < 0 && !(ignoreMaxActionsToBeCreated || maxActionsToBeCreated-- > 0)) {
+                //Since we exceed the throttle, we need to move the nextMatdtime forward
                 //to avoid creating duplicate actions
                 if (!firstMater) {
                     effStart.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob));
diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
index 8f9bc2064..d8c2eff18 100644
--- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
+++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java
@@ -58,6 +58,11 @@ public class CoordMaterializeTriggerService implements Service, Instrumentable {
      * The number of callables to be queued in a batch.
      */
     public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + "callable.batch.size";
+    /**
+     * The maximum number of actions materialized per batch when a LAST_ONLY/NONE job materializes in the past.
+     */
+    public static final String CONF_ACTION_BATCH_SIZE = CONF_PREFIX + "action.batch.size";
+
     /**
      * The number of coordinator jobs to be picked for materialization at a given time.
      */
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index ab7b8d358..062e13c9b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -616,6 +616,16 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.CoordMaterializeTriggerService.action.batch.size</name>
+        <value>10000</value>
+        <description>
+            Maximum number of actions to be materialized in a batch in case of LAST_ONLY or NONE materialization
+            in the past. If the value is set to -1, there is no maximum number of actions to be
+            materialized.
+        </description>
+    </property>
+
     <property>
         <name>oozie.service.CoordMaterializeTriggerService.materialization.system.limit</name>
         <value>50</value>
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index fcb47562b..c1a923fbb 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -51,6 +51,7 @@ import org.apache.oozie.executor.jpa.SLAEventsGetForSeqIdJPAExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.SchedulerService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.CoordMaterializeTriggerService;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.XConfiguration;
@@ -60,9 +61,14 @@ import org.jdom2.Element;
 @SuppressWarnings("deprecation")
 public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
 
+    private static final int FOUR_HOURS_IN_SECONDS = 4 * 60 * 60;
+    private static final int NUMBER_OF_ACTIONS_MATERIALIZED_3420 = 57 * 60;
+    private static final int SIXTY_HOURS_IN_MS = 60 * 60 * 60 * 1000;
+    private static final int THREE_HOURS_IN_MS = 3 * 60 * 60 * 1000;
     private static final int TIME_IN_MIN = 60 * 1000;
     private static final int TIME_IN_HOURS = TIME_IN_MIN * 60;
     private static final int TIME_IN_DAY = TIME_IN_HOURS * 24;
+
     private JPAService jpaService;
 
     @Override
@@ -84,7 +90,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T010:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         checkCoordAction(job.getId() + "@1");
     }
 
@@ -96,7 +102,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-hcat.xml",
                 CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         CoordinatorActionBean actionBean = getCoordAction(job.getId() + "@1");
         assertEquals("file://dummyhdfs/2009/05/_SUCCESS" + CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
                 + "${coord:latestRange(-1,0)}", actionBean.getMissingDependencies());
@@ -115,7 +121,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-neg-hcat.xml",
                 CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
         try {
-            new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+            new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
             fail("Expected Command exception but didn't catch any");
         }
         catch (CommandException e) {
@@ -134,7 +140,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-matd-relative.xml",
                 CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
     }
 
     public void testActionMaterWithCronFrequency1() throws Exception {
@@ -142,7 +148,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 "10,20 * * * *");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:20Z")};
         final int expectedNominalTimeCount = 2;
@@ -165,7 +171,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 "10-20 * * * *");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:11Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:12Z"),
@@ -197,7 +203,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 "0/15 2 * 5-7 4,5");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         final int expectedNominalTimeCount = 0;
         checkCoordActionsNominalTime(job.getId(), expectedNominalTimeCount, new Date[]{});
 
@@ -218,7 +224,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 "0/15 * * 5-7 4,5");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:15Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:30Z"),
@@ -243,7 +249,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 "20/15 * * 5-7 4,5");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:35Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:50Z"),};
@@ -267,7 +273,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 "20");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:40Z"),};
@@ -291,7 +297,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 "20/15 * * 7,10 THU");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:35Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:50Z"),};
@@ -318,7 +324,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setMatThrottling(3);
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
 
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
                 DateUtils.parseDateOozieTZ("2013-07-18T00:20Z")};
@@ -350,7 +356,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setTimeUnit(Timeunit.CRON);
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
 
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         final String startPlusOneHour = "2013-03-10T09:00Z";
         final String startPlusTwoHours = "2013-03-10T10:00Z";
         final Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ(startInThePast),
@@ -391,7 +397,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setTimeUnit(Timeunit.CRON);
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
 
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
 
         final String startPlusOneHour = "2013-03-10T09:00Z";
         final String startPlusTwoHours = "2013-03-10T10:00Z";
@@ -433,7 +439,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setTimeUnit(Timeunit.CRON);
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
 
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
 
         final String startPlusOneHour = "2013-03-10T09:00Z";
         final Date[] nominalTimesWithoutDSTChange = new Date[] {DateUtils.parseDateOozieTZ(startInThePast),
@@ -449,7 +455,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2013-03-10T12:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 "0 * * * *");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(4)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), FOUR_HOURS_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"),
                 DateUtils.parseDateOozieTZ("2013-03-10T09:00Z"),
                 DateUtils.parseDateOozieTZ("2013-03-10T10:00Z"),
@@ -475,7 +481,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2012-11-04T11:00Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 "0 * * * *");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(4)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), FOUR_HOURS_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2012-11-04T07:00Z"),
                 DateUtils.parseDateOozieTZ("2012-11-04T08:00Z"),
                 DateUtils.parseDateOozieTZ("2012-11-04T09:00Z"),
@@ -501,7 +507,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
         Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:04Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, pauseTime, "5");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2009-03-06T10:00Z")};
         checkCoordActionsNominalTime(job.getId(), 1, nominalTimes);
     }
@@ -511,7 +517,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
         Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:08Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, pauseTime, "5");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         Date[] nominalTimes = new Date[] {DateUtils.parseDateOozieTZ("2009-03-06T10:00Z"),
                 DateUtils.parseDateOozieTZ("2009-03-06T10:05Z")};
         checkCoordActionsNominalTime(job.getId(), 2, nominalTimes);
@@ -524,7 +530,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setFrequency("5");
         job.setTimeUnit(Timeunit.MINUTE);
         job.setMatThrottling(20);
-        String dryRunOutput = new CoordMaterializeTransitionXCommand(job, hoursToSeconds(1), startTime, endTime)
+        String dryRunOutput = new CoordMaterializeTransitionXCommand(job, ONE_HOUR_IN_SECONDS, startTime, endTime)
                 .materializeActions(true);
         String[] actions = dryRunOutput.split("action for new instance");
         assertEquals(3, actions.length -1);
@@ -539,7 +545,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date pauseTime = null;
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
                 pauseTime, 300, "5", Timeunit.MINUTE);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         checkCoordActionsTimeout(job.getId() + "@1", 300);
     }
 
@@ -547,7 +553,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2009-02-03T23:59Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
     }
 
@@ -555,7 +561,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T01:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2009-02-03T23:59Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         checkCoordWaiting(job.getId(), job.getMatThrottling());
     }
 
@@ -569,7 +575,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date startTime = DateUtils.parseDateOozieTZ("2099-02-01T01:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2099-02-03T23:59Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         checkCoordJobs(job.getId(), CoordinatorJob.Status.PREP);
     }
 
@@ -583,7 +589,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date startTime = DateUtils.toDate(new Timestamp(System.currentTimeMillis() + 180 * 1000));
         Date endTime = DateUtils.parseDateOozieTZ("2099-02-03T23:59Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
     }
 
@@ -606,7 +612,8 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
                 + "<workflow>" + "<app-path>hdfs:///tmp/workflows/</app-path>" + "</workflow>" + "</action>"
                 + "</coordinator-app>";
         CoordinatorJobBean job = addRecordToCoordJobTable(coordXml);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
+        JPAService jpaService = Services.get().get(JPAService.class);
         job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
         assertEquals(CoordinatorJob.Status.FAILED, job.getStatus());
         // GetActions for coord job, should be none
@@ -625,7 +632,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date startTime = DateUtils.toDate(new Timestamp(System.currentTimeMillis() + 360 * 1000));
         Date endTime = DateUtils.parseDateOozieTZ("2099-02-03T23:59Z");
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, false, false, 0);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         checkCoordJobs(job.getId(), CoordinatorJob.Status.PREP);
     }
 
@@ -645,7 +652,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setFrequency("1");
         job.setTimeUnitStr("DAY");
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
         assertEquals(new Date(startTime.getTime() + TIME_IN_DAY * 3), job.getNextMaterializedTime());
     }
@@ -659,7 +666,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setFrequency("1");
         job.setTimeUnitStr("HOUR");
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
         assertEquals(new Date(startTime.getTime() + TIME_IN_HOURS * 10), job.getNextMaterializedTime());
     }
@@ -674,7 +681,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setFrequency("1");
         job.setTimeUnitStr("DAY");
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
         Date next = new Date(startTime.getTime() + TIME_IN_DAY * 3);
         adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -690,7 +697,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setFrequency("1");
         job.setTimeUnitStr("DAY");
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
         Date next = new Date(startTime.getTime() + TIME_IN_DAY);
         adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -706,7 +713,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setFrequency("5");
         job.setTimeUnitStr("MINUTE");
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
         Date next = new Date(startTime.getTime() + TIME_IN_HOURS);
         adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -721,7 +728,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setFrequency("1");
         job.setTimeUnitStr("DAY");
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
         Date next = new Date(startTime.getTime() + TIME_IN_DAY);
         adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -737,7 +744,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setFrequency("1");
         job.setTimeUnitStr("DAY");
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
         Date next = new Date(startTime.getTime() + TIME_IN_DAY * 3);
         adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -755,7 +762,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setFrequency("1");
         job.setTimeUnitStr("DAY");
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
-        new CoordMaterializeTransitionXCommand(job, hoursToSeconds(1), startTime, endTime).call();
+        new CoordMaterializeTransitionXCommand(job, ONE_HOUR_IN_SECONDS, startTime, endTime).call();
         job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
         Date next = new Date(startTime.getTime() + TIME_IN_DAY * 4);
         adjustExpectedMaterializationDateForDSTSwitch(next, startTime, job);
@@ -1022,8 +1029,11 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         CoordinatorJobBean cronJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 cronFrequency);
 
-        new CoordMaterializeTransitionXCommand(elJob.getId(), TIME_IN_HOURS).call();
-        new CoordMaterializeTransitionXCommand(cronJob.getId(), TIME_IN_HOURS).call();
+        new CoordMaterializeTransitionXCommand(elJob.getId(), ONE_HOUR_IN_SECONDS).call();
+        new CoordMaterializeTransitionXCommand(cronJob.getId(), ONE_HOUR_IN_SECONDS).call();
+
+
+        JPAService jpaService = Services.get().get(JPAService.class);
 
         elJob = jpaService.execute(new CoordJobGetJPAExecutor(elJob.getId()));
         cronJob = jpaService.execute(new CoordJobGetJPAExecutor(cronJob.getId()));
@@ -1038,7 +1048,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
     private void testCronNominalTimes (Date startTime, Date endTime, Date[] nominalTimes, String cronFrequency) throws Exception {
         CoordinatorJobBean cronJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 cronFrequency);
-        new CoordMaterializeTransitionXCommand(cronJob.getId(), TIME_IN_HOURS).call();
+        new CoordMaterializeTransitionXCommand(cronJob.getId(), ONE_HOUR_IN_SECONDS).call();
 
         cronJob = jpaService.execute(new CoordJobGetJPAExecutor(cronJob.getId()));
         checkCoordActionsNominalTime(cronJob.getId(), nominalTimes.length, nominalTimes);
@@ -1049,7 +1059,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
             throws Exception {
         CoordinatorJobBean elJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null,
                 elFrequency, elTimeUnit);
-        new CoordMaterializeTransitionXCommand(elJob.getId(), TIME_IN_HOURS).call();
+        new CoordMaterializeTransitionXCommand(elJob.getId(), ONE_HOUR_IN_SECONDS).call();
 
         elJob = jpaService.execute(new CoordJobGetJPAExecutor(elJob.getId()));
         checkCoordActionsNominalTime(elJob.getId(), nominalTimes.length, nominalTimes);
@@ -1061,10 +1071,10 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         Date startTime = DateUtils.toDate(new Timestamp(now - 180 * 60 * 1000));    // 3 hours ago
         Date endTime = DateUtils.toDate(new Timestamp(now + 180 * 60 * 1000));      // 3 hours from now
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "10",
-                CoordinatorJob.Execution.LAST_ONLY);
+                Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
         // This would normally materialize the throttle amount and within a 1 hour window; however, with LAST_ONLY this should
         // ignore those parameters and materialize everything in the past
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
         CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[19];
         Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
@@ -1072,27 +1082,184 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
 
         startTime = DateUtils.toDate(new Timestamp(now));                           // now
         job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "10",
-                CoordinatorJob.Execution.LAST_ONLY);
+                Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
         // We're starting from "now" this time (i.e. present/future), so it should materialize things normally
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
         expectedStatuses = new CoordinatorActionBean.Status[6];
         Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
         checkCoordActionsStatus(job.getId(), expectedStatuses);
     }
 
+    /**
+     * Tests a Coordinator job's materialization process with LAST_ONLY mode and with 5000 action batch size.
+     * According to the expectations, this size is satisfactory for the (57 * 60) actions to be materialized using a
+     * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+     **/
+    public void testTooManyLastOnlyMaterializationNonCronSingleBatch() throws Exception {
+        long now = System.currentTimeMillis();
+        Date startTime = DateUtils.toDate(new Timestamp(now - SIXTY_HOURS_IN_MS));
+        Date endTime = DateUtils.toDate(new Timestamp(now - THREE_HOURS_IN_MS));
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "1",
+                Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+        // setting the batch size
+        int actionBatchSize = 5000;
+        Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+        CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+                new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+        coordMaterializeTransitionXCommand.call();
+
+        checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+
+        CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[NUMBER_OF_ACTIONS_MATERIALIZED_3420];
+        Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+        checkCoordActionsStatus(job.getId(), expectedStatuses);
+    }
+
+    /**
+     * Tests a Coordinator job's materialization process with LAST_ONLY mode and with 1000 action batch size.
+     * According to the expectations, this size is NOT satisfactory for the (57 * 60) actions to be materialized using a
+     * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+     **/
+    public void testTooManyLastOnlyMaterializationNonCronMultiBatch() throws Exception {
+        long now = System.currentTimeMillis();
+        Date startTime = DateUtils.toDate(new Timestamp(now - SIXTY_HOURS_IN_MS));
+        Date endTime = DateUtils.toDate(new Timestamp(now - THREE_HOURS_IN_MS));
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "1",
+                Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+        // setting the batch size
+        int actionBatchSize = 1000;
+        Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+        CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+                new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+        coordMaterializeTransitionXCommand.call();
+
+        checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+
+        CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[actionBatchSize];
+        Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+        checkCoordActionsStatus(job.getId(), expectedStatuses);
+    }
+
+    /**
+     * Tests a Coordinator job's materialization process with LAST_ONLY mode and with unlimited action batch size.
+     * According to the expectations, this size is satisfactory for the (57 * 60) actions to be materialized using a
+     * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+     **/
+    public void testTooManyLastOnlyMaterializationNonCronWithoutBatch() throws Exception {
+        long now = System.currentTimeMillis();
+        Date startTime = DateUtils.toDate(new Timestamp(now - SIXTY_HOURS_IN_MS));
+        Date endTime = DateUtils.toDate(new Timestamp(now - THREE_HOURS_IN_MS));
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "1",
+                Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+        // setting the batch size
+        int actionBatchSize = -1;
+        Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+        CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+                new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+        coordMaterializeTransitionXCommand.call();
+
+        checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+
+        CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[NUMBER_OF_ACTIONS_MATERIALIZED_3420];
+        Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+        checkCoordActionsStatus(job.getId(), expectedStatuses);
+    }
+
+    /**
+     * Tests a Coordinator job's materialization process with LAST_ONLY mode and with 5000 action batch size.
+     * According to the expectations, this size is satisfactory for the (57 * 60) actions to be materialized using a
+     * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+     **/
+    public void testTooManyLastOnlyMaterializationCronSingleBatch() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"); // past
+        Date endTime = DateUtils.parseDateOozieTZ("2013-03-12T17:00Z");   // +57 hours
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "* * * * *",
+                Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+        // setting the batch size
+        int actionBatchSize = 5000;
+        Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+        CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+            new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+        coordMaterializeTransitionXCommand.call();
+
+        checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+
+        CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[NUMBER_OF_ACTIONS_MATERIALIZED_3420];
+        Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+        checkCoordActionsStatus(job.getId(), expectedStatuses);
+    }
+
+    /**
+     * Tests a Coordinator job's materialization process with LAST_ONLY mode and with 1000 action batch size.
+     * According to the expectations, this size is NOT satisfactory for the (57 * 60) actions to be materialized using a
+     * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+     **/
+    public void testTooManyLastOnlyMaterializationCronMultiBatch() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"); // past
+        Date endTime = DateUtils.parseDateOozieTZ("2013-03-12T17:00Z");   // +57 hours
+        CoordinatorJobBean  job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "* * * * *",
+                Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+        // setting the batch size
+        int actionBatchSize = 1000;
+        Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+        CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+                new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+        coordMaterializeTransitionXCommand.call();
+
+        checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+
+        CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[actionBatchSize];
+        Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+        checkCoordActionsStatus(job.getId(), expectedStatuses);
+    }
+
+    /**
+     * Tests a Coordinator job's materialization process with LAST_ONLY mode and with unlimited action batch size.
+     * According to the expectations, this size is satisfactory for the (57 * 60) actions to be materialized using a
+     * single {@link org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand}
+     **/
+    public void testTooManyLastOnlyMaterializationCronWithoutBatch() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"); // past
+        Date endTime = DateUtils.parseDateOozieTZ("2013-03-12T17:00Z");   // +57 hours
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "* * * * *",
+                Timeunit.MINUTE, CoordinatorJob.Execution.LAST_ONLY, 12);
+
+        // setting the batch size
+        int actionBatchSize = -1;
+        Services.get().getConf().setInt(CoordMaterializeTriggerService.CONF_ACTION_BATCH_SIZE, actionBatchSize);
+
+        CoordMaterializeTransitionXCommand coordMaterializeTransitionXCommand =
+                new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS);
+        coordMaterializeTransitionXCommand.call();
+        checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+        CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[NUMBER_OF_ACTIONS_MATERIALIZED_3420];
+        Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+        checkCoordActionsStatus(job.getId(), expectedStatuses);
+    }
+
     public void testCurrentTimeCheck() throws Exception {
         long now = System.currentTimeMillis();
         Date startTime = DateUtils.toDate(new Timestamp(now)); // now
-        Date endTime = DateUtils.toDate(new Timestamp(now + 3 * 60 * 60 * 1000)); // 3 secondsFromHours from now
+        Date endTime = DateUtils.toDate(new Timestamp(now + THREE_HOURS_IN_MS));
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, "5",
                 20);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
 
         job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
         assertEquals(job.getLastActionNumber(), 12);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         // unfortunately XCommand doesn't throw exception on precondition
         // assertEquals(e.getErrorCode(), ErrorCode.E1100);
         // assertTrue(e.getMessage().contains("Request is for future time. Lookup time is"));
@@ -1146,7 +1313,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         job.setEndTime(endTime);
         job.setMatThrottling(10);
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
         assertEquals(job.getLastActionNumber(), 3);
 
@@ -1193,7 +1360,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
 
         job.setMatThrottling(10);
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job);
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId());
         assertEquals(4, job.getLastActionNumber());
 
@@ -1278,7 +1445,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
     private void checkCoordActionsStatus(String jobId, CoordinatorActionBean.Status[] statuses) {
         try {
             List<CoordinatorActionBean> actions = jpaService.execute(new CoordJobGetActionsSubsetJPAExecutor(jobId,
-                    null, 1, 1000, false));
+                    null, 1, 100000, false));
 
             if (actions.size() != statuses.length) {
                 fail("Should have " + statuses.length + " actions created for job " + jobId + ", but has " + actions.size()
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommandWithRunningServices.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommandWithRunningServices.java
index 0a4c8e722..5615a2406 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommandWithRunningServices.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommandWithRunningServices.java
@@ -55,7 +55,7 @@ public class TestCoordMaterializeTransitionXCommandWithRunningServices extends X
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
         Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T09:58Z");
         final CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, pauseTime, "5");
-        new CoordMaterializeTransitionXCommand(job.getId(), hoursToSeconds(1)).call();
+        new CoordMaterializeTransitionXCommand(job.getId(), ONE_HOUR_IN_SECONDS).call();
         waitFor(60_000, new Predicate() {
             public boolean evaluate() throws Exception {
                 return (getStatus(job.getId()) == CoordinatorJob.Status.PAUSED);
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index ed4f9695c..db22103eb 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -119,6 +119,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
             + " <sla:dev-contact>abc@example.com</sla:dev-contact>"
             + " <sla:qa-contact>abc@example.com</sla:qa-contact>" + " <sla:se-contact>abc@example.com</sla:se-contact>"
             + "</sla:info>";
+    protected static final int ONE_HOUR_IN_SECONDS = 60 * 60;
 
     protected String bundleName;
     protected String bundleId;
@@ -165,11 +166,6 @@ public abstract class XDataTestCase extends XHCatTestCase {
         tearDown();
     }
 
-    protected int hoursToSeconds(final int hours) {
-        return new Long(java.util.concurrent.TimeUnit.HOURS.toSeconds(hours)).intValue();
-    }
-
-
     /**
      * Inserts the passed coord job
      * @param coord job bean
diff --git a/docs/src/site/markdown/CoordinatorFunctionalSpec.md b/docs/src/site/markdown/CoordinatorFunctionalSpec.md
index 5afc97eb7..eda0a18b7 100644
--- a/docs/src/site/markdown/CoordinatorFunctionalSpec.md
+++ b/docs/src/site/markdown/CoordinatorFunctionalSpec.md
@@ -1147,6 +1147,18 @@ or `READY` will be `SKIPPED` when the current time is more than the configured n
 nominal time. For example, suppose action 1 and 2 are both `READY`, the current time is 5:20pm, and both actions' nominal times are
 before 5:19pm. Both actions will become `SKIPPED`, assuming they don't transition to `SUBMITTED` (or a terminal state) before then.
 
+If the Coordinator job's execution mode is LAST_ONLY or NONE, the number of Coordinator actions to be
+materialized can be huge. This can be too much for a single CoordMaterializeTransitionXCommand to handle,
+and can lead to an OutOfMemoryError as described in OOZIE-3254. To prevent this, the current
+approach lets only a limited number of actions be materialized within one CoordMaterializeTransitionXCommand.
+The limit defaults to 10000 and can be configured in either oozie-default.xml
+or oozie-site.xml using the property name `oozie.service.CoordMaterializeTriggerService.action.batch.size`.
+NOTE: this "batch mode" can be turned off by setting the property to -1. Once a CoordMaterializeTransitionXCommand
+has finished, the CoordMaterializeTriggerService is responsible for materializing any remaining
+Coordinator actions. NOTE: the CoordMaterializeTriggerService is triggered every 5 minutes by default.
+This means that if the Coordinator job's execution mode is LAST_ONLY or NONE, at most
+`oozie.service.CoordMaterializeTriggerService.action.batch.size` actions will be materialized every 5 minutes.
+
 **<font color="#800080">Syntax: </font>**
 
 
diff --git a/release-log.txt b/release-log.txt
index 2122ccfab..56ecdb9cb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3254 [coordinator] LAST_ONLY and NONE execution modes: possible OutOfMemoryError when there are too many coordinator actions to materialize (jmakai via dionusos)
 OOZIE-3666 Oozie log streaming bug when log timestamps are the same on multiple Oozie servers (jmakai via dionusos)
 OOZIE-3661 Oozie cannot handle environment variables with key=value content (dionusos via asalamon74)
 OOZIE-3659 oozieUrl ambiguous port number in TestOozieCLI.java (AlexaD via dionusos)
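
The batch arithmetic that the new tests and the documentation paragraph rely on can be sketched as follows. This is a minimal illustration, not code from the commit: the class and method names are hypothetical, and it assumes that each CoordMaterializeTransitionXCommand run materializes at most `oozie.service.CoordMaterializeTriggerService.action.batch.size` actions, with -1 meaning no limit.

```java
// Hypothetical helper for illustration only; it is not part of Oozie.
public class BatchMaterializationEstimate {

    // Value that turns batch mode off, as stated in the documentation change.
    static final int BATCH_MODE_OFF = -1;

    // Actions created by the first materialization command.
    static long actionsInFirstRun(long pendingActions, int batchSize) {
        if (batchSize == BATCH_MODE_OFF) {
            return pendingActions;
        }
        return Math.min(pendingActions, batchSize);
    }

    // Number of materialization commands needed to create every pending action.
    static long runsNeeded(long pendingActions, int batchSize) {
        if (pendingActions <= 0) {
            return 0;
        }
        if (batchSize == BATCH_MODE_OFF) {
            return 1;
        }
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batch size must be positive or -1");
        }
        // Ceiling division without floating point.
        return (pendingActions + batchSize - 1) / batchSize;
    }

    public static void main(String[] args) {
        // 57 hours at a one-minute frequency, as in the new tests.
        long pending = 57L * 60L;
        for (int batchSize : new int[] {5000, 1000, BATCH_MODE_OFF}) {
            System.out.println("batch size " + batchSize
                    + ": first run " + actionsInFirstRun(pending, batchSize)
                    + " actions, " + runsNeeded(pending, batchSize) + " run(s) in total");
        }
    }
}
```

With a batch size of 1000, the first command creates 1000 of the 3420 actions, which is what the MultiBatch tests assert. The remaining actions would be left to later runs of the CoordMaterializeTriggerService, which by default is triggered every 5 minutes.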