You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2013/02/04 19:03:16 UTC

svn commit: r1442260 - in /oozie/branches/branch-3.3: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/workflow/lite/ core/src/test/java/org/apache/oozie/workflow/lite/

Author: rkanter
Date: Mon Feb  4 18:03:16 2013
New Revision: 1442260

URL: http://svn.apache.org/viewvc?rev=1442260&view=rev
Log:
OOZIE-1035 Improve forkjoin validation to allow same errorTo transitions (rkanter)

Modified:
    oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/ErrorCode.java
    oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
    oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
    oozie/branches/branch-3.3/release-log.txt

Modified: oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1442260&r1=1442259&r2=1442260&view=diff
==============================================================================
--- oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/ErrorCode.java Mon Feb  4 18:03:16 2013
@@ -137,14 +137,18 @@ public enum ErrorCode {
     E0729(XLog.OPS, "Kill node message [{0}]"),
     E0730(XLog.STD, "Fork/Join not in pair"),
     E0731(XLog.STD, "Fork node [{0}] cannot have less than two paths"),
-    E0732(XLog.STD, "Fork [{0}]/Join [{1}] not in pair"),
+    E0732(XLog.STD, "Fork [{0}]/Join [{1}] not in pair (join should have been [{2}])"),
     E0733(XLog.STD, "Fork [{0}] without a join"),
     E0734(XLog.STD, "Invalid transition from node [{0}] to node [{1}] while using fork/join"),
     E0735(XLog.STD, "There was an invalid \"error to\" transition to node [{1}] while using fork/join"),
     E0736(XLog.STD, "Workflow definition length [{0}] exceeded maximum allowed length [{1}]"),
-    E0737(XLog.STD, "Invalid transition from node [{0}] to node [{1}] while using Fork/Join because node [{1}] is of type [{2}]"),
+    E0737(XLog.STD, "Invalid transition from node [{0}] to node [{1}] -- nodes of type 'end' are not allowed within Fork/Join"),
     E0738(XLog.STD, "The following {0} parameters are required but were not defined and no default values are available: {1}"),
     E0739(XLog.STD, "Parameter name cannot be empty"),
+    E0740(XLog.STD, "Invalid node type encountered (node [{0}])"),
+    E0741(XLog.STD, "Cycle detected transitioning to [{0}] via path {1}"),
+    E0742(XLog.STD, "No Fork for Join [{0}] to pair with"),
+    E0743(XLog.STD, "Multiple \"ok to\" transitions to the same node, [{0}], are not allowed"),
 
     E0800(XLog.STD, "Action it is not running its in [{1}] state, action [{0}]"),
     E0801(XLog.STD, "Workflow already running, workflow [{0}]"),

Modified: oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java?rev=1442260&r1=1442259&r2=1442260&view=diff
==============================================================================
--- oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java (original)
+++ oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java Mon Feb  4 18:03:16 2013
@@ -41,8 +41,11 @@ import java.io.Reader;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -95,6 +98,9 @@ public class LiteWorkflowAppParser {
 
     private List<String> forkList = new ArrayList<String>();
     private List<String> joinList = new ArrayList<String>();
+    private StartNodeDef startNode;
+    private List<String> visitedOkNodes = new ArrayList<String>();
+    private List<String> visitedJoinNodes = new ArrayList<String>();
 
     public LiteWorkflowAppParser(Schema schema,
                                  Class<? extends ControlNodeHandler> controlNodeHandler,
@@ -161,99 +167,122 @@ public class LiteWorkflowAppParser {
             throw new WorkflowException(ErrorCode.E0730);
         }
 
-        while(!forkList.isEmpty()){
-            // Make sure each of the fork node has a corresponding join; start with the root fork Node first
-            validateFork(app.getNode(forkList.remove(0)), app);
+        // No need to bother going through all of this if there are no fork/join nodes
+        if (!forkList.isEmpty()) {
+            visitedOkNodes.clear();
+            visitedJoinNodes.clear();
+            validateForkJoin(startNode, app, new LinkedList<String>(), new LinkedList<String>(), new LinkedList<String>(), true);
         }
-
     }
 
     /*
-     * Test whether the fork node has a corresponding join
-     * @param node - the fork node
-     * @param app - the WorkflowApp
-     * @return
+     * Recursively walk through the DAG and make sure that all fork paths are valid.
+     * This should be called from validateForkJoin(LiteWorkflowApp app).  It assumes that visitedOkNodes and visitedJoinNodes are
+     * both empty ArrayLists on the first call.
+     *
+     * @param node the current node; use the startNode on the first call
+     * @param app the WorkflowApp
+     * @param forkNodes a stack of the current fork nodes
+     * @param joinNodes a stack of the current join nodes
+     * @param path a stack of the current path
+     * @param okTo false if node (or an ancestor of node) was reached via an "error to" transition or via a join node that has
+     * already been visited at least once before
      * @throws WorkflowException
      */
-    private NodeDef validateFork(NodeDef forkNode, LiteWorkflowApp app) throws WorkflowException {
-        List<String> transitions = new ArrayList<String>(forkNode.getTransitions());
-        // list for keeping track of "error-to" transitions of Action Node
-        List<String> errorToTransitions = new ArrayList<String>();
-        String joinNode = null;
-        for (int i = 0; i < transitions.size(); i++) {
-            NodeDef node = app.getNode(transitions.get(i));
-            if (node instanceof DecisionNodeDef) {
-                // Make sure the transition is valid
-                validateTransition(errorToTransitions, transitions, app, node);
-                // Add each transition to transitions (once) if they are not a kill node
-                HashSet<String> decisionSet = new HashSet<String>(node.getTransitions());
-                for (String ds : decisionSet) {
-                    if (!(app.getNode(ds) instanceof KillNodeDef)) {
-                        transitions.add(ds);
-                    }
-                }
-            } else if (node instanceof ActionNodeDef) {
-                // Make sure the transition is valid
-                validateTransition(errorToTransitions, transitions, app, node);
-                // Add the "ok-to" transition of node if its not a kill node
-                String okTo = node.getTransitions().get(0);
-                if (!(app.getNode(okTo) instanceof KillNodeDef)) {
-                    transitions.add(okTo);
-                }
-                String errorTo = node.getTransitions().get(1);
-                // Add the "error-to" transition if the transition is a Action Node
-                if (app.getNode(errorTo) instanceof ActionNodeDef) {
-                    errorToTransitions.add(errorTo);
-                }
-            } else if (node instanceof ForkNodeDef) {
-                forkList.remove(node.getName());
-                // Make a recursive call to resolve this fork node
-                NodeDef joinNd = validateFork(node, app);
-                // Make sure the transition is valid
-                validateTransition(errorToTransitions, transitions, app, node);
-                // Add the "ok-to" transition of node
-                transitions.add(joinNd.getTransitions().get(0));
-            } else if (node instanceof JoinNodeDef) {
-                // If joinNode encountered for the first time, remove it from the joinList and remember it
-                String currentJoin = node.getName();
-                if (joinList.contains(currentJoin)) {
-                    joinList.remove(currentJoin);
-                    joinNode = currentJoin;
-                } else {
-                    // Make sure this join is the same as the join seen from the first time
-                    if (joinNode == null) {
-                        throw new WorkflowException(ErrorCode.E0733, forkNode);
-                    }
-                    if (!joinNode.equals(currentJoin)) {
-                        throw new WorkflowException(ErrorCode.E0732, forkNode, joinNode);
-                    }
-                }
+    private void validateForkJoin(NodeDef node, LiteWorkflowApp app, Deque<String> forkNodes, Deque<String> joinNodes,
+            Deque<String> path, boolean okTo) throws WorkflowException {
+        if (path.contains(node.getName())) {
+            // cycle
+            throw new WorkflowException(ErrorCode.E0741, node.getName(), Arrays.toString(path.toArray()));
+        }
+        path.push(node.getName());
+
+        // Make sure that we're not revisiting a node (that's not a Kill, Join, or End type) that's been visited before from an
+        // "ok to" transition; if it's from an "error to" transition, then it's okay to visit it multiple times.  Also, because we
+        // traverse through join nodes multiple times, we have to make sure not to throw an exception here when we're really just
+        // re-walking the same execution path (this is why we need the visitedJoinNodes list used later)
+        if (okTo && !(node instanceof KillNodeDef) && !(node instanceof JoinNodeDef) && !(node instanceof EndNodeDef)) {
+            if (visitedOkNodes.contains(node.getName())) {
+                throw new WorkflowException(ErrorCode.E0743, node.getName());
+            }
+            visitedOkNodes.add(node.getName());
+        }
+
+        if (node instanceof StartNodeDef) {
+            String transition = node.getTransitions().get(0);   // start always has only 1 transition
+            NodeDef tranNode = app.getNode(transition);
+            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
+        }
+        else if (node instanceof ActionNodeDef) {
+            String transition = node.getTransitions().get(0);   // "ok to" transition
+            NodeDef tranNode = app.getNode(transition);
+            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);  // propagate okTo
+            transition = node.getTransitions().get(1);          // "error to" transition
+            tranNode = app.getNode(transition);
+            validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false); // use false
+        }
+        else if (node instanceof DecisionNodeDef) {
+            for(String transition : (new HashSet<String>(node.getTransitions()))) {
+                NodeDef tranNode = app.getNode(transition);
+                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
+            }
+        }
+        else if (node instanceof ForkNodeDef) {
+            forkNodes.push(node.getName());
+            for(String transition : (new HashSet<String>(node.getTransitions()))) {
+                NodeDef tranNode = app.getNode(transition);
+                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, okTo);
+            }
+            forkNodes.pop();
+            if (!joinNodes.isEmpty()) {
+                joinNodes.pop();
+            }
+        }
+        else if (node instanceof JoinNodeDef) {
+            if (forkNodes.isEmpty()) {
+                // no fork for join to match with
+                throw new WorkflowException(ErrorCode.E0742, node.getName());
+            }
+            if (forkNodes.size() > joinNodes.size() && (joinNodes.isEmpty() || !joinNodes.peek().equals(node.getName()))) {
+                joinNodes.push(node.getName());
+            }
+            if (!joinNodes.peek().equals(node.getName())) {
+                // join doesn't match fork
+                throw new WorkflowException(ErrorCode.E0732, forkNodes.peek(), node.getName(), joinNodes.peek());
+            }
+            joinNodes.pop();
+            String currentForkNode = forkNodes.pop();
+            String transition = node.getTransitions().get(0);   // join always has only 1 transition
+            NodeDef tranNode = app.getNode(transition);
+            // If we're already under a situation where okTo is false, use false (propagate it)
+            // Or if we've already visited this join node, use false (because we've already traversed this path before and we don't
+            // want to throw an exception from the check against visitedOkNodes)
+            if (!okTo || visitedJoinNodes.contains(node.getName())) {
+                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, false);
+            // Else, use true because okTo is still true and this is the first time we've gone through this join node
             } else {
-                throw new WorkflowException(ErrorCode.E0730);
+                visitedJoinNodes.add(node.getName());
+                validateForkJoin(tranNode, app, forkNodes, joinNodes, path, true);
             }
+            forkNodes.push(currentForkNode);
+            joinNodes.push(node.getName());
         }
-        return app.getNode(joinNode);
-
-    }
-
-    private void validateTransition(List<String> errorToTransitions, List<String> transitions, LiteWorkflowApp app, NodeDef node)
-            throws WorkflowException {
-        for (String transition : node.getTransitions()) {
-            // Make sure the transition node is not an end node
-            NodeDef tNode = app.getNode(transition);
-            if (tNode instanceof EndNodeDef) {
-                throw new WorkflowException(ErrorCode.E0737, node.getName(), transition, "end");
-            }
-            // Make sure the transition node is either a join node or is not already visited
-            if (transitions.contains(transition) && !(tNode instanceof JoinNodeDef)) {
-                    throw new WorkflowException(ErrorCode.E0734, node.getName(), transition);
-                }
-                // Make sure the transition node is not the same as an already visited 'error-to' transition
-                if (errorToTransitions.contains(transition)) {
-                    throw new WorkflowException(ErrorCode.E0735, node.getName(), transition);
-                }
+        else if (node instanceof KillNodeDef) {
+            // do nothing
+        }
+        else if (node instanceof EndNodeDef) {
+            if (!forkNodes.isEmpty()) {
+                path.pop();     // = node
+                String parent = path.peek();
+                // can't go to an end node in a fork
+                throw new WorkflowException(ErrorCode.E0737, parent, node.getName());
             }
-
+        }
+        else {
+            // invalid node type (shouldn't happen)
+            throw new WorkflowException(ErrorCode.E0740, node.getName());
+        }
+        path.pop();
     }
 
     /**
@@ -379,7 +408,10 @@ public class LiteWorkflowAppParser {
      * @throws WorkflowException
      */
     private void validate(LiteWorkflowApp app, NodeDef node, Map<String, VisitStatus> traversed) throws WorkflowException {
-        if (!(node instanceof StartNodeDef)) {
+        if (node instanceof StartNodeDef) {
+            startNode = (StartNodeDef) node;
+        }
+        else {
             try {
                 ParamChecker.validateActionName(node.getName());
             }

Modified: oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
URL: http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java?rev=1442260&r1=1442259&r2=1442260&view=diff
==============================================================================
--- oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java (original)
+++ oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java Mon Feb  4 18:03:16 2013
@@ -451,10 +451,9 @@ public class TestLiteWorkflowAppParser e
         } catch (Exception ex) {
             WorkflowException we = (WorkflowException) ex.getCause();
             assertEquals(ErrorCode.E0737, we.getErrorCode());
-            // Make sure the message contains the nodes and type involved in the invalid transition to end
-            assertTrue(we.getMessage().contains("three"));
+            // Make sure the message contains the nodes involved in the invalid transition to end
+            assertTrue(we.getMessage().contains("node [three]"));
             assertTrue(we.getMessage().contains("node [end]"));
-            assertTrue(we.getMessage().contains("type [end]"));
         }
     }
 
@@ -497,7 +496,8 @@ public class TestLiteWorkflowAppParser e
             fail("Expected to catch an exception but did not encounter any");
         } catch (Exception ex) {
             WorkflowException we = (WorkflowException) ex.getCause();
-            assertEquals(ErrorCode.E0730, we.getErrorCode());
+            assertEquals(ErrorCode.E0742, we.getErrorCode());
+            assertTrue(we.getMessage().contains("[j2]"));
         }
     }
 
@@ -507,6 +507,7 @@ public class TestLiteWorkflowAppParser e
      2->fail->j
      3->ok->j
      3->fail->k
+     j->k
     */
     public void testTransitionFailure1() throws WorkflowException{
         LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
@@ -530,9 +531,8 @@ public class TestLiteWorkflowAppParser e
             fail("Expected to catch an exception but did not encounter any");
         } catch (Exception ex) {
             WorkflowException we = (WorkflowException) ex.getCause();
-            assertEquals(ErrorCode.E0734, we.getErrorCode());
-            // Make sure the message contains the nodes involved in the invalid transition
-            assertTrue(we.getMessage().contains("two"));
+            assertEquals(ErrorCode.E0743, we.getErrorCode());
+            // Make sure the message contains the node involved in the invalid transition
             assertTrue(we.getMessage().contains("three"));
         }
 
@@ -544,8 +544,9 @@ public class TestLiteWorkflowAppParser e
     2->ok->j
     3->ok->j
     3->fail->k
+    j->end
    */
-   public void testTransitionFailure2() throws WorkflowException{
+   public void testTransition2() throws WorkflowException{
        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
                LiteWorkflowStoreService.LiteControlNodeHandler.class,
                LiteWorkflowStoreService.LiteDecisionHandler.class,
@@ -558,19 +559,15 @@ public class TestLiteWorkflowAppParser e
                                 Arrays.asList(new String[]{"two","three"})))
        .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "j", "three"))
        .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j", "k"))
