You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2014/06/17 02:31:04 UTC

git commit: OOZIE-1879 Workflow Rerun causes error depending on the order of forked nodes (rkanter)

Repository: oozie
Updated Branches:
  refs/heads/master 25c640b3b -> cbb1eac9d


OOZIE-1879 Workflow Rerun causes error depending on the order of forked nodes (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/cbb1eac9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/cbb1eac9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/cbb1eac9

Branch: refs/heads/master
Commit: cbb1eac9db427db0047847e3ed5564979be5571c
Parents: 25c640b
Author: Robert Kanter <rk...@cloudera.com>
Authored: Mon Jun 16 17:30:12 2014 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Mon Jun 16 17:30:12 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/WorkflowActionBean.java    |  2 +-
 .../apache/oozie/command/wf/ReRunXCommand.java  | 10 ++-
 .../jpa/WorkflowActionQueryExecutor.java        |  1 +
 .../org/apache/oozie/workflow/WorkflowLib.java  |  8 ++-
 .../workflow/lite/LiteWorkflowInstance.java     | 72 ++++++++++++++++++++
 .../oozie/workflow/lite/LiteWorkflowLib.java    |  7 +-
 .../oozie/command/wf/TestReRunXCommand.java     | 68 ++++++++++++++++++
 core/src/test/resources/rerun-wf-fork.xml       | 63 +++++++++++++++++
 release-log.txt                                 |  1 +
 9 files changed, 226 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
index 6861995..e64e9bf 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
@@ -109,7 +109,7 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.statusStr = 'START_RETRY' OR a.statusStr = 'START_MANUAL' OR a.statusStr = 'END_RETRY' OR a.statusStr = 'END_MANUAL')"),
 
