You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2014/05/30 23:38:18 UTC
git commit: OOZIE-1319 LAST_ONLY in execution control for coordinator
job still runs all the actions (rkanter)
Repository: oozie
Updated Branches:
refs/heads/master ee8ef73f2 -> 18401e614
OOZIE-1319 LAST_ONLY in execution control for coordinator job still runs all the actions (rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/18401e61
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/18401e61
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/18401e61
Branch: refs/heads/master
Commit: 18401e614b581711477d63bf6b4a56f8e739dbf0
Parents: ee8ef73
Author: Robert Kanter <rk...@cloudera.com>
Authored: Fri May 30 14:35:53 2014 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Fri May 30 14:35:53 2014 -0700
----------------------------------------------------------------------
.../apache/oozie/client/CoordinatorAction.java | 3 +-
.../org/apache/oozie/CoordinatorActionBean.java | 2 +-
.../org/apache/oozie/CoordinatorJobBean.java | 4 +-
.../coord/CoordActionInputCheckXCommand.java | 56 +++++++++
.../command/coord/CoordActionSkipXCommand.java | 115 +++++++++++++++++++
.../CoordMaterializeTransitionXCommand.java | 13 ++-
.../jpa/CoordJobGetReadyActionsJPAExecutor.java | 13 +--
.../executor/jpa/CoordJobQueryExecutor.java | 6 +
.../oozie/service/StatusTransitService.java | 10 +-
.../org/apache/oozie/TestCoordinatorEngine.java | 4 +-
.../TestCoordActionInputCheckXCommand.java | 76 ++++++++++++
.../coord/TestCoordActionSkipXCommand.java | 92 +++++++++++++++
.../TestCoordMaterializeTransitionXCommand.java | 56 +++++++++
.../TestCoordJobGetReadyActionsJPAExecutor.java | 82 +++++++++++--
.../executor/jpa/TestCoordJobQueryExecutor.java | 7 ++
.../org/apache/oozie/test/XDataTestCase.java | 16 ++-
.../site/twiki/CoordinatorFunctionalSpec.twiki | 21 +++-
release-log.txt | 1 +
18 files changed, 539 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java b/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
index ba17c5b..100823f 100644
--- a/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
+++ b/client/src/main/java/org/apache/oozie/client/CoordinatorAction.java
@@ -37,7 +37,8 @@ public interface CoordinatorAction {
SUCCEEDED,
KILLED,
FAILED,
- IGNORED
+ IGNORED,
+ SKIPPED
}
/**
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
index 43d5103..8cbcc4f 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
@@ -91,7 +91,7 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_COORD_ACTION_FOR_SLA", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.lastModifiedTimestamp from CoordinatorActionBean a where a.id = :id"),
// Select query used by ActionInfo command
@NamedQuery(name = "GET_COORD_ACTION_FOR_INFO", query = "select a.id, a.jobId, a.actionNumber, a.consoleUrl, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.trackerUri, a.createdTimestamp, a.nominalTimestamp, a.statusStr, a.lastModifiedTimestamp, a.missingDependencies, a.pushMissingDependencies from CoordinatorActionBean a where a.id = :id"),
- // Select Query used by Timeout command
+ // Select Query used by Timeout and skip commands
@NamedQuery(name = "GET_COORD_ACTION_FOR_TIMEOUT", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.id = :id"),
// Select query used by InputCheck command
@NamedQuery(name = "GET_COORD_ACTION_FOR_INPUTCHECK", query = "select a.id, a.jobId, a.statusStr, a.runConf, a.nominalTimestamp, a.createdTimestamp, a.actionXml, a.missingDependencies, a.pushMissingDependencies, a.timeOut from CoordinatorActionBean a where a.id = :id"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
index b77082c..8fd53f1 100644
--- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
+++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java
@@ -85,13 +85,13 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_COORD_JOB_USER_APPNAME", query = "select w.user, w.appName from CoordinatorJobBean w where w.id = :id"),
- @NamedQuery(name = "GET_COORD_JOB_INPUT_CHECK", query = "select w.user, w.appName, w.statusStr, w.appNamespace from CoordinatorJobBean w where w.id = :id"),
+ @NamedQuery(name = "GET_COORD_JOB_INPUT_CHECK", query = "select w.user, w.appName, w.statusStr, w.appNamespace, w.execution, w.frequency, w.timeUnitStr, w.timeZone, w.endTimestamp from CoordinatorJobBean w where w.id = :id"),
@NamedQuery(name = "GET_COORD_JOB_ACTION_READY", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.execution, w.concurrency from CoordinatorJobBean w where w.id = :id"),
@NamedQuery(name = "GET_COORD_JOB_ACTION_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr from CoordinatorJobBean w where w.id = :id"),
- @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace, w.timeUnitStr from CoordinatorJobBean w where w.id = :id"),
+ @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace, w.timeUnitStr, w.execution from CoordinatorJobBean w where w.id = :id"),
@NamedQuery(name = "GET_COORD_JOB_SUSPEND_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.bundleId, w.appNamespace, w.doneMaterialization from CoordinatorJobBean w where w.id = :id"),
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
index e8667c1..19f867e 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
+import java.text.ParseException;
+import java.util.Calendar;
import java.util.Date;
import java.util.List;
@@ -35,6 +37,7 @@ import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.TimeUnit;
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.dependency.URIHandlerException;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
@@ -88,6 +91,40 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
this.jobId = jobId;
}
+ /**
+ * Computes the nominal time of the action that follows this command's action, for a cron or a fixed frequency.
+ * Based on CoordMaterializeTransitionXCommand#materializeActions
+ *
+ * @return the nominal time of the next action, or null if that time would be after the job's end time
+ * @throws ParseException presumably raised by the cron frequency calculation; confirm against CoordCommandUtils
+ */
+ private Date computeNextNominalTime() throws ParseException {
+ Date nextNominalTime;
+ boolean isCronFrequency = false;
+ int freq = -1;
+ try {
+ freq = Integer.parseInt(coordJob.getFrequency());
+ } catch (NumberFormatException e) {
+ isCronFrequency = true;
+ }
+
+ if (isCronFrequency) {
+ nextNominalTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(coordAction.getNominalTime(), coordJob);
+ } else {
+ Calendar nextNominalTimeCal = Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
+ nextNominalTimeCal.setTime(coordAction.getNominalTime());
+ TimeUnit freqTU = TimeUnit.valueOf(coordJob.getTimeUnitStr());
+ nextNominalTimeCal.add(freqTU.getCalendarUnit(), freq);
+ nextNominalTime = nextNominalTimeCal.getTime();
+ }
+
+ // If the next nominal time is after the job's end time, then this is the last action, so return null
+ if (nextNominalTime.after(coordJob.getEndTime())) {
+ nextNominalTime = null;
+ }
+ return nextNominalTime;
+ }
+
@Override
protected Void execute() throws CommandException {
LOG.debug("[" + actionId + "]::ActionInputCheck:: Action is in WAITING state.");
@@ -113,6 +150,25 @@ public class CoordActionInputCheckXCommand extends CoordinatorXCommand<Void> {
try {
Configuration actionConf = new XConfiguration(new StringReader(coordAction.getRunConf()));
cron.start();
+ if (coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
+ Date now = new Date();
+ Date nextNominalTime = computeNextNominalTime();
+ if (nextNominalTime != null) {
+ // If the current time is after the next action's nominal time, then we've passed the window where this action
+ // should be started; so set it to SKIPPED
+ if (now.after(nextNominalTime)) {
+ LOG.info("LAST_ONLY execution: Preparing to skip action [{0}] because the current time [{1}] is later than "
+ + "the nominal time [{2}] of the next action]", coordAction.getId(),
+ DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
+ queue(new CoordActionSkipXCommand(coordAction, coordJob.getUser(), coordJob.getAppName()));
+ return null;
+ } else {
+ LOG.debug("LAST_ONLY execution: Not skipping action [{0}] because the current time [{1}] is earlier than "
+ + "the nominal time [{2}] of the next action]", coordAction.getId(),
+ DateUtils.formatDateOozieTZ(now), DateUtils.formatDateOozieTZ(nextNominalTime));
+ }
+ }
+ }
StringBuilder existList = new StringBuilder();
StringBuilder nonExistList = new StringBuilder();
StringBuilder nonResolvedList = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/command/coord/CoordActionSkipXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionSkipXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionSkipXCommand.java
new file mode 100644
index 0000000..c774f8e
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionSkipXCommand.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.util.Date;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.CoordActionGetForTimeoutJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.service.EventHandlerService;
+import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.LogUtils;
+import org.apache.oozie.util.ParamChecker;
+
+/**
+ * This class sets a Coordinator action's status to SKIPPED
+ */
+public class CoordActionSkipXCommand extends CoordinatorXCommand<Void> {
+ private CoordinatorActionBean actionBean;
+ private String user;
+ private String appName;
+ private JPAService jpaService = null;
+
+ public CoordActionSkipXCommand(CoordinatorActionBean actionBean, String user, String appName) {
+ super("coord_action_skip", "coord_action_skip", 1); // own name and type; getKey() is built from getName()
+ this.actionBean = ParamChecker.notNull(actionBean, "ActionBean");
+ this.user = ParamChecker.notEmpty(user, "user");
+ this.appName = ParamChecker.notEmpty(appName, "appName");
+ }
+
+ @Override
+ protected Void execute() throws CommandException {
+ if (actionBean.getStatus() == CoordinatorAction.Status.WAITING
+ || actionBean.getStatus() == CoordinatorAction.Status.READY) {
+ LOG.info("Setting action [{0}] status to SKIPPED", actionBean.getId());
+ actionBean.setStatus(CoordinatorAction.Status.SKIPPED);
+ try {
+ queue(new CoordActionNotificationXCommand(actionBean), 100);
+ actionBean.setLastModifiedTime(new Date());
+ CoordActionQueryExecutor.getInstance().executeUpdate(
+ CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, actionBean);
+ if (EventHandlerService.isEnabled()) {
+ generateEvent(actionBean, user, appName, null);
+ }
+ }
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String getEntityKey() {
+ return actionBean.getJobId();
+ }
+
+ @Override
+ public String getKey() {
+ return getName() + "_" + actionBean.getId();
+ }
+
+ @Override
+ protected boolean isLockRequired() {
+ return true;
+ }
+
+ @Override
+ protected void loadState() throws CommandException {
+ jpaService = Services.get().get(JPAService.class);
+ if (jpaService == null) {
+ throw new CommandException(ErrorCode.E0610);
+ }
+
+ try {
+ actionBean = jpaService.execute(new CoordActionGetForTimeoutJPAExecutor(actionBean.getId()));
+ }
+ catch (JPAExecutorException e) {
+ throw new CommandException(e);
+ }
+ LogUtils.setLogInfo(actionBean, logInfo);
+ }
+
+ @Override
+ protected void verifyPrecondition() throws CommandException, PreconditionException {
+ if (!(actionBean.getStatus() == CoordinatorAction.Status.WAITING
+ || actionBean.getStatus() == CoordinatorAction.Status.READY)) {
+ throw new PreconditionException(ErrorCode.E1100, "The coord action must have status "
+ + CoordinatorAction.Status.WAITING + " or " + CoordinatorAction.Status.READY
+ + " but has status [" + actionBean.getStatus() + "]");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 7b46ad6..515e247 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -219,7 +219,8 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
/**
* Get materialization for window for catch-up jobs. for current jobs,it reruns currentMatdate, For catch-up, end
- * Mataterilized Time = startMatdTime + MatThrottling * frequency
+ * Materialized Time = startMatdTime + MatThrottling * frequency; unless LAST_ONLY execution order is set, in which
+ * case it returns now (to materialize all actions in the past)
*
* @param currentMatTime
* @return
@@ -230,6 +231,9 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
if (currentMatTime.after(new Date())) {
return currentMatTime;
}
+ if (coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY)) {
+ return new Date();
+ }
int frequency = 0;
try {
frequency = Integer.parseInt(coordJob.getFrequency());
@@ -397,7 +401,6 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
// Move to the End of duration, if needed.
DateUtils.moveToEnd(origStart, endOfFlag);
- Date effStart = (Date) startMatdTime.clone();
StringBuilder actionStrings = new StringBuilder();
Date jobPauseTime = coordJob.getPauseTime();
Calendar pause = null;
@@ -407,9 +410,11 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
}
String action = null;
- JPAService jpaService = Services.get().get(JPAService.class);
int numWaitingActions = jpaService.execute(new CoordActionsActiveCountJPAExecutor(coordJob.getId()));
int maxActionToBeCreated = coordJob.getMatThrottling() - numWaitingActions;
+ // If LAST_ONLY and all materialization is in the past, ignore maxActionsToBeCreated
+ boolean ignoreMaxActions =
+ coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) && endMatdTime.before(new Date());
LOG.debug("Coordinator job :" + coordJob.getId() + ", maxActionToBeCreated :" + maxActionToBeCreated
+ ", Mat_Throttle :" + coordJob.getMatThrottling() + ", numWaitingActions :" + numWaitingActions);
@@ -422,7 +427,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo
}
boolean firstMater = true;
- while (start.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
+ while (start.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) {
if (pause != null && start.compareTo(pause) >= 0) {
break;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
index d22b478..42dbca8 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
@@ -59,20 +59,13 @@ public class CoordJobGetReadyActionsJPAExecutor implements JPAExecutor<List<Coor
if (executionOrder.equalsIgnoreCase("FIFO")) {
q = em.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
}
- else {
+ else { // LIFO or LAST_ONLY
q = em.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
}
q.setParameter("jobId", coordJobId);
- // if executionOrder is LAST_ONLY, only retrieve first record in LIFO,
- // otherwise, use numResults if it is positive.
- if (executionOrder.equalsIgnoreCase("LAST_ONLY")) {
- q.setMaxResults(1);
- }
- else {
- if (numResults > 0) {
- q.setMaxResults(numResults);
- }
+ if (numResults > 0) {
+ q.setMaxResults(numResults);
}
List<Object[]> objectArrList = q.getResultList();
actionBeans = new ArrayList<CoordinatorActionBean>();
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
index 1a6ded7..cddeaf7 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java
@@ -253,6 +253,11 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
bean.setAppName((String) arr[1]);
bean.setStatusStr((String) arr[2]);
bean.setAppNamespace((String) arr[3]);
+ bean.setExecution((String) arr[4]);
+ bean.setFrequency((String) arr[5]);
+ bean.setTimeUnitStr((String) arr[6]);
+ bean.setTimeZone((String) arr[7]);
+ bean.setEndTime(DateUtils.toDate((Timestamp) arr[8]));
break;
case GET_COORD_JOB_ACTION_READY:
bean = new CoordinatorJobBean();
@@ -298,6 +303,7 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo
bean.setJobXmlBlob((StringBlob) arr[18]);
bean.setAppNamespace((String) arr[19]);
bean.setTimeUnitStr((String) arr[20]);
+ bean.setExecution((String) arr[21]);
break;
case GET_COORD_JOB_SUSPEND_KILL:
bean = new CoordinatorJobBean();
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
index b11b3d2..e140e64 100644
--- a/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
+++ b/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
@@ -380,13 +380,17 @@ public class StatusTransitService implements Service {
if (coordActionStatus.containsKey(CoordinatorAction.Status.KILLED)) {
totalValuesKilled = coordActionStatus.get(CoordinatorAction.Status.KILLED);
}
-
int totalValuesTimeOut = 0;
if (coordActionStatus.containsKey(CoordinatorAction.Status.TIMEDOUT)) {
totalValuesTimeOut = coordActionStatus.get(CoordinatorAction.Status.TIMEDOUT);
}
+ int totalValuesSkipped = 0;
+ if (coordActionStatus.containsKey(CoordinatorAction.Status.SKIPPED)) {
+ totalValuesSkipped = coordActionStatus.get(CoordinatorAction.Status.SKIPPED);
+ }
- if (coordActionsCount == (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut)) {
+ if (coordActionsCount ==
+ (totalValuesSucceed + totalValuesFailed + totalValuesKilled + totalValuesTimeOut + totalValuesSkipped)) {
// If all coord action is done and coord is killed, then don't change the status.
if (coordStatus[0].equals(Job.Status.KILLED)) {
@@ -394,7 +398,7 @@ public class StatusTransitService implements Service {
return true;
}
// If all the coordinator actions are succeeded then coordinator job should be succeeded.
- if (coordActionsCount == totalValuesSucceed && isDoneMaterialization) {
+ if (coordActionsCount == (totalValuesSucceed + totalValuesSkipped) && isDoneMaterialization) {
coordStatus[0] = Job.Status.SUCCEEDED;
ret = true;
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
index e15e567..e3db675 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngine.java
@@ -502,7 +502,7 @@ public class TestCoordinatorEngine extends XTestCase {
assertEquals(ErrorCode.E0421, ex.getErrorCode());
assertEquals("E0421: Invalid job filter [status=blahblah], invalid status value [blahblah]."
+ " Valid status values are: [WAITING READY SUBMITTED RUNNING SUSPENDED TIMEDOUT "
- + "SUCCEEDED KILLED FAILED IGNORED ]", ex.getMessage());
+ + "SUCCEEDED KILLED FAILED IGNORED SKIPPED ]", ex.getMessage());
}
// Check for empty status value
@@ -513,7 +513,7 @@ public class TestCoordinatorEngine extends XTestCase {
assertEquals(ErrorCode.E0421, ex.getErrorCode());
assertEquals("E0421: Invalid job filter [status=\"\"], invalid status value [\"\"]. "
+ "Valid status values are: [WAITING READY SUBMITTED RUNNING SUSPENDED TIMEDOUT "
- + "SUCCEEDED KILLED FAILED IGNORED ]", ex.getMessage());
+ + "SUCCEEDED KILLED FAILED IGNORED SKIPPED ]", ex.getMessage());
}
// Check for invalid filter option
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
index ebf5081..1ffadbd 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
@@ -39,6 +39,7 @@ import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.HadoopAccessorService;
@@ -772,6 +773,61 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase {
checkCoordAction(actionId, missingDeps, CoordinatorAction.Status.TIMEDOUT);
}
+ @Test
+ public void testLastOnly() throws Exception {
+ CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+ CoordinatorJob.Status.RUNNING, false, true);
+ job.setExecutionOrder(CoordinatorJobBean.Execution.LAST_ONLY);
+ job.setFrequency("10");
+ job.setTimeUnit(Timeunit.MINUTE);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job);
+ String missingDeps = "hdfs:///dirx/filex";
+
+ // 1 hour in the past means next nominal time is 50 mins ago so should become SKIPPED
+ String actionId1 = addInitRecords(missingDeps, null, TZ, job, 1);
+ Date nomTime = new Date(new Date().getTime() - 60 * 60 * 1000); // 1 hour ago
+ setCoordActionNominalTime(actionId1, nomTime.getTime());
+ new CoordActionInputCheckXCommand(actionId1, job.getId()).call();
+ checkCoordActionStatus(actionId1, CoordinatorAction.Status.SKIPPED);
+
+ // 1 hour in the future means next nominal time is 70 mins from now, so nothing should happen
+ String actionId2 = addInitRecords(missingDeps, null, TZ, job, 2);
+ nomTime = new Date(new Date().getTime() + 60 * 60 * 1000); // 1 hour from now
+ setCoordActionNominalTime(actionId2, nomTime.getTime());
+ new CoordActionInputCheckXCommand(actionId2, job.getId()).call();
+ checkCoordActionStatus(actionId2, CoordinatorAction.Status.WAITING);
+
+ // 5 mins in the past means next nominal time is 5 mins from now, so nothing should happen
+ String actionId3 = addInitRecords(missingDeps, null, TZ, job, 3);
+ nomTime = new Date(new Date().getTime() - 5 * 60 * 1000); // 5 minutes ago
+ setCoordActionNominalTime(actionId3, nomTime.getTime());
+ new CoordActionInputCheckXCommand(actionId3, job.getId()).call();
+ checkCoordActionStatus(actionId3, CoordinatorAction.Status.WAITING);
+
+ // 5 mins in the past means the next nominal time is 5 mins from now, but the end time is only 3 mins from now (which means
+ // that the next nominal time would be after the end time), so nothing should happen
+ Date endTime = new Date(new Date().getTime() + 3 * 60 * 1000); // 3 minutes from now
+ job.setEndTime(endTime);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job);
+ String actionId4 = addInitRecords(missingDeps, null, TZ, job, 4);
+ nomTime = new Date(new Date().getTime() - 5 * 60 * 1000); // 5 minutes ago
+ setCoordActionNominalTime(actionId4, nomTime.getTime());
+ new CoordActionInputCheckXCommand(actionId4, job.getId()).call();
+ checkCoordActionStatus(actionId4, CoordinatorAction.Status.WAITING);
+
+ // 1 hour in the past means the next nominal time is 50 mins ago, but the end time is 55 mins ago (which means that the next
+ // nominal time would be after the end time but still in the past), so nothing should happen even though the action and the
+ // next nominal time are both in the past
+ endTime = new Date(new Date().getTime() - 55 * 60 * 1000); // 55 mins ago
+ job.setEndTime(endTime);
+ CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job);
+ String actionId5 = addInitRecords(missingDeps, null, TZ, job, 5);
+ nomTime = new Date(new Date().getTime() - 60 * 60 * 1000); // 1 hour ago
+ setCoordActionNominalTime(actionId5, nomTime.getTime());
+ new CoordActionInputCheckXCommand(actionId5, job.getId()).call();
+ checkCoordActionStatus(actionId5, CoordinatorAction.Status.WAITING);
+ }
+
protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJob.Status status,
Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
@@ -951,4 +1007,24 @@ public class TestCoordActionInputCheckXCommand extends XDataTestCase {
throw new Exception("Action ID " + actionId + " was not stored properly in db");
}
}
+
+ private CoordinatorActionBean checkCoordActionStatus(final String actionId, final CoordinatorAction.Status stat)
+ throws Exception {
+ final JPAService jpaService = Services.get().get(JPAService.class);
+ waitFor(5 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ try {
+ CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+ return stat.equals(action.getStatus());
+ }
+ catch (JPAExecutorException se) {
+ throw new Exception("Action ID " + actionId + " was not stored properly in db");
+ }
+ }
+ });
+ CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+ assertEquals(stat, action.getStatus());
+ return action;
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionSkipXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionSkipXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionSkipXCommand.java
new file mode 100644
index 0000000..2baabce
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionSkipXCommand.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.util.Date;
+
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+
+public class TestCoordActionSkipXCommand extends XDataTestCase {
+ private Services services;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ services.init();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ services.destroy();
+ super.tearDown();
+ }
+
+ public void testVerifyPrecondition() throws Exception {
+ Date startTime = new Date();
+ Date endTime = new Date(startTime.getTime() + 1 * 60 * 1000);
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
+ true, 0);
+ int actionNum = 1;
+ for (CoordinatorAction.Status actionStatus : CoordinatorAction.Status.values()) {
+ CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), actionNum, actionStatus,
+ "coord-action-get.xml", 0);
+ try {
+ new CoordActionSkipXCommand(action, getTestUser(), "my-app-name").verifyPrecondition();
+ if (!(actionStatus.equals(CoordinatorAction.Status.WAITING)
+ || actionStatus.equals(CoordinatorAction.Status.READY))) {
+ fail();
+ }
+ } catch (PreconditionException pe) {
+ assertEquals(ErrorCode.E1100, pe.getErrorCode());
+ assertTrue(pe.getMessage().endsWith("[" + actionStatus + "]]"));
+ }
+ actionNum++;
+ }
+ }
+
+ public void testWaitingToSkipped() throws Exception {
+ _testToSkipped(CoordinatorAction.Status.WAITING);
+ }
+
+ public void testReadyToSkipped() throws Exception {
+ _testToSkipped(CoordinatorAction.Status.READY);
+ }
+
+ public void _testToSkipped(CoordinatorAction.Status actionStatus) throws Exception {
+ Date startTime = new Date();
+ Date endTime = new Date(startTime.getTime() + 1 * 60 * 1000);
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
+ true, 0);
+ CoordinatorActionBean action = addRecordToCoordActionTable(job.getId(), 1, actionStatus,
+ "coord-action-get.xml", 0);
+ assertEquals(actionStatus, action.getStatus());
+ new CoordActionSkipXCommand(action, getTestUser(), "my-app-name").call();
+ action = CoordActionQueryExecutor.getInstance()
+ .get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION, action.getId());
+ assertEquals(CoordinatorAction.Status.SKIPPED, action.getStatus());
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index 9a8d65b..5b22abc 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -18,6 +18,7 @@
package org.apache.oozie.command.coord;
import java.sql.Timestamp;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
@@ -30,6 +31,7 @@ import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
@@ -563,6 +565,33 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
}
+ public void testLastOnlyMaterialization() throws Exception {
+
+ long now = System.currentTimeMillis();
+ Date startTime = DateUtils.toDate(new Timestamp(now - 180 * 60 * 1000)); // 3 hours ago
+ Date endTime = DateUtils.toDate(new Timestamp(now + 180 * 60 * 1000)); // 3 hours from now
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "10",
+ CoordinatorJob.Execution.LAST_ONLY);
+ // Normally this would materialize at most the throttle amount of actions within a 1 hour window; however, with LAST_ONLY
+ // this should ignore those parameters and materialize everything in the past
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+ CoordinatorActionBean.Status[] expectedStatuses = new CoordinatorActionBean.Status[19];
+ Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+ checkCoordActionsStatus(job.getId(), expectedStatuses);
+
+ startTime = DateUtils.toDate(new Timestamp(now)); // now
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, null, -1, "10",
+ CoordinatorJob.Execution.LAST_ONLY);
+ // We're starting from "now" this time (i.e. present/future), so it should materialize things normally
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ checkCoordJobs(job.getId(), CoordinatorJob.Status.RUNNING);
+ expectedStatuses = new CoordinatorActionBean.Status[6];
+ Arrays.fill(expectedStatuses, CoordinatorActionBean.Status.WAITING);
+ checkCoordActionsStatus(job.getId(), expectedStatuses);
+ }
+
+
protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
Date pauseTime, String freq) throws Exception {
return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, -1, freq);
@@ -570,6 +599,11 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
Date pauseTime, int timeout, String freq) throws Exception {
+ return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, timeout, freq, CoordinatorJob.Execution.FIFO);
+ }
+
+ protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
+ Date pauseTime, int timeout, String freq, CoordinatorJob.Execution execution) throws Exception {
CoordinatorJobBean coordJob = createCoordJob(status, startTime, endTime, false, false, 0);
coordJob.setStartTime(startTime);
coordJob.setEndTime(endTime);
@@ -579,6 +613,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
coordJob.setTimeout(timeout);
coordJob.setConcurrency(3);
coordJob.setMatThrottling(20);
+ coordJob.setExecutionOrder(execution);
try {
JPAService jpaService = Services.get().get(JPAService.class);
@@ -697,6 +732,27 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
}
}
+ private void checkCoordActionsStatus(String jobId, CoordinatorActionBean.Status[] statuses) {
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ List<CoordinatorActionBean> actions = jpaService.execute(new CoordJobGetActionsSubsetJPAExecutor(jobId,
+ null, 1, 1000, false));
+
+ if (actions.size() != statuses.length) {
+ fail("Should have " + statuses.length + " actions created for job " + jobId + ", but has " + actions.size()
+ + " actions.");
+ }
+
+ for (int i=0; i < statuses.length; i++ ) {
+ assertEquals(statuses[i], actions.get(i).getStatus());
+ }
+ }
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + jobId + " was not stored properly in db");
+ }
+ }
+
private void checkCoordActionsTimeout(String actionId, int expected) {
try {
JPAService jpaService = Services.get().get(JPAService.class);
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetReadyActionsJPAExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetReadyActionsJPAExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetReadyActionsJPAExecutor.java
index 1cdccf1..b81c568 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetReadyActionsJPAExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobGetReadyActionsJPAExecutor.java
@@ -17,24 +17,28 @@
*/
package org.apache.oozie.executor.jpa;
+import java.util.Date;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
public class TestCoordJobGetReadyActionsJPAExecutor extends XDataTestCase {
Services services;
+ private final String[] excludedServices = { "org.apache.oozie.service.StatusTransitService",
+ "org.apache.oozie.service.PauseTransitService",
+ "org.apache.oozie.service.CoordMaterializeTriggerService", "org.apache.oozie.service.RecoveryService" };
@Override
protected void setUp() throws Exception {
super.setUp();
services = new Services();
+ setClassesToBeExcluded(services.getConf(), excludedServices);
services.init();
}
@@ -44,24 +48,82 @@ public class TestCoordJobGetReadyActionsJPAExecutor extends XDataTestCase {
super.tearDown();
}
- public void testCoordActionGet() throws Exception {
- int actionNum = 1;
+ public void testCoordActionGetFIFO() throws Exception {
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
- addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0);
- _testGetReadyActions(job.getId(), 0);
+ Date nomTime = new Date();
+ addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ _testGetReadyActions(job.getId(), 0, "FIFO");
cleanUpDBTables();
job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
- addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.READY, "coord-action-get.xml", 0);
- _testGetReadyActions(job.getId(), 1);
+ nomTime = new Date();
+ addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.READY, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.READY, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ List<CoordinatorActionBean> actions = _testGetReadyActions(job.getId(), 2, "FIFO");
+ assertEquals((job.getId() + "@1"), actions.get(0).getId());
+ assertEquals((job.getId() + "@2"), actions.get(1).getId());
}
- private void _testGetReadyActions(String jobId, int expected) throws Exception {
+ public void testCoordActionGetLIFO() throws Exception {
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ Date nomTime = new Date();
+ addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ _testGetReadyActions(job.getId(), 0, "LIFO");
+
+ cleanUpDBTables();
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ nomTime = new Date();
+ addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.READY, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.READY, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ List<CoordinatorActionBean> actions = _testGetReadyActions(job.getId(), 2, "LIFO");
+ assertEquals((job.getId() + "@2"), actions.get(0).getId());
+ assertEquals((job.getId() + "@1"), actions.get(1).getId());
+ }
+
+ public void testCoordActionGetLAST_ONLY() throws Exception {
+ CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ Date nomTime = new Date();
+ addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ _testGetReadyActions(job.getId(), 0, "LAST_ONLY");
+
+ cleanUpDBTables();
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+ nomTime = new Date();
+ addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.READY, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.READY, "coord-action-get.xml", 0, nomTime);
+ nomTime.setTime(nomTime.getTime() + 1000);
+ addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, nomTime);
+ List<CoordinatorActionBean> actions = _testGetReadyActions(job.getId(), 2, "LAST_ONLY");
+ assertEquals((job.getId() + "@2"), actions.get(0).getId());
+ assertEquals((job.getId() + "@1"), actions.get(1).getId());
+ }
+
+ private List<CoordinatorActionBean> _testGetReadyActions(String jobId, int expected, String execution) throws Exception {
JPAService jpaService = Services.get().get(JPAService.class);
assertNotNull(jpaService);
- CoordJobGetReadyActionsJPAExecutor actionGetCmd = new CoordJobGetReadyActionsJPAExecutor(jobId, 10, "FIFO");
+ CoordJobGetReadyActionsJPAExecutor actionGetCmd = new CoordJobGetReadyActionsJPAExecutor(jobId, 10, execution);
List<CoordinatorActionBean> actions = jpaService.execute(actionGetCmd);
- assertEquals(actions.size(), expected);
+ assertEquals(expected, actions.size());
+ return actions;
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
index 9427db0..0ad2f41 100644
--- a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
+++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordJobQueryExecutor.java
@@ -186,6 +186,7 @@ public class TestCoordJobQueryExecutor extends XDataTestCase {
bean.setBundleId("dummy-bundleid");
bean.setOrigJobXml("dummy-origjobxml");
bean.setSlaXml("<sla></sla>");
+ bean.setExecution("LIFO"); // FIFO is the default
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, bean);
CoordinatorJobBean retBean;
// GET_COORD_JOB_USER_APPNAME
@@ -203,6 +204,11 @@ public class TestCoordJobQueryExecutor extends XDataTestCase {
assertEquals(bean.getAppName(), retBean.getAppName());
assertEquals(bean.getStatusStr(), retBean.getStatusStr());
assertEquals(bean.getAppNamespace(), retBean.getAppNamespace());
+ assertEquals(bean.getExecution(), retBean.getExecution());
+ assertEquals(bean.getFrequency(), retBean.getFrequency());
+ assertEquals(bean.getTimeUnit(), retBean.getTimeUnit());
+ assertEquals(bean.getTimeZone(), retBean.getTimeZone());
+ assertEquals(bean.getEndTime(), retBean.getEndTime());
assertNull(retBean.getConf());
assertNull(retBean.getJobXmlBlob());
assertNull(retBean.getOrigJobXmlBlob());
@@ -252,6 +258,7 @@ public class TestCoordJobQueryExecutor extends XDataTestCase {
assertEquals(bean.getBundleId(), retBean.getBundleId());
assertEquals(bean.getConf(), retBean.getConf());
assertEquals(bean.getJobXml(), retBean.getJobXml());
+ assertEquals(bean.getExecution(), retBean.getExecution());
assertNull(retBean.getOrigJobXmlBlob());
assertNull(retBean.getSlaXmlBlob());
// GET_COORD_JOB_SUSPEND_KILL
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index 4f500da..a19b877 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -523,7 +523,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
* @param status coord action status
* @param resourceXmlName xml file name
* @param pending pending counter
- * @param action nominal time
+ * @param actionNominalTime action nominal time
* @return coord action bean
* @throws Exception thrown if unable to create coord action bean
*/
@@ -1525,7 +1525,12 @@ public abstract class XDataTestCase extends XHCatTestCase {
CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
CoordinatorJob.Status.RUNNING, false, true);
- CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), 1,
+ return addInitRecords(missingDependencies, pushMissingDependencies, oozieTimeZoneMask, job, 1);
+ }
+
+ protected String addInitRecords(String missingDependencies, String pushMissingDependencies, String oozieTimeZoneMask,
+ CoordinatorJobBean job, int actionNum) throws Exception {
+ CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), actionNum,
CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", missingDependencies,
pushMissingDependencies, oozieTimeZoneMask);
return action.getId();
@@ -1594,6 +1599,13 @@ public abstract class XDataTestCase extends XHCatTestCase {
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action);
}
+ protected void setCoordActionNominalTime(String actionId, long actionNominalTime) throws Exception {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+ action.setNominalTime(new Date(actionNominalTime));
+ CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action);
+ }
+
protected void setMissingDependencies(String actionId, String missingDependencies) throws Exception {
JPAService jpaService = Services.get().get(JPAService.class);
CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
index 675c0fc..eb72768 100644
--- a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
+++ b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
@@ -864,6 +864,9 @@ A coordinator action in *WAITING* status may timeout before it becomes ready for
A coordinator action may remain in *READY* status for a while, without starting execution, due to the concurrency execution policies of the coordinator job.
+A coordinator action in *READY* or *WAITING* status changes to *SKIPPED* status if the execution strategy is LAST_ONLY and the
+current time is past the next action's nominal time. See section 6.3 for more details.
+
A coordinator action in *READY* status changes to *SUBMITTED* status if total current *RUNNING* and *SUBMITTED* actions are less than concurrency execution limit.
A coordinator action in *SUBMITTED* status changes to *RUNNING* status when the workflow engine start execution of the coordinator action.
@@ -879,8 +882,8 @@ A coordinator action in *FAILED*, *KILLED*, or *TIMEDOUT* status can be changed
Valid coordinator action status transitions are:
- * *WAITING --> READY | TIMEDOUT | KILLED*
- * *READY --> SUBMITTED | KILLED*
+ * *WAITING --> READY | TIMEDOUT | SKIPPED | KILLED*
+ * *READY --> SUBMITTED | SKIPPED | KILLED*
* *SUBMITTED --> RUNNING | KILLED | FAILED*
* *RUNNING --> SUCCEEDED | KILLED | FAILED*
* *FAILED | KILLED | TIMEDOUT --> IGNORED*
@@ -961,7 +964,7 @@ A synchronous coordinator definition is a is defined by a name, start time and e
* *%BLUE% execution: %ENDCOLOR%* Specifies the execution order if multiple instances of the coordinator job have satisfied their execution criteria. Valid values are:
* =FIFO= (oldest first) *default*.
* =LIFO= (newest first).
- * =LAST_ONLY= (discards all older materializations).
+ * =LAST_ONLY= (see explanation below).
* *%BLUE% throttle: %ENDCOLOR%* The maximum coordinator actions are allowed to be in WAITING state concurrently. The default value is =12=.
* *%BLUE% datasets: %ENDCOLOR%* The datasets coordinator application uses.
* *%BLUE% input-events: %ENDCOLOR%* The coordinator job input events.
@@ -979,6 +982,18 @@ A synchronous coordinator definition is a is defined by a name, start time and e
* *%BLUE% action: %ENDCOLOR%* The coordinator action to execute.
* *%BLUE% workflow: %ENDCOLOR%* The workflow job invocation. Workflow job properties can refer to the defined data-in and data-out elements.
+*LAST_ONLY:* While =FIFO= and =LIFO= simply specify the order in which =READY= actions should be executed, =LAST_ONLY= can actually
+cause some actions to be =SKIPPED= and is a little harder to understand. When =LAST_ONLY= is set, an action that is =WAITING=
+or =READY= will be =SKIPPED= when the current time is past the next action's nominal time. For example, suppose actions 1 and 2
+are both =WAITING=, the current time is 5:00pm, and action 2's nominal time is 5:10pm. Ten minutes from now, at 5:10pm, action 1
+will become =SKIPPED=, assuming it doesn't transition to =SUBMITTED= (or a terminal state) before then. Another way of thinking about
+this is to view it as similar to setting the =timeout= equal to the =frequency=, except that the =SKIPPED= status doesn't cause the
+coordinator job to eventually become =DONEWITHERROR=; the job can still become =SUCCEEDED= (i.e. =SKIPPED= is a "good" version
+of =TIMEDOUT=). =LAST_ONLY= is useful if you want a recurring job, but do not actually care about the individual instances and just
+always want the latest action. For example, if you have a coordinator running every 10 minutes and take Oozie down for 1 hour, when
+Oozie comes back, there would normally be 6 actions =READY= to run. However, with =LAST_ONLY=, only the current one will go
+to =SUBMITTED= and =RUNNING=; the others will go to =SKIPPED=.
+
*%PURPLE% Syntax: %ENDCOLOR%*
<verbatim>
http://git-wip-us.apache.org/repos/asf/oozie/blob/18401e61/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 37dba02..9c18798 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1319 "LAST_ONLY" in execution control for coordinator job still runs all the actions (rkanter)
OOZIE-1862 Add hadoop token file location for Hive/Tez jobs (venkatnrangan via bzhang)
OOZIE-1775 TestEventGeneration.testCoordinatorActionEvent is failing and CoordRerunX should generate event (mona)
OOZIE-1844 HA - Lock mechanism for CoordMaterializeTriggerService (puru via rohini)