-       .addNode(new JoinNodeDef("j", LiteWorkflowStoreService.LiteControlNodeHandler.class, "k"))
+       .addNode(new JoinNodeDef("j", LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
        .addNode(new KillNodeDef("k", "kill", LiteWorkflowStoreService.LiteControlNodeHandler.class))
        .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
 
        try {
            invokeForkJoin(parser, def);
-           fail("Expected to catch an exception but did not encounter any");
        } catch (Exception ex) {
-           WorkflowException we = (WorkflowException) ex.getCause();
-           assertEquals(ErrorCode.E0734, we.getErrorCode());
-           // Make sure the message contains the nodes involved in the invalid transition
-           assertTrue(we.getMessage().contains("two"));
-           assertTrue(we.getMessage().contains("three"));
+           ex.printStackTrace();
+           fail("Unexpected Exception");
        }
 
    }
@@ -578,11 +575,14 @@ public class TestLiteWorkflowAppParser e
    /*
    f->(2,3)
    2->ok->j
-   3->ok->4
    2->fail->4
+   3->ok->4
+   3->fail->k
    4->ok->j
+   4->fail->k
+   j->end
   */
-   public void testTransitionFailure3() throws WorkflowException{
+   public void testTransition3() throws WorkflowException{
        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
                LiteWorkflowStoreService.LiteControlNodeHandler.class,
                LiteWorkflowStoreService.LiteDecisionHandler.class,
@@ -596,18 +596,15 @@ public class TestLiteWorkflowAppParser e
        .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "j", "four"))
        .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "four", "k"))
        .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "j", "k"))
