You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2014/05/30 23:38:18 UTC

git commit: OOZIE-1319 LAST_ONLY in execution control for coordinator job still runs all the actions (rkanter)

Repository: oozie
Updated Branches:
  refs/heads/master ee8ef73f2 -> 18401e614


OOZIE-1319 LAST_ONLY in execution control for coordinator job still runs all the actions (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/18401e61
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/18401e61
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/18401e61

Branch: refs/heads/master
Commit: 18401e614b581711477d63bf6b4a56f8e739dbf0
Parents: ee8ef73
Author: Robert Kanter <rk...@cloudera.com>
Authored: Fri May 30 14:35:53 2014 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Fri May 30 14:35:53 2014 -0700

----------------------------------------------------------------------
 .../apache/oozie/client/CoordinatorAction.java  |   3 +-
 .../org/apache/oozie/CoordinatorActionBean.java |   2 +-
 .../org/apache/oozie/CoordinatorJobBean.java    |   4 +-
 .../coord/CoordActionInputCheckXCommand.java    |  56 +++++++++
 .../command/coord/CoordActionSkipXCommand.java  | 115 +++++++++++++++++++
 .../CoordMaterializeTransitionXCommand.java     |  13 ++-
 .../jpa/CoordJobGetReadyActionsJPAExecutor.java |  13 +--
 .../executor/jpa/CoordJobQueryExecutor.java     |   6 +
 .../oozie/service/StatusTransitService.java     |  10 +-
 .../org/apache/oozie/TestCoordinatorEngine.java |   4 +-
 .../TestCoordActionInputCheckXCommand.java      |  76 ++++++++++++
 .../coord/TestCoordActionSkipXCommand.java      |  92 +++++++++++++++
 .../TestCoordMaterializeTransitionXCommand.java |  56 +++++++++
 .../TestCoordJobGetReadyActionsJPAExecutor.java |  82 +++++++++++--
 .../executor/jpa/TestCoordJobQueryExecutor.java |   7 ++
 .../org/apache/oozie/test/XDataTestCase.java    |  16 ++-
 .../site/twiki/CoordinatorFunctionalSpec.twiki  |  21 +++-
 release-log.txt                                 |   1 +
 18 files changed, 539 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java b/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
index ba17c5b..100823f 100644
--- a/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
+++ b/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
@@ -37,7 +37,8 @@ public interface CoordinatorAction {
         SUCCEEDED,
         KILLED,
         FAILED,
-        IGNORED
+        IGNORED,
+        SKIPPED
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 43d5103..8cbcc4f 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -91,7 +91,7 @@ import org.json.simple.JSONObject;
         @NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
         // Select query used by ActionInfo command
         @NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
-        // Select Query used by Timeout command
+        // Select Query used by Timeout and skip commands
         @NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
         // Select query used by InputCheck command
         @NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index b77082c..8fd53f1 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -85,13 +85,13 @@ import org.json.simple.JSONObject;
 
         @NamedQuery(name = "GET_COORD_JOB_USER_APPNAME", query = "select w.user, w.appName from CoordinatorJobBean w where w.id = :id"),
 
-        @NamedQuery(name = "GET_COORD_JOB_INPUT_CHECK", query = "select w.user, w.appName, w.statusStr, w.appNamespace from CoordinatorJobBean w where w.id = :id"),
+        @NamedQuery(name = "GET_COORD_JOB_INPUT_CHECK", query = "select w.user, w.appName, w.statusStr, w.appNamespace, w.execution, w.frequency, w.timeUnitStr, w.timeZone, w.endTimestamp from CoordinatorJobBean w where w.id = :id"),
 
         @NamedQuery(name = "GET_COORD_JOB_ACTION_READY", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.execution, w.concurrency from CoordinatorJobBean w where w.id = :id"),
 
         @NamedQuery(name = "GET_COORD_JOB_ACTION_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr from CoordinatorJobBean w where w.id = :id"),
 
-        @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace, w.timeUnitStr from CoordinatorJobBean w where w.id = :id"),
+        @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace, w.timeUnitStr, w.execution from CoordinatorJobBean w where w.id = :id"),
 
         @NamedQuery(name = "GET_COORD_JOB_SUSPEND_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.bundleId, w.appNamespace, w.doneMaterialization from CoordinatorJobBean w where w.id = :id"),
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
index e8667c1..19f867e 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.io.StringReader;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.text.ParseException;
+import java.util.Calendar;
 import java.util.Date;
 import java.util.List;
 
@@ -35,6 +37,7 @@ import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.coord.CoordELEvaluator;
 import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.TimeUnit;
 import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.dependency.URIHandlerException;
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
@@ -88,6 +91,40 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
         this.jobId = jobId;
     }
 
+    /**
+     * Computes the nominal time of the action that follows the current one.
+     * Based on CoordMaterializeTransitionXCommand#materializeActions; a candidate at or after the job's end time is
+     * treated as "no next action" because materialization there only loops while start is strictly before end.
+     *
+     * @return the nominal time of the next action, or null if the current action is the last one
+     * @throws ParseException propagated from the helper calls that derive the next time
+     */
+    private Date computeNextNominalTime() throws ParseException {
+        Date nextNominalTime;
+        boolean isCronFrequency = false;
+        int freq = -1;
+        try {
+            freq = Integer.parseInt(coordJob.getFrequency());
+        } catch (NumberFormatException e) {
+            isCronFrequency = true;
+        }
+
+        if (isCronFrequency) {
+            nextNominalTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(coordAction.getNominalTime(), coordJob);
+        } else {
+            Calendar nextNominalTimeCal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
+            nextNominalTimeCal.setTime(coordAction.getNominalTime());
+            TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
+            nextNominalTimeCal.add(freqTU.getCalendarUnit(), freq);
+            nextNominalTime = nextNominalTimeCal.getTime();
+        }
+
+        // The last action has no successor: either nothing was computed (the cron helper may return null -- TODO confirm)
+        // or the candidate is at/after the end time, which is assumed to be exclusive as in the materialization loop
+        boolean isLastAction = nextNominalTime == null || !nextNominalTime.before(coordJob.getEndTime());
+        return isLastAction ? null : nextNominalTime;
+    }
+
     @Override
     protected Void execute() throws CommandException {
         LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
@@ -113,6 +150,25 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
         try {
             Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
             cron.start();
+            if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
+                Date now = new Date();
+                Date nextNominalTime = computeNextNominalTime();
+                if (nextNominalTime != null) {
+                    // If the current time is after the next action's nominal time, then we've passed the window where this action
+                    // should be started; so set it to SKIPPED
+                    if (now.after(nextNominalTime)) {
+                        LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
+                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
+                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
+                        queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
+                        return null;
+                    } else {
+                        LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
+                                + "the nominal time [{2}] of the next action]", coordAction.getId(),
+                                DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
+                    }
+                }
+            }
             StringBuilder existList = new StringBuilder();
             StringBuilder nonExistList = new StringBuilder();
             StringBuilder nonResolvedList = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/command/coord/CoordActionSkipXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionSkipXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionSkipXCommand.java
new file mode 100644
index 0000000..c774f8e
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionSkipXCommand.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.util.Date;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.CoordActionGetForTimeoutJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * This class sets a Coordinator action's status to SKIPPED
+ */
+public class CoordActionSkipXCommand extends CoordinatorXCommand<Void> {
+    private CoordinatorActionBean actionBean;
+    private String user;
+    private String appName;
+    private JPAService jpaService = null;
+
+    public CoordActionSkipXCommand(CoordinatorActionBean actionBean, String user, String appName) {
+        super("coord_action_skipped", "coord_action_skipped", 1);  // skip-specific label; getKey() builds on getName()
+        this.actionBean = ParamChecker.notNull(actionBean, "ActionBean");
+        this.user = ParamChecker.notEmpty(user, "user");
+        this.appName = ParamChecker.notEmpty(appName, "appName");
+    }
+
+    @Override
+    protected Void execute() throws CommandException {
+        if (actionBean.getStatus() == CoordinatorAction.Status.WAITING
+                || actionBean.getStatus() == CoordinatorAction.Status.READY) {
+            LOG.info("Setting action [{0}] status to SKIPPED", actionBean.getId());
+            actionBean.setStatus(CoordinatorAction.Status.SKIPPED);
+            try {
+                queue(new CoordActionNotificationXCommand(actionBean), 100);
+                actionBean.setLastModifiedTime(new Date());
+                CoordActionQueryExecutor.getInstance().executeUpdate(
+                        CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, actionBean);
+                if (EventHandlerService.isEnabled()) {
+                    generateEvent(actionBean, user, appName, null);
+                }
+            }
+            catch (JPAExecutorException e) {
+                throw new CommandException(e);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public String getEntityKey() {
+        return actionBean.getJobId();
+    }
+
+    @Override
+    public String getKey() {
+        return getName() + "_" + actionBean.getId();
+    }
+
+    @Override
+    protected boolean isLockRequired() {
+        return true;
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+        jpaService = Services.get().get(JPAService.class);
+        if (jpaService == null) {
+            throw new CommandException(ErrorCode.E0610);
+        }
+
+        try {
+            actionBean = jpaService.execute(new CoordActionGetForTimeoutJPAExecutor(actionBean.getId()));
+        }
+        catch (JPAExecutorException e) {
+            throw new CommandException(e);
+        }
+        LogUtils.setLogInfo(actionBean, logInfo);
+    }
+
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+        if (!(actionBean.getStatus() == CoordinatorAction.Status.WAITING
+                || actionBean.getStatus() == CoordinatorAction.Status.READY)) {
+            throw new PreconditionException(ErrorCode.E1100, "The coord action must have status "
+                    + CoordinatorAction.Status.WAITING + " or " + CoordinatorAction.Status.READY
+                    + " but has status [" + actionBean.getStatus() + "]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 7b46ad6..515e247 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -219,7 +219,8 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
 
     /**
      * Get materialization for window for catch-up jobs. for current jobs,it reruns currentMatdate, For catch-up, end
-     * Mataterilized Time = startMatdTime + MatThrottling * frequency
+     * Materialized Time = startMatdTime + MatThrottling * frequency; unless LAST_ONLY execution order is set, in which
+     * case it returns now (to materialize all actions in the past)
      *
      * @param currentMatTime
      * @return
@@ -230,6 +231,9 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
         if (currentMatTime.after(new Date())) {
             return currentMatTime;
         }
+        if (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY)) {
+            return new Date();
+        }
         int frequency = 0;
         try {
             frequency = Integer.parseInt(coordJob.getFrequency());
@@ -397,7 +401,6 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
         // Move to the End of duration, if needed.
         DateUtils.moveToEnd(origStart, endOfFlag);
 
-        Date effStart = (Date) startMatdTime.clone();
         StringBuilder actionStrings = new StringBuilder();
         Date jobPauseTime = coordJob.getPauseTime();
         Calendar pause = null;
@@ -407,9 +410,11 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
         }
 
         String action = null;
-        JPAService jpaService = Services.get().get(JPAService.class);
         int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
         int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
+        // If LAST_ONLY and all materialization is in the past, ignore maxActionToBeCreated
+        boolean ignoreMaxActions =
+                coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) && endMatdTime.before(new Date());
         LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
                 + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
 
@@ -422,7 +427,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
         }
 
         boolean firstMater = true;
-        while (start.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
+        while (start.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) {
             if (pause != null && start.compareTo(pause) >= 0) {
                 break;
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
index d22b478..42dbca8 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
@@ -59,20 +59,13 @@ public class CoordJobGetReadyActionsJPAExecutor implements JPAExecutor<List<Coor
             if (executionOrder.equalsIgnoreCase("FIFO")) {
                 q = em.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
             }
-            else {
+            else {      // LIFO or LAST_ONLY
                 q = em.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
             }
             q.setParameter("jobId", coordJobId);
 
-            // if executionOrder is LAST_ONLY, only retrieve first record in LIFO,
-            // otherwise, use numResults if it is positive.
-            if (executionOrder.equalsIgnoreCase("LAST_ONLY")) {
-                q.setMaxResults(1);
-            }
-            else {
-                if (numResults > 0) {
-                    q.setMaxResults(numResults);
-                }
+            if (numResults > 0) {
+                q.setMaxResults(numResults);
             }
             List<Object[]> objectArrList = q.getResultList();
             actionBeans = new ArrayList<CoordinatorActionBean>();

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index 1a6ded7..cddeaf7 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -253,6 +253,11 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
                 bean.setAppName((String) arr[1]);
                 bean.setStatusStr((String) arr[2]);
                 bean.setAppNamespace((String) arr[3]);
+                bean.setExecution((String) arr[4]);
+                bean.setFrequency((String) arr[5]);
+                bean.setTimeUnitStr((String) arr[6]);
+                bean.setTimeZone((String) arr[7]);
+                bean.setEndTime(DateUtils.toDate((Timestamp) arr[8]));
                 break;
             case GET_COORD_JOB_ACTION_READY:
                 bean = new CoordinatorJobBean();
@@ -298,6 +303,7 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
                 bean.setJobXmlBlob((StringBlob) arr[18]);
                 bean.setAppNamespace((String) arr[19]);
                 bean.setTimeUnitStr((String) arr[20]);
+                bean.setExecution((String) arr[21]);
                 break;
             case GET_COORD_JOB_SUSPEND_KILL:
                 bean = new CoordinatorJobBean();

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
index b11b3d2..e140e64 100644
--- a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
+++ b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
@@ -380,13 +380,17 @@ public class StatusTransitService implements Service {
             if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
                 totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
             }
-
             int totalValuesTimeOut = 0;
             if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
                 totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
             }
+            int totalValuesSkipped = 0;
+            if (coordActionStatus.containsKey(CoordinatorAction.Status.SKIPPED)) {
+                totalValuesSkipped = coordActionStatus.get(CoordinatorAction.Status.SKIPPED);
+            }
 
-            if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
+            if (coordActionsCount ==
+                    (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut + totalValuesSkipped)) {
 
                 // If all coord action is done and coord is killed, then don't change the status.
                 if (coordStatus[0].equals(Job.Status.KILLED)) {
@@ -394,7 +398,7 @@ public class StatusTransitService implements Service {
                     return true;
                 }
                 // If all the coordinator actions are succeeded then coordinator job should be succeeded.
-                if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
+                if (coordActionsCount == (totalValuesSucceed + totalValuesSkipped) && isDoneMaterialization) {
                     coordStatus[0] = Job.Status.SUCCEEDED;
                     ret = true;
                 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
index e15e567..e3db675 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
@@ -502,7 +502,7 @@ public class TestCoordinatorEngine extends XTestCase {
             assertEquals(ErrorCode.E0421, ex.getErrorCode());
             assertEquals("E0421: Invalid job filter [status=blahblah], invalid status value [blahblah]."
                     + " Valid status values are: [WAITING READY SUBMITTED RUNNING SUSPENDED TIMEDOUT "
-                    + "SUCCEEDED KILLED FAILED IGNORED ]", ex.getMessage());
+                    + "SUCCEEDED KILLED FAILED IGNORED SKIPPED ]", ex.getMessage());
         }
 
         // Check for empty status value
@@ -513,7 +513,7 @@ public class TestCoordinatorEngine extends XTestCase {
             assertEquals(ErrorCode.E0421, ex.getErrorCode());
             assertEquals("E0421: Invalid job filter [status=\"\"], invalid status value [\"\"]. "
                     + "Valid status values are: [WAITING READY SUBMITTED RUNNING SUSPENDED TIMEDOUT "
-                    + "SUCCEEDED KILLED FAILED IGNORED ]", ex.getMessage());
+                    + "SUCCEEDED KILLED FAILED IGNORED SKIPPED ]", ex.getMessage());
         }
 
         // Check for invalid filter option

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
index ebf5081..1ffadbd 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
@@ -39,6 +39,7 @@ import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -772,6 +773,61 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase {
         checkCoordAction(actionId, missingDeps, CoordinatorAction.Status.TIMEDOUT);
     }
 
+    @Test
+    public void testLastOnly() throws Exception {
+        CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+                CoordinatorJob.Status.RUNNING, false, true);
+        job.setExecutionOrder(CoordinatorJobBean.Execution.LAST_ONLY);
+        job.setFrequency("10");
+        job.setTimeUnit(Timeunit.MINUTE);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job);
+        String missingDeps = "hdfs:///dirx/filex";
+
+        // 1 hour in the past means next nominal time is 50 mins ago so should become SKIPPED
+        String actionId1 = addInitRecords(missingDeps, null, TZ, job, 1);
+        Date nomTime = new Date(new Date().getTime() - 60 * 60 * 1000);     // 1 hour ago
+        setCoordActionNominalTime(actionId1, nomTime.getTime());
+        new CoordActionInputCheckXCommand(actionId1, job.getId()).call();
+        checkCoordActionStatus(actionId1, CoordinatorAction.Status.SKIPPED);
+
+        // 1 hour in the future means next nominal time is 70 mins from now, so nothing should happen
+        String actionId2 = addInitRecords(missingDeps, null, TZ, job, 2);
+        nomTime = new Date(new Date().getTime() + 60 * 60 * 1000);          // 1 hour from now
+        setCoordActionNominalTime(actionId2, nomTime.getTime());
+        new CoordActionInputCheckXCommand(actionId2, job.getId()).call();
+        checkCoordActionStatus(actionId2, CoordinatorAction.Status.WAITING);
+
+        // 5 mins in the past means next nominal time is 5 mins from now, so nothing should happen
+        String actionId3 = addInitRecords(missingDeps, null, TZ, job, 3);
+        nomTime = new Date(new Date().getTime() - 5 * 60 * 1000);           // 5 minutes ago
+        setCoordActionNominalTime(actionId3, nomTime.getTime());
+        new CoordActionInputCheckXCommand(actionId3, job.getId()).call();
+        checkCoordActionStatus(actionId3, CoordinatorAction.Status.WAITING);
+
+        // 5 mins in the past means the next nominal time is 5 mins from now, but the end time is only 3 mins from now (which means
+        // that the next nominal time would be after the end time), so nothing should happen
+        Date endTime = new Date(new Date().getTime() + 3 * 60 * 1000);      // 3 minutes from now
+        job.setEndTime(endTime);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job);
+        String actionId4 = addInitRecords(missingDeps, null, TZ, job, 4);
+        nomTime = new Date(new Date().getTime() - 5 * 60 * 1000);           // 5 minutes ago
+        setCoordActionNominalTime(actionId4, nomTime.getTime());
+        new CoordActionInputCheckXCommand(actionId4, job.getId()).call();
+        checkCoordActionStatus(actionId4, CoordinatorAction.Status.WAITING);
+
+        // 1 hour in the past means the next nominal time is 50 mins ago, but the end time is 55 mins ago (which means that the next
+        // nominal time would be after the end time but still in the past), so nothing should happen even though the action and the
+        // next nominal time are both in the past
+        endTime = new Date(new Date().getTime() - 55 * 60 * 1000);          // 55 mins ago
+        job.setEndTime(endTime);
+        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job);
+        String actionId5 = addInitRecords(missingDeps, null, TZ, job, 5);
+        nomTime = new Date(new Date().getTime() - 60 * 60 * 1000);           // 1 hour ago
+        setCoordActionNominalTime(actionId5, nomTime.getTime());
+        new CoordActionInputCheckXCommand(actionId5, job.getId()).call();
+        checkCoordActionStatus(actionId5, CoordinatorAction.Status.WAITING);
+    }
+
     protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJob.Status status,
             Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
 
@@ -951,4 +1007,24 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase {
             throw new Exception("Action ID " + actionId + " was not stored properly in db");
         }
     }
+
+    /** Polls (waitFor, 5 * 1000) for the action to report {@code stat}, then reloads it, asserts its status and returns it. */
+    private CoordinatorActionBean checkCoordActionStatus(final String actionId, final CoordinatorAction.Status stat)
+            throws Exception {
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        waitFor(5 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                try {
+                    return stat.equals(jpaService.execute(new CoordActionGetJPAExecutor(actionId)).getStatus());
+                }
+                catch (JPAExecutorException se) {
+                    throw new Exception("Action ID " + actionId + " was not stored properly in db", se); // keep the JPA cause
+                }
+            }
+        });
+        CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+        assertEquals(stat, action.getStatus());
+        return action;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionSkipXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionSkipXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionSkipXCommand.java
new file mode 100644
index 0000000..2baabce
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionSkipXCommand.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.util.Date;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+/** Tests for {@link CoordActionSkipXCommand}: its status precondition and the WAITING/READY to SKIPPED transition. */
+public class TestCoordActionSkipXCommand extends XDataTestCase {
+    private Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+    /** Only WAITING and READY actions may be skipped; every other status must fail the precondition with E1100. */
+    public void testVerifyPrecondition() throws Exception {
+        Date startTime = new Date();
+        Date endTime = new Date(startTime.getTime() + 1 * 60 * 1000);
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, true, 0);
+        int actionNum = 1;
+        for (CoordinatorAction.Status actionStatus : CoordinatorAction.Status.values()) {
+            boolean skippable = actionStatus == CoordinatorAction.Status.WAITING
+                    || actionStatus == CoordinatorAction.Status.READY;
+            CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), actionNum++, actionStatus,
+                "coord-action-get.xml", 0);
+            try {
+                new CoordActionSkipXCommand(action, getTestUser(), "my-app-name").verifyPrecondition();
+                assertTrue("Precondition should have rejected status " + actionStatus, skippable);
+            } catch (PreconditionException pe) {
+                // Without this check a wrongly rejected WAITING/READY action would still let the test pass
+                assertFalse("Precondition should have accepted status " + actionStatus, skippable);
+                assertEquals(ErrorCode.E1100, pe.getErrorCode());
+                assertTrue(pe.getMessage().endsWith("[" + actionStatus + "]]"));
+            }
+        }
+    }
+
+    public void testWaitingToSkipped() throws Exception {
+        _testToSkipped(CoordinatorAction.Status.WAITING);
+    }
+
+    public void testReadyToSkipped() throws Exception {
+        _testToSkipped(CoordinatorAction.Status.READY);
+    }
+
+    /** Runs the skip command on an action created in the given status and verifies it is then stored as SKIPPED. */
+    public void _testToSkipped(CoordinatorAction.Status actionStatus) throws Exception {
+        Date startTime = new Date();
+        Date endTime = new Date(startTime.getTime() + 1 * 60 * 1000);
+        CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, true, 0);
+        CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, actionStatus, "coord-action-get.xml", 0);
+        assertEquals(actionStatus, action.getStatus());
+        new CoordActionSkipXCommand(action, getTestUser(), "my-app-name").call();
+        action = CoordActionQueryExecutor.getInstance()
+                .get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION, action.getId());
+        assertEquals(CoordinatorAction.Status.SKIPPED, action.getStatus());
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index 9a8d65b..5b22abc 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -18,6 +18,7 @@
 package org.apache.oozie.command.coord;
 
 import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
@@ -30,6 +31,7 @@ import org.apache.oozie.client.CoordinatorJob.Timeunit;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
@@ -563,6 +565,33 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
 
     }
 
+    /** LAST_ONLY should materialize every past-due action at once, but materialize normally when starting from "now". */
+    public void testLastOnlyMaterialization() throws Exception {
+        final long nowMillis = System.currentTimeMillis();
+        Date pastStart = DateUtils.toDate(new Timestamp(nowMillis - 180 * 60 * 1000));      // 3 hours ago
+        Date endTime = DateUtils.toDate(new Timestamp(nowMillis + 180 * 60 * 1000));        // 3 hours from now
+        // Normally only the throttle amount within the 1 hour window would be materialized; however, with LAST_ONLY those
+        // parameters should be ignored and everything in the past should be materialized
+        CoordinatorJobBean pastJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, pastStart, endTime, null, -1,
+                "10", CoordinatorJob.Execution.LAST_ONLY);
+        new CoordMaterializeTransitionXCommand(pastJob.getId(), 3600).call();
+        checkCoordJobs(pastJob.getId(), CoordinatorJob.Status.RUNNING);
+        CoordinatorActionBean.Status[] allWaiting = new CoordinatorActionBean.Status[19];
+        Arrays.fill(allWaiting, CoordinatorActionBean.Status.WAITING);
+        checkCoordActionsStatus(pastJob.getId(), allWaiting);
+
+        // This job starts "now" (i.e. present/future), so nothing lies in the past and it should materialize normally
+        Date nowStart = DateUtils.toDate(new Timestamp(nowMillis));                         // now
+        CoordinatorJobBean freshJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, nowStart, endTime, null, -1,
+                "10", CoordinatorJob.Execution.LAST_ONLY);
+        new CoordMaterializeTransitionXCommand(freshJob.getId(), 3600).call();
+        checkCoordJobs(freshJob.getId(), CoordinatorJob.Status.RUNNING);
+        allWaiting = new CoordinatorActionBean.Status[6];
+        Arrays.fill(allWaiting, CoordinatorActionBean.Status.WAITING);
+        checkCoordActionsStatus(freshJob.getId(), allWaiting);
+    }
+
+
     protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
             Date pauseTime, String freq) throws Exception {
         return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, -1, freq);