-    @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW_RERUN", query = "select a.id, a.name, a.statusStr from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp") })
+    @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW_RERUN", query = "select a.id, a.name, a.statusStr, a.endTimestamp from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp") })
 @Table(name = "WF_ACTIONS")
 public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean {
     @Id

http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
index fe588d4..5dd06ca 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
@@ -158,8 +158,16 @@ public class ReRunXCommand extends WorkflowXCommand<Void> {
             // Resetting the conf to contain all the resolved values is necessary to ensure propagation of Oozie properties to Hadoop calls downstream
             conf = ((XConfiguration) conf).resolve();
 
+            // Prepare the action endtimes map
+            Map<String, Date> actionEndTimes = new HashMap<String, Date>();
+            for (WorkflowActionBean action : actions) {
+                if (action.getEndTime() != null) {
+                    actionEndTimes.put(action.getName(), action.getEndTime());
+                }
+            }
+
             try {
-                newWfInstance = workflowLib.createInstance(app, conf, jobId);
+                newWfInstance = workflowLib.createInstance(app, conf, jobId, actionEndTimes);
             }
             catch (WorkflowException e) {
                 throw new CommandException(e);

http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index 9156a27..0c323a3 100644
--- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -364,6 +364,7 @@ public class WorkflowActionQueryExecutor extends
                 bean.setId((String) arr[0]);
                 bean.setName((String) arr[1]);
                 bean.setStatusStr((String) arr[2]);
+                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
                 break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "

http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java b/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
index 7e4c90a..e79e59d 100644
--- a/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
+++ b/core/src/main/java/org/apache/oozie/workflow/WorkflowLib.java
@@ -17,6 +17,8 @@
  */
 package org.apache.oozie.workflow;
 
+import java.util.Date;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 
 
@@ -50,15 +52,17 @@ public interface WorkflowLib {
     public WorkflowInstance createInstance(WorkflowApp app, Configuration conf) throws WorkflowException;
 
     /**
-     * Create a workflow instance with the given wfId. This will be used for re-running workflows.
+     * Create a workflow instance with the given wfId and actions endtime map. This will be used for re-running workflows.
      *
      * @param app application to create a workflow instance of.
      * @param conf job configuration.
      * @param wfId Workflow ID.
+     * @param actionEndTimes A map of the actions to their endtimes; actions with no endtime should be omitted
      * @return the newly created workflow instance.
      * @throws WorkflowException thrown if the instance could not be created.
      */
-    public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId) throws WorkflowException;
+    public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId, Map<String, Date> actionEndTimes)
+            throws WorkflowException;
 
     /**
      * Insert a workflow instance in storage.

http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
index bf8dc05..a5db84a 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
@@ -37,6 +37,10 @@ import java.io.IOException;
 import java.io.ByteArrayOutputStream;
 import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,6 +49,8 @@ import java.util.Map;
 public class LiteWorkflowInstance implements Writable, WorkflowInstance {
     private static final String TRANSITION_TO = "transition.to";
 
+    private final Date FAR_INTO_THE_FUTURE = new Date(Long.MAX_VALUE);
+
     private XLog log;
 
     private static String PATH_SEPARATOR = "/";
@@ -154,6 +160,7 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
     private Map<String, NodeInstance> executionPaths = new HashMap<String, NodeInstance>();
     private Map<String, String> persistentVars = new HashMap<String, String>();
     private Map<String, Object> transientVars = new HashMap<String, Object>();
+    private ActionEndTimesComparator actionEndTimesComparator = null;
 
     protected LiteWorkflowInstance() {
         log = XLog.getLog(getClass());
@@ -168,6 +175,11 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
         status = Status.PREP;
     }
 
+    public LiteWorkflowInstance(LiteWorkflowApp def, Configuration conf, String instanceId, Map<String, Date> actionEndTimes) {
+        this(def, conf, instanceId);
+        actionEndTimesComparator = new ActionEndTimesComparator(actionEndTimes);
+    }
+
     public synchronized boolean start() throws WorkflowException {
         if (status != Status.PREP) {
             throw new WorkflowException(ErrorCode.E0719);
@@ -294,6 +306,16 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
                                 }
 
                             }
+
+                            // If we're doing a rerun, then we need to make sure to put the actions in pathToStart into the order
+                            // that they ended in.  Otherwise, it could result in an error later on in some edge cases.
+                            // e.g. You have a fork with two nodes, A and B, that both succeeded, followed by a join and some more
+                            // nodes, some of which failed.  If you do the rerun, it will always signal A and then B, even if in the
+                            // original run B signaled first and then A.  By sorting this, we maintain the proper signal ordering.
+                            if (actionEndTimesComparator != null && pathsToStart.size() > 1) {
+                                Collections.sort(pathsToStart, actionEndTimesComparator);
+                            }
+
                             // signal all new synch transitions
                             for (String pathToStart : pathsToStart) {
                                 signal(pathToStart, "::synch::");
@@ -585,6 +607,14 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
             dOut.writeUTF(entry.getKey());
             writeStringAsBytes(entry.getValue(), dOut);
         }
+        if (actionEndTimesComparator != null) {
+            Map<String, Date> actionEndTimes = actionEndTimesComparator.getActionEndTimes();
+            dOut.writeInt(actionEndTimes.size());
+            for (Map.Entry<String, Date> entry : actionEndTimes.entrySet()) {
+                dOut.writeUTF(entry.getKey());
+                dOut.writeLong(entry.getValue().getTime());
+            }
+        }
     }
 
     @Override
@@ -616,6 +646,21 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
             String vVal = readBytesAsString(dIn);
             persistentVars.put(vName, vVal);
         }
+        int numActionEndTimes = -1;
+        try {
+            numActionEndTimes = dIn.readInt();
+        } catch (IOException ioe) {
+            // This means that there isn't an actionEndTimes, so just ignore
+        }
+        if (numActionEndTimes > 0) {
+            Map<String, Date> actionEndTimes = new HashMap<String, Date>(numActionEndTimes);
+            for (int x = 0; x < numActionEndTimes; x++) {
+                String name = dIn.readUTF();
+                long endTime = dIn.readLong();
+                actionEndTimes.put(name, new Date(endTime));
+            }
+            actionEndTimesComparator = new ActionEndTimesComparator(actionEndTimes);
+        }
         refreshLog();
     }
 
@@ -671,4 +716,31 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance {
         return instanceId.hashCode();
     }
 
+    /** Orders execution paths by the end time their node recorded in the original run (earliest first). */
+    private class ActionEndTimesComparator implements Comparator<String> {
+
+        private final Map<String, Date> actionEndTimes;
+
+        public ActionEndTimesComparator(Map<String, Date> actionEndTimes) {
+            this.actionEndTimes = actionEndTimes;
+        }
+
+        @Override
+        public int compare(String node1, String node2) {
+            return endTimeOf(node1).compareTo(endTimeOf(node2));
+        }
+
+        // Paths with no NodeInstance, or whose node has no entry in the map (callers omit actions that never
+        // ended), sort last; without this fallback a missing entry makes compare() throw NullPointerException.
+        private Date endTimeOf(String path) {
+            NodeInstance nodeInstance = executionPaths.get(path);
+            Date endTime = (nodeInstance == null) ? null : actionEndTimes.get(nodeInstance.nodeName);
+            return (endTime == null) ? FAR_INTO_THE_FUTURE : endTime;
+        }
+
+        /** Returns the backing map of node name to end time (not a copy). */
+        public Map<String, Date> getActionEndTimes() {
+            return actionEndTimes;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
index 7f6f1cc..0e0aefd 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
 
 import javax.xml.validation.Schema;
 import java.io.StringReader;
+import java.util.Date;
+import java.util.Map;
 
 //TODO javadoc
 public abstract class LiteWorkflowLib implements WorkflowLib {
@@ -63,9 +65,10 @@ public abstract class LiteWorkflowLib implements WorkflowLib {
     }
 
     @Override
-    public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId) throws WorkflowException {
+    public WorkflowInstance createInstance(WorkflowApp app, Configuration conf, String wfId, Map<String, Date> actionEndTimes)
+            throws WorkflowException {
         ParamChecker.notNull(app, "app");
         ParamChecker.notNull(wfId, "wfId");
-        return new LiteWorkflowInstance((LiteWorkflowApp) app, conf, wfId);
+        return new LiteWorkflowInstance((LiteWorkflowApp) app, conf, wfId, actionEndTimes);
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
index 1688dc9..5bae614 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestReRunXCommand.java
@@ -24,13 +24,16 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Reader;
 import java.io.Writer;
+import java.util.List;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.action.hadoop.ShellActionExecutor;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.command.coord.CoordActionStartXCommand;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.test.XDataTestCase;
@@ -39,7 +42,9 @@ import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.SchemaService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.XLogService;
 
@@ -110,6 +115,69 @@ public class TestReRunXCommand extends XDataTestCase {
         assertEquals(WorkflowJob.Status.SUCCEEDED, wfClient.getJobInfo(jobId1).getStatus());
     }
 
+    /**
+     * This tests a specific edge case where rerun can fail when there's a fork, the actions in the fork succeed, but an action
+     * after the fork fails.  Previously, the rerun would step through the forked actions in the order they were listed in the
+     * fork action's XML; if they happened to finish in a different order, this would cause an error during rerun.  This is fixed by
+     * enforcing the same order in LiteWorkflowInstance#signal, which this test verifies.
+     *
+     * @throws Exception
+     */
+    public void testRerunFork() throws Exception {
+        // We need the shell schema and action for this test
+        Services.get().getConf().set(ActionService.CONF_ACTION_EXECUTOR_EXT_CLASSES, ShellActionExecutor.class.getName());
+        Services.get().setService(ActionService.class);
+        Services.get().getConf().set(SchemaService.WF_CONF_EXT_SCHEMAS, "shell-action-0.3.xsd");
+        Services.get().setService(SchemaService.class);
+
+        Reader reader = IOUtils.getResourceAsReader("rerun-wf-fork.xml", -1);
+        Writer writer = new FileWriter(new File(getTestCaseDir(), "workflow.xml"));
+        IOUtils.copyCharStream(reader, writer);
+
+        final OozieClient wfClient = LocalOozie.getClient();
+        Properties conf = wfClient.createConfiguration();
+        conf.setProperty("nameNode", getNameNodeUri());
+        conf.setProperty("jobTracker", getJobTrackerUri());
+        conf.setProperty(OozieClient.APP_PATH, getTestCaseFileUri("workflow.xml"));
+        conf.setProperty(OozieClient.USER_NAME, getTestUser());
+        conf.setProperty("cmd3", "echo1");      // expected to fail
+
+        final String jobId1 = wfClient.submit(conf);
+        wfClient.start(jobId1);
+        waitFor(40 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return wfClient.getJobInfo(jobId1).getStatus() == WorkflowJob.Status.KILLED;
+            }
+        });
+        assertEquals(WorkflowJob.Status.KILLED, wfClient.getJobInfo(jobId1).getStatus());
+        List<WorkflowAction> actions = wfClient.getJobInfo(jobId1).getActions();
+        assertEquals(WorkflowAction.Status.OK, actions.get(1).getStatus());     // fork
+        assertEquals(WorkflowAction.Status.OK, actions.get(2).getStatus());     // sh1
+        assertEquals(WorkflowAction.Status.OK, actions.get(3).getStatus());     // sh2
+        assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus());     // join
+        assertEquals(WorkflowAction.Status.ERROR, actions.get(5).getStatus());  // sh3
+
+        // rerun failed node, which is after the fork
+        conf.setProperty(OozieClient.RERUN_FAIL_NODES, "true");
+        conf.setProperty("cmd3", "echo");      // expected to succeed
+
+        wfClient.reRun(jobId1, conf);
+        waitFor(40 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return wfClient.getJobInfo(jobId1).getStatus() == WorkflowJob.Status.SUCCEEDED;
+            }
+        });
+        assertEquals(WorkflowJob.Status.SUCCEEDED, wfClient.getJobInfo(jobId1).getStatus());
+        actions = wfClient.getJobInfo(jobId1).getActions();
+        assertEquals(WorkflowAction.Status.OK, actions.get(1).getStatus());     // fork
+        assertEquals(WorkflowAction.Status.OK, actions.get(2).getStatus());     // sh1
+        assertEquals(WorkflowAction.Status.OK, actions.get(3).getStatus());     // sh2
+        assertEquals(WorkflowAction.Status.OK, actions.get(4).getStatus());     // join
+        assertEquals(WorkflowAction.Status.OK, actions.get(5).getStatus());     // sh3
+    }
+
     /*
      * Test to ensure parameterized configuration variables get resolved in workflow rerun
      */

http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/core/src/test/resources/rerun-wf-fork.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/rerun-wf-fork.xml b/core/src/test/resources/rerun-wf-fork.xml
new file mode 100644
index 0000000..8fa8f34
--- /dev/null
+++ b/core/src/test/resources/rerun-wf-fork.xml
@@ -0,0 +1,63 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.4" name="rerun-wf-fork">
+    <global>
+       <job-tracker>${jobTracker}</job-tracker>
+       <name-node>${nameNode}</name-node>
+    </global>
+
+    <start to="f"/>
+
+    <fork name="f">
+        <path start="sh1"/>
+        <path start="sh2"/>
+    </fork>
+
+    <action name="sh1">
+        <shell xmlns="uri:oozie:shell-action:0.3">
+            <exec>sleep</exec>
+            <argument>15</argument>
+        </shell>
+        <ok to="j"/>
+        <error to="k"/>
+    </action>
+
+    <action name="sh2">
+        <shell xmlns="uri:oozie:shell-action:0.3">
+            <exec>echo</exec>
+        </shell>
+        <ok to="j"/>
+        <error to="k"/>
+    </action>
+
+    <join name="j" to="sh3"/>
+
+    <action name="sh3">
+        <shell xmlns="uri:oozie:shell-action:0.3">
+            <exec>${cmd3}</exec>
+        </shell>
+        <ok to="end"/>
+        <error to="k"/>
+    </action>
+
+    <kill name="k">
+        <message>kill</message>
+    </kill>
+
+    <end name="end"/>
+</workflow-app>

http://git-wip-us.apache.org/repos/asf/oozie/blob/cbb1eac9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7441678..43a5aad 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1879 Workflow Rerun causes error depending on the order of forked nodes (rkanter)
 OOZIE-1659 oozie-site is missing email-action-0.2 schema (jagatsingh via rkanter)
 OOZIE-1492 Make sure HA works with HCat (ryota)
 OOZIE-1869 Sharelib update shows vip/load balancer address as one of the hostname (puru via ryota)