-       .addNode(new JoinNodeDef("j", LiteWorkflowStoreService.LiteControlNodeHandler.class, "k"))
+       .addNode(new JoinNodeDef("j", LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
        .addNode(new KillNodeDef("k", "kill", LiteWorkflowStoreService.LiteControlNodeHandler.class))
        .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
 
        try {
            invokeForkJoin(parser, def);
-           fail("Expected to catch an exception but did not encounter any");
        } catch (Exception ex) {
-           WorkflowException we = (WorkflowException) ex.getCause();
-           assertEquals(ErrorCode.E0735, we.getErrorCode());
-           // Make sure the message contains the node involved in the invalid transition
-           assertTrue(we.getMessage().contains("four"));
+           ex.printStackTrace();
+           fail("Unexpected Exception");
        }
    }
 
@@ -615,12 +612,13 @@ public class TestLiteWorkflowAppParser e
     * f->(2,3)
     * 2->ok->j
     * 3->ok->j
-    * j->end
+    * j->6
     * 2->error->f1
     * 3->error->f1
     * f1->(4,5)
     * (4,5)->j1
-    * j1->end
+    * j1->6
+    * 6->k
     */
    public void testErrorTransitionForkJoin() throws WorkflowException {
        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
@@ -641,7 +639,7 @@ public class TestLiteWorkflowAppParser e
        .addNode(new ActionNodeDef("five", dummyConf, TestActionNodeHandler.class, "j1", "k"))
        .addNode(new JoinNodeDef("j", LiteWorkflowStoreService.LiteControlNodeHandler.class, "six"))
        .addNode(new JoinNodeDef("j1", LiteWorkflowStoreService.LiteControlNodeHandler.class, "six"))
-       .addNode(new ActionNodeDef("six", dummyConf, TestActionNodeHandler.class, "end", "end"))
+       .addNode(new ActionNodeDef("six", dummyConf, TestActionNodeHandler.class, "k", "k"))
        .addNode(new KillNodeDef("k", "kill", LiteWorkflowStoreService.LiteControlNodeHandler.class))
        .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
 
@@ -653,7 +651,7 @@ public class TestLiteWorkflowAppParser e
        }
    }
 