@@ -570,6 +599,11 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
 
     protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
             Date pauseTime, int timeout, String freq) throws Exception {
+        return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, timeout, freq, CoordinatorJob.Execution.FIFO);
+    }
+
+    protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
+            Date pauseTime, int timeout, String freq, CoordinatorJob.Execution execution) throws Exception {
         CoordinatorJobBean coordJob = createCoordJob(status, startTime, endTime, false, false, 0);
         coordJob.setStartTime(startTime);
         coordJob.setEndTime(endTime);
@@ -579,6 +613,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         coordJob.setTimeout(timeout);
         coordJob.setConcurrency(3);
         coordJob.setMatThrottling(20);
+        coordJob.setExecutionOrder(execution);
 
         try {
             JPAService jpaService = Services.get().get(JPAService.class);
@@ -697,6 +732,27 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
         }
     }
 
+    private void checkCoordActionsStatus(String jobId, CoordinatorActionBean.Status[] statuses) {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            List<CoordinatorActionBean> actions = jpaService.execute(new CoordJobGetActionsSubsetJPAExecutor(jobId,
+                    null, 1, 1000, false));
+
+            if (actions.size() != statuses.length) {
+                fail("Should have " + statuses.length + " actions created for job " + jobId + ", but has " + actions.size()
+                        + " actions.");
+            }
+
+            for (int i=0; i < statuses.length; i++ ) {
+                assertEquals(statuses[i], actions.get(i).getStatus());
+            }
+        }
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + jobId + " was not stored properly in db");
+        }
+    }
+
     private void checkCoordActionsTimeout(String actionId, int expected) {
         try {
             JPAService jpaService = Services.get().get(JPAService.class);

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetReadyActionsJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetReadyActionsJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetReadyActionsJPAExecutor.java
index 1cdccf1..b81c568 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetReadyActionsJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetReadyActionsJPAExecutor.java
@@ -17,24 +17,28 @@
  */
 package org.apache.oozie.executor.jpa;
 
+import java.util.Date;
 import java.util.List;
 
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 
 public class TestCoordJobGetReadyActionsJPAExecutor extends XDataTestCase {
     Services services;
+    private final String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+            "org.apache.oozie.service.PauseTransitService",
+            "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
 
     @Override
     protected void setUp() throws Exception {
         super.setUp();
         services = new Services();
+        setClassesToBeExcluded(services.getConf(), excludedServices);
         services.init();
     }
 
@@ -44,24 +48,82 @@ public class TestCoordJobGetReadyActionsJPAExecutor extends XDataTestCase {
         super.tearDown();
     }
 
-    public void testCoordActionGet() throws Exception {
-        int actionNum = 1;
+    public void testCoordActionGetFIFO() throws Exception {
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
-        addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
-        _testGetReadyActions(job.getId(), 0);
+        Date nomTime = new Date();
+        addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+        nomTime.setTime(nomTime.getTime() + 1000);
+        addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+        nomTime.setTime(nomTime.getTime() + 1000);
+        addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+        _testGetReadyActions(job.getId(), 0, "FIFO");
 
         cleanUpDBTables();
         job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
-        addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.READY, "coord-action-get.xml", 0);
-        _testGetReadyActions(job.getId(), 1);
+        nomTime = new Date();
+        addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.READY, "coord-action-get.xml", 0, nomTime);
+        nomTime.setTime(nomTime.getTime() + 1000);
+        addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.READY, "coord-action-get.xml", 0, nomTime);
+        nomTime.setTime(nomTime.getTime() + 1000);
+        addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+        List<CoordinatorActionBean> actions = _testGetReadyActions(job.getId(), 2, "FIFO");
+        assertEquals((job.getId() + "@1"), actions.get(0).getId());
+        assertEquals((job.getId() + "@2"), actions.get(1).getId());
     }
 
-    private void _testGetReadyActions(String jobId, int expected) throws Exception {
+    /** LIFO: WAITING actions are never returned and READY ones are expected newest first (@2 before @1). */
+    public void testCoordActionGetLIFO() throws Exception {
+        String jobId = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false).getId();
+        Date nomTime = new Date();
+        for (int num = 1; num <= 3; num++) {
+            nomTime.setTime(nomTime.getTime() + (num > 1 ? 1000 : 0));     // each action is 1s after the previous one
+            addRecordToCoordActionTable(jobId, num, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+        }
+        _testGetReadyActions(jobId, 0, "LIFO");
+
+        cleanUpDBTables();
+        jobId = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false).getId();
+        nomTime = new Date();
+        for (int num = 1; num <= 3; num++) {
+            nomTime.setTime(nomTime.getTime() + (num > 1 ? 1000 : 0));
+            CoordinatorAction.Status status = num < 3 ? CoordinatorAction.Status.READY : CoordinatorAction.Status.WAITING;
+            addRecordToCoordActionTable(jobId, num, status, "coord-action-get.xml", 0, nomTime);
+        }
+        List<CoordinatorActionBean> readyActions = _testGetReadyActions(jobId, 2, "LIFO");
+        assertEquals(jobId + "@2", readyActions.get(0).getId());
+        assertEquals(jobId + "@1", readyActions.get(1).getId());
+    }
+
+    /** LAST_ONLY: WAITING actions are never returned and READY ones are expected in the LIFO order (@2 before @1). */
+    public void testCoordActionGetLAST_ONLY() throws Exception {
+        String jobId = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false).getId();
+        Date nomTime = new Date();
+        for (int num = 1; num <= 3; num++) {
+            nomTime.setTime(nomTime.getTime() + (num > 1 ? 1000 : 0));     // each action is 1s after the previous one
+            addRecordToCoordActionTable(jobId, num, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+        }
+        _testGetReadyActions(jobId, 0, "LAST_ONLY");
+
+        cleanUpDBTables();
+        jobId = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false).getId();
+        nomTime = new Date();
+        for (int num = 1; num <= 3; num++) {
+            nomTime.setTime(nomTime.getTime() + (num > 1 ? 1000 : 0));
+            CoordinatorAction.Status status = num < 3 ? CoordinatorAction.Status.READY : CoordinatorAction.Status.WAITING;
+            addRecordToCoordActionTable(jobId, num, status, "coord-action-get.xml", 0, nomTime);
+        }
+        List<CoordinatorActionBean> readyActions = _testGetReadyActions(jobId, 2, "LAST_ONLY");
+        assertEquals(jobId + "@2", readyActions.get(0).getId());
+        assertEquals(jobId + "@1", readyActions.get(1).getId());
+    }
+
+    private List<CoordinatorActionBean> _testGetReadyActions(String jobId, int expected, String execution) throws Exception {
         JPAService jpaService = Services.get().get(JPAService.class);
         assertNotNull(jpaService);
-        CoordJobGetReadyActionsJPAExecutor actionGetCmd = new CoordJobGetReadyActionsJPAExecutor(jobId, 10, "FIFO");
+        CoordJobGetReadyActionsJPAExecutor actionGetCmd = new CoordJobGetReadyActionsJPAExecutor(jobId, 10, execution);
         List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd);
