You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2014/06/17 02:31:04 UTC
git commit: OOZIE-1879 Workflow Rerun causes error depending on the
order of forked nodes (rkanter)
Repository: oozie
Updated Branches:
refs/heads/master 25c640b3b -> cbb1eac9d
OOZIE-1879 Workflow Rerun causes error depending on the order of forked nodes (rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/cbb1eac9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/cbb1eac9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/cbb1eac9
Branch: refs/heads/master
Commit: cbb1eac9db427db0047847e3ed5564979be5571c
Parents: 25c640b
Author: Robert Kanter <rk...@cloudera.com>
Authored: Mon Jun 16 17:30:12 2014 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Mon Jun 16 17:30:12 2014 -0700
----------------------------------------------------------------------
.../org/apache/oozie/WorkflowActionBean.java | 2 +-
.../apache/oozie/command/wf/ReRunXCommand.java | 10 ++-
.../jpa/WorkflowActionQueryExecutor.java | 1 +
.../org/apache/oozie/workflow/WorkflowLib.java | 8 ++-
.../workflow/lite/LiteWorkflowInstance.java | 72 ++++++++++++++++++++
.../oozie/workflow/lite/LiteWorkflowLib.java | 7 +-
.../oozie/command/wf/TestReRunXCommand.java | 68 ++++++++++++++++++
core/src/test/resources/rerun-wf-fork.xml | 63 +++++++++++++++++
release-log.txt | 1 +
9 files changed, 226 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
index 6861995..e64e9bf 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
@@ -109,7 +109,7 @@ import org.json.simple.JSONObject;
@NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.statusStr = 'START_RETRY' OR a.statusStr = 'START_MANUAL' OR a.statusStr = 'END_RETRY' OR a.statusStr = 'END_MANUAL')"),
- @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW_RERUN", query = "select a.id, a.name, a.statusStr from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp") })
+ @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW_RERUN", query = "select a.id, a.name, a.statusStr, a.endTimestamp from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp") })
@Table(name = "WF_ACTIONS")
public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean {
@Id
http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
index fe588d4..5dd06ca 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
@@ -158,8 +158,16 @@ public class ReRunXCommand extends WorkflowXCommand<Void> {
// Resetting the conf to contain all the resolved values is necessary to ensure propagation of Oozie properties to Hadoop calls downstream
conf = ((XConfiguration) conf).resolve();
+ // Prepare the action endtimes map
+ Map<String, Date> actionEndTimes = new HashMap<String, Date>();
+ for (WorkflowActionBean action : actions) {
+ if (action.getEndTime() != null) {
+ actionEndTimes.put(action.getName(), action.getEndTime());
+ }
+ }
+
try {
- newWfInstance = workflowLib.createInstance(app, conf, jobId);
+ newWfInstance = workflowLib.createInstance(app, conf, jobId, actionEndTimes);
}
catch (WorkflowException e) {
throw new CommandException(e);
http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 9156a27..0c323a3 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -364,6 +364,7 @@ public class WorkflowActionQueryExecutor extends
bean.setId((String) arr[0]);
bean.setName((String) arr[1]);
bean.setStatusStr((String) arr[2]);
+ bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java b/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
index 7e4c90a..e79e59d 100644
--- a/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
+++ b/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
@@ -17,6 +17,8 @@
*/
package org.apache.oozie.workflow;
+import java.util.Date;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -50,15 +52,17 @@ public interface WorkflowLib {
public WorkflowInstance createInstance(WorkflowApp app, Configuration conf) throws WorkflowException;
/**
- * Create a workflow instance with the given wfId. This will be used for re-running workflows.
+ * Create a workflow instance with the given wfId and actions endtime map. This will be used for re-running workflows.
*
* @param app application to create a workflow instance of.
* @param conf job configuration.
* @param wfId Workflow ID.
+ * @param actionEndTimes A map of the actions to their endtimes; actions with no endtime should be omitted
* @return the newly created workflow instance.
* @throws WorkflowException thrown if the instance could not be created.
*/
- public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId) throws WorkflowException;
+ public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId, Map<String, Date> actionEndTimes)
+ throws WorkflowException;
/**
* Insert a workflow instance in storage.
http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
index bf8dc05..a5db84a 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
@@ -37,6 +37,10 @@ import java.io.IOException;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -45,6 +49,8 @@ import java.util.Map;
public class LiteWorkflowInstance implements Writable, WorkflowInstance {
private static final String TRANSITION_TO = "transition.to";
+ private final Date FAR_INTO_THE_FUTURE = new Date(Long.MAX_VALUE);
+
private XLog log;
private static String PATH_SEPARATOR = "/";
@@ -154,6 +160,7 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
private Map<String, String> persistentVars = new HashMap<String, String>();
private Map<String, Object> transientVars = new HashMap<String, Object>();
+ private ActionEndTimesComparator actionEndTimesComparator = null;
protected LiteWorkflowInstance() {
log = XLog.getLog(getClass());
@@ -168,6 +175,11 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
status = Status.PREP;
}
+ public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId, Map<String, Date> actionEndTimes) {
+ this(def, conf, instanceId);
+ actionEndTimesComparator = new ActionEndTimesComparator(actionEndTimes);
+ }
+
public synchronized boolean start() throws WorkflowException {
if (status != Status.PREP) {
throw new WorkflowException(ErrorCode.E0719);
@@ -294,6 +306,16 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
}
}
+
+ // If we're doing a rerun, then we need to make sure to put the actions in pathsToStart into the order
+ // that they ended in. Otherwise, it could result in an error later on in some edge cases.
+ // e.g. You have a fork with two nodes, A and B, that both succeeded, followed by a join and some more
+ // nodes, some of which failed. If you do the rerun, it will always signal A and then B, even if in the
+ // original run B signaled first and then A. By sorting this, we maintain the proper signal ordering.
+ if (actionEndTimesComparator != null && pathsToStart.size() > 1) {
+ Collections.sort(pathsToStart, actionEndTimesComparator);
+ }
+
// signal all new synch transitions
for (String pathToStart : pathsToStart) {
signal(pathToStart, "::synch::");
@@ -585,6 +607,14 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
dOut.writeUTF(entry.getKey());
writeStringAsBytes(entry.getValue(), dOut);
}
+ if (actionEndTimesComparator != null) {
+ Map<String, Date> actionEndTimes = actionEndTimesComparator.getActionEndTimes();
+ dOut.writeInt(actionEndTimes.size());
+ for (Map.Entry<String, Date> entry : actionEndTimes.entrySet()) {
+ dOut.writeUTF(entry.getKey());
+ dOut.writeLong(entry.getValue().getTime());
+ }
+ }
}
@Override
@@ -616,6 +646,21 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
String vVal = readBytesAsString(dIn);
persistentVars.put(vName, vVal);
}
+ int numActionEndTimes = -1;
+ try {
+ numActionEndTimes = dIn.readInt();
+ } catch (IOException ioe) {
+ // This means that there isn't an actionEndTimes, so just ignore
+ }
+ if (numActionEndTimes > 0) {
+ Map<String, Date> actionEndTimes = new HashMap<String, Date>(numActionEndTimes);
+ for (int x = 0; x < numActionEndTimes; x++) {
+ String name = dIn.readUTF();
+ long endTime = dIn.readLong();
+ actionEndTimes.put(name, new Date(endTime));
+ }
+ actionEndTimesComparator = new ActionEndTimesComparator(actionEndTimes);
+ }
refreshLog();
}
@@ -671,4 +716,31 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
return instanceId.hashCode();
}
+    /** Orders execution paths by the end time of the node on each path, taken from a previous run. */
+    private class ActionEndTimesComparator implements Comparator<String> {
+        private final Map<String, Date> actionEndTimes;
+
+        public ActionEndTimesComparator(Map<String, Date> actionEndTimes) {
+            this.actionEndTimes = actionEndTimes;
+        }
+
+        /** Compares two execution paths by end time; paths with no known end time sort last. */
+        @Override
+        public int compare(String node1, String node2) {
+            return endTimeOf(node1).compareTo(endTimeOf(node2));
+        }
+
+        // The map omits actions that never ended, so a lookup can return null. Fall back to
+        // FAR_INTO_THE_FUTURE in that case; comparing a null Date throws NullPointerException.
+        private Date endTimeOf(String path) {
+            NodeInstance nodeInstance = executionPaths.get(path);
+            Date endTime = (nodeInstance == null) ? null : actionEndTimes.get(nodeInstance.nodeName);
+            return (endTime == null) ? FAR_INTO_THE_FUTURE : endTime;
+        }
+
+        /** Returns the backing map of node name to end time (not a copy). */
+        public Map<String, Date> getActionEndTimes() {
+            return actionEndTimes;
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
index 7f6f1cc..0e0aefd 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
import javax.xml.validation.Schema;
import java.io.StringReader;
+import java.util.Date;
+import java.util.Map;
//TODO javadoc
public abstract class LiteWorkflowLib implements WorkflowLib {
@@ -63,9 +65,10 @@ public abstract class LiteWorkflowLib implements WorkflowLib {
}
@Override
- public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId) throws WorkflowException {
+ public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId, Map<String, Date> actionEndTimes)
+ throws WorkflowException {
ParamChecker.notNull(app, "app");
ParamChecker.notNull(wfId, "wfId");
- return new LiteWorkflowInstance((LiteWorkflowApp) app, conf, wfId);
+ return new LiteWorkflowInstance((LiteWorkflowApp) app, conf, wfId, actionEndTimes);
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
index 1688dc9..5bae614 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
@@ -24,13 +24,16 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
+import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.action.hadoop.ShellActionExecutor;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.test.XDataTestCase;
@@ -39,7 +42,9 @@ import org.apache.oozie.util.IOUtils;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogService;
@@ -110,6 +115,69 @@ public class TestReRunXCommand extends XDataTestCase {
assertEquals(WorkflowJob.Status.SUCCEEDED, wfClient.getJobInfo(jobId1).getStatus());
}
+ /**
+ * This tests a specific edge case where rerun can fail when there's a fork, the actions in the fork succeed, but an action
+ * after the fork fails. Previously, the rerun would step through the forked actions in the order they were listed in the
+ * fork action's XML; if they happened to finish in a different order, this would cause an error during rerun. This is fixed by
+ * enforcing the same order in LiteWorkflowInstance#signal, which this test verifies.
+ *
+ * @throws Exception
+ */
+ public void testRerunFork() throws Exception {
+ // We need the shell schema and action for this test
+ Services.get().getConf().set(ActionService.CONF_ACTION_EXECUTOR_EXT_CLASSES, ShellActionExecutor.class.getName());
+ Services.get().setService(ActionService.class);
+ Services.get().getConf().set(SchemaService.WF_CONF_EXT_SCHEMAS, "shell-action-0.3.xsd");
+ Services.get().setService(SchemaService.class);
+
+ Reader reader = IOUtils.getResourceAsReader("rerun-wf-fork.xml", -1);
+ Writer writer = new FileWriter(new File(getTestCaseDir(), "workflow.xml"));
+ IOUtils.copyCharStream(reader, writer);
+
+ final OozieClient wfClient = LocalOozie.getClient();
+ Properties conf = wfClient.createConfiguration();
+ conf.setProperty("nameNode", getNameNodeUri());
+ conf.setProperty("jobTracker", getJobTrackerUri());
+ conf.setProperty(OozieClient.APP_PATH, getTestCaseFileUri("workflow.xml"));
+ conf.setProperty(OozieClient.USER_NAME, getTestUser());
+ conf.setProperty("cmd3", "echo1"); // expected to fail
+
+ final String jobId1 = wfClient.submit(conf);
+ wfClient.start(jobId1);
+ waitFor(40 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return wfClient.getJobInfo(jobId1).getStatus() == WorkflowJob.Status.KILLED;
+ }
+ });
+ assertEquals(WorkflowJob.Status.KILLED, wfClient.getJobInfo(jobId1).getStatus());
+ List<WorkflowAction> actions = wfClient.getJobInfo(jobId1).getActions();
+ assertEquals(WorkflowAction.Status.OK, actions.get(1).getStatus()); // fork
+ assertEquals(WorkflowAction.Status.OK, actions.get(2).getStatus()); // sh1
+ assertEquals(WorkflowAction.Status.OK, actions.get(3).getStatus()); // sh2
+ assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus()); // join
+ assertEquals(WorkflowAction.Status.ERROR, actions.get(5).getStatus()); // sh3
+
+ // rerun failed node, which is after the fork
+ conf.setProperty(OozieClient.RERUN_FAIL_NODES, "true");
+ conf.setProperty("cmd3", "echo"); // expected to succeed
+
+ wfClient.reRun(jobId1, conf);
+ waitFor(40 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return wfClient.getJobInfo(jobId1).getStatus() == WorkflowJob.Status.SUCCEEDED;
+ }
+ });
+ assertEquals(WorkflowJob.Status.SUCCEEDED, wfClient.getJobInfo(jobId1).getStatus());
+ actions = wfClient.getJobInfo(jobId1).getActions();
+ assertEquals(WorkflowAction.Status.OK, actions.get(1).getStatus()); // fork
+ assertEquals(WorkflowAction.Status.OK, actions.get(2).getStatus()); // sh1
+ assertEquals(WorkflowAction.Status.OK, actions.get(3).getStatus()); // sh2
+ assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus()); // join
+ assertEquals(WorkflowAction.Status.OK, actions.get(5).getStatus()); // sh3
+ }
+
/*
* Test to ensure parameterized configuration variables get resolved in workflow rerun
*/
http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/test/resources/rerun-wf-fork.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/rerun-wf-fork.xml b/core/src/test/resources/rerun-wf-fork.xml
new file mode 100644
index 0000000..8fa8f34
--- /dev/null
+++ b/core/src/test/resources/rerun-wf-fork.xml
@@ -0,0 +1,63 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.4" name="rerun-wf-fork">
+ <global>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ </global>
+
+ <start to="f"/>
+
+ <fork name="f">
+ <path start="sh1"/>
+ <path start="sh2"/>
+ </fork>
+
+ <action name="sh1">
+ <shell xmlns="uri:oozie:shell-action:0.3">
+ <exec>sleep</exec>
+ <argument>15</argument>
+ </shell>
+ <ok to="j"/>
+ <error to="k"/>
+ </action>
+
+ <action name="sh2">
+ <shell xmlns="uri:oozie:shell-action:0.3">
+ <exec>echo</exec>
+ </shell>
+ <ok to="j"/>
+ <error to="k"/>
+ </action>
+
+ <join name="j" to="sh3"/>
+
+ <action name="sh3">
+ <shell xmlns="uri:oozie:shell-action:0.3">
+ <exec>${cmd3}</exec>
+ </shell>
+ <ok to="end"/>
+ <error to="k"/>
+ </action>
+
+ <kill name="k">
+ <message>kill</message>
+ </kill>
+
+ <end name="end"/>
+</workflow-app>
http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7441678..43a5aad 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1879 Workflow Rerun causes error depending on the order of forked nodes (rkanter)
OOZIE-1659 oozie-site is missing email-action-0.2 schema (jagatsingh via rkanter)
OOZIE-1492 Make sure HA works with HCat (ryota)
OOZIE-1869 Sharelib update shows vip/load balancer address as one of the hostname (puru via ryota)