-    /*
+   /*
     f->(2,3)
     2->decision node->{4,5,4}
     4->j
@@ -760,8 +758,11 @@ public class TestLiteWorkflowAppParser e
     /*
      *f->(2,3)
      *2->decision node->{3,4}
-     *3->j
-     *4->j
+     *3->ok->j
+     *3->fail->k
+     *4->ok->j
+     *4->fail->k
+     *j->end
      */
     public void testDecisionForkJoinFailure() throws WorkflowException{
         LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
@@ -777,7 +778,7 @@ public class TestLiteWorkflowAppParser e
                                      Arrays.asList(new String[]{"four","three"})))
         .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j", "k"))
         .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "j", "k"))
-        .addNode(new JoinNodeDef("j", LiteWorkflowStoreService.LiteControlNodeHandler.class, "k"))
+        .addNode(new JoinNodeDef("j", LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
         .addNode(new KillNodeDef("k", "kill", LiteWorkflowStoreService.LiteControlNodeHandler.class))
         .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
 
@@ -786,9 +787,8 @@ public class TestLiteWorkflowAppParser e
             fail("Expected to catch an exception but did not encounter any");
         } catch (Exception ex) {
             WorkflowException we = (WorkflowException) ex.getCause();
-            assertEquals(ErrorCode.E0734, we.getErrorCode());
-            // Make sure the message contains the nodes involved in the invalid transition
-            assertTrue(we.getMessage().contains("two"));
+            assertEquals(ErrorCode.E0743, we.getErrorCode());
+            // Make sure the message contains the node involved in the invalid transition
             assertTrue(we.getMessage().contains("three"));
         }
     }