-        assertEquals(actions.size(), expected);
+        assertEquals(expected, actions.size());
+        return actions;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
index 9427db0..0ad2f41 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
@@ -186,6 +186,7 @@ public class TestCoordJobQueryExecutor extends XDataTestCase {
         bean.setBundleId("dummy-bundleid");
         bean.setOrigJobXml("dummy-origjobxml");
         bean.setSlaXml("<sla></sla>");
+        bean.setExecution("LIFO");  // FIFO is the default
         CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, bean);
         CoordinatorJobBean retBean;
         // GET_COORD_JOB_USER_APPNAME
@@ -203,6 +204,11 @@ public class TestCoordJobQueryExecutor extends XDataTestCase {
         assertEquals(bean.getAppName(), retBean.getAppName());
         assertEquals(bean.getStatusStr(), retBean.getStatusStr());
         assertEquals(bean.getAppNamespace(), retBean.getAppNamespace());
+        assertEquals(bean.getExecution(), retBean.getExecution());
+        assertEquals(bean.getFrequency(), retBean.getFrequency());
+        assertEquals(bean.getTimeUnit(), retBean.getTimeUnit());
+        assertEquals(bean.getTimeZone(), retBean.getTimeZone());
+        assertEquals(bean.getEndTime(), retBean.getEndTime());
         assertNull(retBean.getConf());
         assertNull(retBean.getJobXmlBlob());
         assertNull(retBean.getOrigJobXmlBlob());
@@ -252,6 +258,7 @@ public class TestCoordJobQueryExecutor extends XDataTestCase {
         assertEquals(bean.getBundleId(), retBean.getBundleId());
         assertEquals(bean.getConf(), retBean.getConf());
         assertEquals(bean.getJobXml(), retBean.getJobXml());
+        assertEquals(bean.getExecution(), retBean.getExecution());
         assertNull(retBean.getOrigJobXmlBlob());
         assertNull(retBean.getSlaXmlBlob());
         // GET_COORD_JOB_SUSPEND_KILL

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index 4f500da..a19b877 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -523,7 +523,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
      * @param status coord action status
      * @param resourceXmlName xml file name
      * @param pending pending counter
-     * @param action nominal time
+     * @param actionNominalTime nominal time of the coord action
      * @return coord action bean
      * @throws Exception thrown if unable to create coord action bean
      */
@@ -1525,7 +1525,12 @@ public abstract class XDataTestCase extends XHCatTestCase {
         CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
                 CoordinatorJob.Status.RUNNING, false, true);
 
-        CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), 1,
+        return addInitRecords(missingDependencies, pushMissingDependencies, oozieTimeZoneMask, job, 1);
+    }
+
+    protected String addInitRecords(String missingDependencies, String pushMissingDependencies, String oozieTimeZoneMask,
+            CoordinatorJobBean job, int actionNum) throws Exception {
+        CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), actionNum,
                 CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", missingDependencies,
                 pushMissingDependencies, oozieTimeZoneMask);
         return action.getId();