@@ -796,8 +796,11 @@ public class TestLiteWorkflowAppParser e
     /*
      *f->(2,3)
      *2->decision node->{4,end}
-     *3->j
-     *4->j
+     *3->ok->j
+     *3->fail->k
+     *4->ok->j
+     *4->fail->k
+     *j->end
      */
     public void testDecisionToEndForkJoinFailure() throws WorkflowException{
         LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
@@ -823,10 +826,9 @@ public class TestLiteWorkflowAppParser e
         } catch (Exception ex) {
             WorkflowException we = (WorkflowException) ex.getCause();
             assertEquals(ErrorCode.E0737, we.getErrorCode());
-            // Make sure the message contains the nodes and type involved in the invalid transition to end
-            assertTrue(we.getMessage().contains("two"));
+            // Make sure the message contains the nodes involved in the invalid transition to end
+            assertTrue(we.getMessage().contains("node [two]"));
             assertTrue(we.getMessage().contains("node [end]"));
-            assertTrue(we.getMessage().contains("type [end]"));
         }
     }
 
@@ -869,6 +871,45 @@ public class TestLiteWorkflowAppParser e
         }
     }
 
+    /*
+     * f->(1,2)
+     * 1->ok->j1
+     * 1->fail->k
+     * 2->ok->j2
+     * 2->fail->k
+     * j1->end
+     * j2->f2
+     * f2->k,k
+     */
+    public void testForkJoinMismatch() throws WorkflowException {
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+        LiteWorkflowApp def = new LiteWorkflowApp("name", "def",
+            new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "f"))
+        .addNode(new ForkNodeDef("f", LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                        Arrays.asList(new String[]{"one", "two"})))
+        .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "j1", "k"))
+        .addNode(new JoinNodeDef("j1", LiteWorkflowStoreService.LiteControlNodeHandler.class, "end"))
+        .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "j2", "k"))
+        .addNode(new JoinNodeDef("j2", LiteWorkflowStoreService.LiteControlNodeHandler.class, "f2"))
+        .addNode(new ForkNodeDef("f2", LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                        Arrays.asList(new String[]{"k", "k"})))
+        .addNode(new KillNodeDef("k", "kill", LiteWorkflowStoreService.LiteControlNodeHandler.class))
+        .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
+        try {
+            invokeForkJoin(parser, def);
+            fail("Expected to catch an exception but did not encounter any");
+        } catch (Exception ex) {
+            WorkflowException we = (WorkflowException) ex.getCause();
+            assertEquals(ErrorCode.E0732, we.getErrorCode());
+            assertTrue(we.getMessage().contains("Fork [f]"));
+            assertTrue(we.getMessage().contains("Join [j1]"));
+            assertTrue(we.getMessage().contains("been [j2]"));
+        }
+    }
+
     // Invoke private validateForkJoin method using Reflection API
     private void invokeForkJoin(LiteWorkflowAppParser parser, LiteWorkflowApp def) throws Exception {
         Class<? extends LiteWorkflowAppParser> c = parser.getClass();

Modified: oozie/branches/branch-3.3/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/branches/branch-3.3/release-log.txt?rev=1442260&r1=1442259&r2=1442260&view=diff
==============================================================================
--- oozie/branches/branch-3.3/release-log.txt (original)
+++ oozie/branches/branch-3.3/release-log.txt Mon Feb  4 18:03:16 2013
@@ -1,5 +1,6 @@
 -- Oozie 3.3.2 (unreleased)
 
+OOZIE-1035 Improve forkjoin validation to allow same errorTo transitions (rkanter)
 OOZIE-1137 In light of federation use actionLibPath instead of appPath (vaidya via rkanter)
 OOZIE-1126 see if checkstyle works for oozie development. (jaoki via rkanter)
 OOZIE-1124 Split pig unit tests to a separate module (rohini via virag)