@@ -1594,6 +1599,13 @@ public abstract class XDataTestCase extends XHCatTestCase {
         CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action);
     }
 
+    /** Loads the coord action with the given id, sets its nominal time from the given epoch millis and stores it back. */
+    protected void setCoordActionNominalTime(String actionId, long actionNominalTime) throws Exception {
+        CoordinatorActionBean bean = Services.get().get(JPAService.class).execute(new CoordActionGetJPAExecutor(actionId));
+        bean.setNominalTime(new Date(actionNominalTime));
+        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, bean);
+    }
+
     protected void setMissingDependencies(String actionId, String missingDependencies) throws Exception {
         JPAService jpaService = Services.get().get(JPAService.class);
         CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
index 675c0fc..eb72768 100644
--- a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
+++ b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
@@ -864,6 +864,9 @@ A coordinator action in *WAITING* status may timeout before it becomes ready for
 
 A coordinator action may remain in *READY* status for a while, without starting execution, due to the concurrency execution policies of the coordinator job.
 
+A coordinator action in *READY* or *WAITING* status changes to *SKIPPED* status if the execution strategy is LAST_ONLY and the
+current time is past the next action's nominal time.  See section 6.3 for more details.
+
 A coordinator action in *READY* status changes to *SUBMITTED* status if total current *RUNNING* and *SUBMITTED* actions are less than concurrency execution limit.
 
 A coordinator action in *SUBMITTED* status changes to *RUNNING* status when the workflow engine start execution of the coordinator action.
@@ -879,8 +882,8 @@ A coordinator action in *FAILED*, *KILLED*, or *TIMEDOUT* status can be changed
 
 Valid coordinator action status transitions are:
 
-   * *WAITING --> READY | TIMEDOUT | KILLED*
-   * *READY --> SUBMITTED | KILLED*
+   * *WAITING --> READY | TIMEDOUT | SKIPPED | KILLED*
+   * *READY --> SUBMITTED | SKIPPED | KILLED*
    * *SUBMITTED --> RUNNING | KILLED | FAILED*
    * *RUNNING --> SUCCEEDED | KILLED | FAILED*
    * *FAILED | KILLED | TIMEDOUT --> IGNORED*
@@ -961,7 +964,7 @@ A synchronous coordinator definition is a is defined by a name, start time and e
       * *%BLUE% execution: %ENDCOLOR%* Specifies the execution order if multiple instances of the coordinator job have satisfied their execution criteria. Valid values are: 
          * =FIFO= (oldest first) *default*.
          * =LIFO= (newest first).
-         * =LAST_ONLY= (discards all older materializations).
+         * =LAST_ONLY= (see explanation below).
       * *%BLUE% throttle: %ENDCOLOR%* The maximum coordinator actions are allowed to be in WAITING state concurrently. The default value is =12=.
    * *%BLUE% datasets: %ENDCOLOR%* The datasets coordinator application uses.
    * *%BLUE% input-events: %ENDCOLOR%* The coordinator job input events. 
@@ -979,6 +982,18 @@ A synchronous coordinator definition is a is defined by a name, start time and e
    * *%BLUE% action: %ENDCOLOR%* The coordinator action to execute. 
       * *%BLUE% workflow: %ENDCOLOR%* The workflow job invocation. Workflow job properties can refer to the defined data-in and data-out elements.
 
+*LAST_ONLY:* While =FIFO= and =LIFO= simply specify the order in which =READY= actions should be executed, =LAST_ONLY= can actually
+cause some actions to be =SKIPPED= and is a little harder to understand.  When =LAST_ONLY= is set, an action that is =WAITING=
+or =READY= will be =SKIPPED= once the current time is past the next action's nominal time.  For example, suppose actions 1 and 2
+are both =WAITING=, the current time is 5:00pm, and action 2's nominal time is 5:10pm.  Ten minutes from now, at 5:10pm, action 1
+will become =SKIPPED=, assuming it doesn't transition to =SUBMITTED= (or a terminal state) before then.  Another way of thinking
+about this is to view it as similar to setting the =timeout= equal to the =frequency=, except that =SKIPPED= actions don't cause the
+coordinator job to eventually become =DONEWITHERROR=; the job can still become =SUCCEEDED= (i.e. =SKIPPED= is a "good" version
+of =TIMEDOUT=).  =LAST_ONLY= is useful if you want a recurring job, but do not actually care about the individual instances and just
+always want the latest action.  For example, if you have a coordinator running every 10 minutes and take Oozie down for 1 hour, when
+Oozie comes back, there would normally be 6 actions =READY= to run.  However, with =LAST_ONLY=, only the current one will go
+to =SUBMITTED= and =RUNNING=; the others will go to =SKIPPED=.
+
 *%PURPLE% Syntax: %ENDCOLOR%*
 
 <verbatim>

http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 37dba02..9c18798 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1319 "LAST_ONLY" in execution control for coordinator job still runs all the actions (rkanter)
 OOZIE-1862 Add hadoop token file location for Hive/Tez jobs (venkatnrangan via bzhang)
 OOZIE-1775 TestEventGeneration.testCoordinatorActionEvent is failing and CoordRerunX should generate event (mona)
 OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via rohini)