You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by tu...@apache.org on 2012/02/08 21:41:32 UTC

svn commit: r1242082 - in /incubator/oozie/trunk: ./ core/src/main/java/org/apache/oozie/ core/src/main/java/org/apache/oozie/workflow/lite/ core/src/main/resources/ core/src/test/java/org/apache/oozie/service/ core/src/test/java/org/apache/oozie/workf...

Author: tucu
Date: Wed Feb  8 20:41:31 2012
New Revision: 1242082

URL: http://svn.apache.org/viewvc?rev=1242082&view=rev
Log:
OOZIE-636 Validate fork-join (virag via tucu)

Modified:
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
    incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
    incubator/oozie/trunk/core/src/test/resources/wf-schema-valid.xml
    incubator/oozie/trunk/release-log.txt

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java Wed Feb  8 20:41:31 2012
@@ -133,6 +133,11 @@ public enum ErrorCode {
     E0727(XLog.STD, "Workflow Job can not be suspended as its not in running state, {0}, Status: {1}"),
     E0728(XLog.STD, "Coordinator Job can not be suspended as job finished or failed or killed, id : {0}, status : {1}"),
     E0729(XLog.OPS, "Kill node message [{0}]"),
+    E0730(XLog.STD, "Fork/Join not in pair"),
+    E0731(XLog.STD, "Fork node [{0}] cannot have less than two paths"),
+    E0732(XLog.STD, "Fork [{0}]/Join [{1}] not in pair"),
+    E0733(XLog.STD, "Fork [{0}] without a join"),
+    E0734(XLog.STD, "Invalid transition from node [{0}] to node [{1}] while using fork/join"),
 
     E0800(XLog.STD, "Action it is not running its in [{1}] state, action [{0}]"),
     E0801(XLog.STD, "Workflow already running, workflow [{0}]"),

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java Wed Feb  8 20:41:31 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -38,8 +38,11 @@ import java.io.StringReader;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 
 /**
  * Class to parse and validate workflow xml
@@ -74,6 +77,7 @@ public class LiteWorkflowAppParser {
     private static final String DECISION_DEFAULT_E = "default";
 
     private static final String KILL_MESSAGE_E = "message";
+    public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
 
     private Schema schema;
     private Class<? extends DecisionNodeHandler> decisionHandlerClass;
@@ -83,8 +87,8 @@ public class LiteWorkflowAppParser {
         VISITING, VISITED
     }
 
-    ;
-
+    private List<String> forkList = new ArrayList<String>();
+    private List<String> joinList = new ArrayList<String>();
 
     public LiteWorkflowAppParser(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
                                  Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
@@ -116,6 +120,10 @@ public class LiteWorkflowAppParser {
             Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
             traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
             validate(app, app.getNode(StartNodeDef.START), traversed);
+            //Validate whether fork/join are in pair or not
+            if (Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) {
+                validateForkJoin(app);
+            }
             return app;
         }
         catch (JDOMException ex) {
@@ -130,6 +138,90 @@ public class LiteWorkflowAppParser {
     }
 
     /**
+     * Validate that every fork node collected while traversing the workflow is paired with a join node.
+     * @param app workflow definition whose fork nodes are looked up by name
+     * @throws WorkflowException E0730 if the fork and join counts differ, or whatever validateFork raises
+     */
+    private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException {
+        // Cheap sanity check first: the number of forks and joins seen in the wf must be equal
+        if (forkList.size() != joinList.size()) {
+            throw new WorkflowException(ErrorCode.E0730);
+        }
+
+        while(!forkList.isEmpty()){
+            // Validate the earliest remaining fork; validateFork also drops the nested forks it resolves from forkList
+            validateFork(app.getNode(forkList.remove(0)), app);
+        }
+
+    }
+
+    /**
+     * Walk every path leaving a fork and check that all of them converge on one and the same join.
+     * @param forkNode - the fork node to validate
+     * @param app - the WorkflowApp used to resolve node names
+     * @return the join node paired with this fork (so an enclosing fork can continue from it)
+     * @throws WorkflowException E0730, E0732, E0733 or E0734 when the fork/join structure is invalid
+     */
+    private NodeDef validateFork(NodeDef forkNode, LiteWorkflowApp app) throws WorkflowException {
+        List<String> transitions = new ArrayList<String>(forkNode.getTransitions());
+        String joinNode = null;
+        for (int i = 0; i < transitions.size(); i++) {
+            NodeDef node = app.getNode(transitions.get(i));
+            if (node instanceof DecisionNodeDef) {
+                Set<String> decisionSet = new HashSet<String>(node.getTransitions());
+                for (String ds : decisionSet) {
+                    if (transitions.contains(ds)) {
+                        throw new WorkflowException(ErrorCode.E0734, node.getName(), ds);
+                    } else {
+                        transitions.add(ds);
+                    }
+                }
+            } else if (node instanceof ActionNodeDef) {
+                // Get only the "ok-to" transition of node
+                String okToTransition = node.getTransitions().get(0);
+                // Make sure the transition is valid
+                validateTransition(transitions, app, okToTransition, node);
+                transitions.add(okToTransition);
+            } else if (node instanceof ForkNodeDef) {
+                forkList.remove(node.getName());
+                // Make a recursive call to resolve this fork node
+                NodeDef joinNd = validateFork(node, app);
+                String okToTransition = joinNd.getTransitions().get(0);
+                // Make sure the transition is valid
+                validateTransition(transitions, app, okToTransition, node);
+                transitions.add(okToTransition);
+            } else if (node instanceof JoinNodeDef) {
+                // If joinNode encountered for the first time, remove it from the joinList and remember it
+                String currentJoin = node.getName();
+                if (joinList.contains(currentJoin)) {
+                    joinList.remove(currentJoin);
+                    joinNode = currentJoin;
+                } else {
+                    // Make sure this join is the same as the join seen from the first time
+                    if (joinNode == null) {
+                        throw new WorkflowException(ErrorCode.E0733, forkNode.getName());
+                    }
+                    if (!joinNode.equals(currentJoin)) {
+                        throw new WorkflowException(ErrorCode.E0732, forkNode.getName(), joinNode);
+                    }
+                }
+            } else {
+                throw new WorkflowException(ErrorCode.E0730);
+            }
+        }
+        return app.getNode(joinNode);
+
+    }
+
+    private void validateTransition(List<String> transitions, LiteWorkflowApp app, String okToTransition, NodeDef node)
+            throws WorkflowException {
+        // Reaching an already listed node again is only legal when it is a join; any other repeat raises E0734
+        if (transitions.contains(okToTransition) && !(app.getNode(okToTransition) instanceof JoinNodeDef)) {
+            throw new WorkflowException(ErrorCode.E0734, node.getName(), okToTransition);
+        }
+    }
+
+    /**
      * Parse xml to {@link LiteWorkflowApp}
      *
      * @param strDef
@@ -201,11 +293,11 @@ public class LiteWorkflowAppParser {
                                                 }
                                             }
                                         }
-                                        
+
                                         String credStr = eNode.getAttributeValue(CRED_A);
                                         String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
                                         String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
-                                        
+
                                         String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
                                         def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
                                                                       transitions[0], transitions[1], credStr,
@@ -259,6 +351,14 @@ public class LiteWorkflowAppParser {
             }
         }
 
+        if(node instanceof ForkNodeDef){
+            forkList.add(node.getName());
+        }
+
+        if(node instanceof JoinNodeDef){
+            joinList.add(node.getName());
+        }
+
         if (node instanceof EndNodeDef) {
             traversed.put(node.getName(), VisitStatus.VISITED);
             return;

Modified: incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ incubator/oozie/trunk/core/src/main/resources/oozie-default.xml Wed Feb  8 20:41:31 2012
@@ -1522,4 +1522,13 @@
 		</description>
 	</property>
 
+	<!-- ForkJoin validation -->
+	<property>
+		<name>oozie.validate.ForkJoin</name>
+		<value>true</value>
+		<description>
+			If true, fork and join should be validated at wf submission time.
+		</description>
+	</property>
+
 </configuration>

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java Wed Feb  8 20:41:31 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -244,21 +244,21 @@ public class TestLiteWorkflowAppService 
             assertEquals("a", app.getNode("::start::").getTransitions().get(0));
             assertEquals("b", app.getNode("a").getTransitions().get(0));
             assertEquals("c", app.getNode("a").getTransitions().get(1));
-            assertEquals("d", app.getNode("a").getTransitions().get(2));
+            assertEquals("c", app.getNode("a").getTransitions().get(2));
             assertTrue(app.getNode("b").getConf().contains("kill"));
             assertEquals("d", app.getNode("c").getTransitions().get(0));
             assertEquals("e", app.getNode("c").getTransitions().get(1));
             assertEquals(2, app.getNode("c").getTransitions().size());
 
-            assertEquals("e", app.getNode("d").getTransitions().get(0));
+            assertEquals("f", app.getNode("d").getTransitions().get(0));
             assertEquals("b", app.getNode("d").getTransitions().get(1));
             assertTrue(app.getNode("d").getConf().startsWith("<map-reduce"));
 
-            assertEquals("z", app.getNode("e").getTransitions().get(0));
+            assertEquals("f", app.getNode("e").getTransitions().get(0));
             assertEquals("b", app.getNode("e").getTransitions().get(1));
             assertTrue(app.getNode("e").getConf().startsWith("<pig"));
 
-            assertEquals("g", app.getNode("f").getTransitions().get(0));
+            assertEquals("z", app.getNode("f").getTransitions().get(0));
 
             assertNotNull(app.getNode("z"));
         }
@@ -357,7 +357,7 @@ public class TestLiteWorkflowAppService 
             services.destroy();
         }
     }
-    
+
     public void testCreateprotoConfWithMulipleLibPath() throws Exception {
         Services services = new Services();
         try {
@@ -376,7 +376,7 @@ public class TestLiteWorkflowAppService 
             createTestCaseSubDir("libx");
             writer = new FileWriter(getTestCaseDir() + "/libx/maputil_x.jar");
             writer.write("bla bla");
-            writer.close();            
+            writer.close();
             createTestCaseSubDir("liby");
             writer = new FileWriter(getTestCaseDir() + "/liby/maputil_y1.jar");
             writer.write("bla bla");

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java Wed Feb  8 20:41:31 2012
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *      http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,21 +17,34 @@
  */
 package org.apache.oozie.workflow.lite;
 
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.workflow.WorkflowException;
+import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestActionNodeHandler;
+import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestDecisionNodeHandler;
 import org.apache.oozie.test.XTestCase;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.ErrorCode;
 
 public class TestLiteWorkflowAppParser extends XTestCase {
+    public static String dummyConf = "<java></java>";
 
+    @Override
     protected void setUp() throws Exception {
         super.setUp();
         new Services().init();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         Services.get().destroy();
         super.tearDown();
@@ -89,4 +102,297 @@ public class TestLiteWorkflowAppParser e
         }
     }
 
+    /*
+     * No fork/join at all: 1->ok->2, 1->error->3
+     * 2->end and 3->end, so validation is expected to pass
+     */
+   public void testWfNoForkJoin() throws WorkflowException  {
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+
+        LiteWorkflowApp def = new LiteWorkflowApp("name", "def", new StartNodeDef("one"))
+            .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "two", "three"))
+            .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "end", "end"))
+            .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "end", "end"))
+            .addNode(new EndNodeDef( "end"));
+
+        try {
+            invokeForkJoin(parser, def);
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    /*
+    Valid case: 1->f, f->(2,3)
+    (2,3)->ok->j, j->4->end
+    */
+    public void testSimpleForkJoin() throws WorkflowException {
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+
+        LiteWorkflowApp def = new LiteWorkflowApp("wf", "<worklfow-app/>", new StartNodeDef("one"))
+        .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f", "end"))
+        .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two", "three"})))
+        .addNode(new ActionNodeDef("two", dummyConf,  TestActionNodeHandler.class, "j", "end"))
+        .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j", "end"))
+        .addNode(new JoinNodeDef("j", "four"))
+        .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "end", "end"))
+        .addNode(new EndNodeDef("end"));
+
+        try {
+            invokeForkJoin(parser, def);
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    /*
+     Valid nesting: f->(2,3)
+     2->f2   (second fork opened on one branch of f)
+     3->j
+     f2->(4,5,6)
+     (4,5,6)->j2
+     j2->7
+     7->j    (the f2 branch comes back to j), j->end
+    */
+    public void testNestedForkJoin() throws WorkflowException{
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+
+        LiteWorkflowApp def = new LiteWorkflowApp("testWf", "<worklfow-app/>", new StartNodeDef("one"))
+        .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+        .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two", "three"})))
+        .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "f2","end"))
+        .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j","end"))
+        .addNode(new ForkNodeDef("f2", Arrays.asList(new String[]{"four", "five", "six"})))
+        .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "j2","end"))
+        .addNode(new ActionNodeDef("five", dummyConf, TestActionNodeHandler.class, "j2","end"))
+        .addNode(new ActionNodeDef("six", dummyConf, TestActionNodeHandler.class,"j2", "end"))
+        .addNode(new JoinNodeDef("j2", "seven"))
+        .addNode(new ActionNodeDef("seven", dummyConf, TestActionNodeHandler.class, "j","end"))
+        .addNode(new JoinNodeDef("j", "end"))
+        .addNode(new EndNodeDef("end"));
+
+        try {
+            invokeForkJoin(parser, def);
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    /*
+      f->(2,3)
+      2->j
+      3->end  (this branch reaches end instead of j; the test expects E0730)
+    */
+    public void testForkJoinFailure() throws WorkflowException{
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+
+        LiteWorkflowApp def = new LiteWorkflowApp("testWf", "<worklfow-app/>", new StartNodeDef("one"))
+        .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+        .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two", "three"})))
+        .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "j","end"))
+        .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "end","end"))
+        .addNode(new JoinNodeDef("j", "end"))
+        .addNode(new EndNodeDef("end"));
+
+        try {
+            invokeForkJoin(parser, def);
+            fail();
+        } catch (Exception ex) {
+            WorkflowException we = (WorkflowException) ex.getCause();
+            assertEquals(ErrorCode.E0730, we.getErrorCode());
+        }
+    }
+
+    /*
+     f->(4,3,2)
+     2->j
+     3->j
+     4->f2
+     f2->(5,6)
+     5->j2
+     6->j2
+     j->j2   (join j leads into join j2 instead of closing f on its own)
+     j2->end (the test expects E0730)
+    */
+    public void testNestedForkJoinFailure() throws WorkflowException {
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+
+        LiteWorkflowApp def = new LiteWorkflowApp("testWf", "<worklfow-app/>", new StartNodeDef("one"))
+            .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+            .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"four", "three", "two"})))
+            .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "j","end"))
+            .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j","end"))
+            .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "f2","end"))
+            .addNode(new ForkNodeDef("f2", Arrays.asList(new String[]{"five", "six"})))
+            .addNode(new ActionNodeDef("five", dummyConf, TestActionNodeHandler.class, "j2","end"))
+            .addNode(new ActionNodeDef("six", dummyConf, TestActionNodeHandler.class, "j2","end"))
+            .addNode(new JoinNodeDef("j", "j2"))
+            .addNode(new JoinNodeDef("j2", "end"))
+            .addNode(new EndNodeDef("end"));
+
+        try {
+            invokeForkJoin(parser, def);
+            fail();
+        } catch (Exception ex) {
+            WorkflowException we = (WorkflowException) ex.getCause();
+            assertEquals(ErrorCode.E0730, we.getErrorCode());
+        }
+    }
+
+    /*
+     f->(2,3)
+     2->ok->3    (3 is already a branch of f; the test expects E0734 naming two and three)
+     2->fail->j
+     3->ok->j
+     3->fail->end
+    */
+    public void testTransitionFailure() throws WorkflowException{
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+
+        LiteWorkflowApp def = new LiteWorkflowApp("name", "def", new StartNodeDef("one"))
+        .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+        .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two","three"})))
+        .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "three", "j"))
+        .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j", "end"))
+        .addNode(new JoinNodeDef("j", "end"))
+        .addNode(new EndNodeDef("end"));
+
+        try {
+            invokeForkJoin(parser, def);
+            fail();
+        } catch (Exception ex) {
+            WorkflowException we = (WorkflowException) ex.getCause();
+            assertEquals(ErrorCode.E0734, we.getErrorCode());
+            // Make sure the message contains the nodes involved in the invalid transition
+            assertTrue(we.getMessage().contains("two"));
+            assertTrue(we.getMessage().contains("three"));
+        }
+
+    }
+
+    /*
+    f->(2,3)
+    2->decision node->{4,5,4}  (the same target listed twice by one decision must still validate)
+    4->j
+    5->j
+    3->j
+    */
+    public void testDecisionForkJoin() throws WorkflowException{
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+        LiteWorkflowApp def = new LiteWorkflowApp("name", "def", new StartNodeDef("one"))
+        .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+        .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two","three"})))
+        .addNode(new DecisionNodeDef("two", dummyConf, TestDecisionNodeHandler.class, Arrays.asList(new String[]{"four","five","four"})))
+        .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "j", "end"))
+        .addNode(new ActionNodeDef("five", dummyConf, TestActionNodeHandler.class, "j", "end"))
+        .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j", "end"))
+        .addNode(new JoinNodeDef("j", "end"))
+        .addNode(new EndNodeDef("end"));
+
+        try {
+            invokeForkJoin(parser, def);
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    /*
+     *f->(2,3)
+     *2->decision node->{4,3}  (3 is already a branch of f; the test expects E0734 naming two and three)
+     *3->ok->j, 3->error->end
+     *4->ok->j, 4->error->end
+     */
+    public void testDecisionForkJoinFailure() throws WorkflowException{
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+        LiteWorkflowApp def = new LiteWorkflowApp("name", "def", new StartNodeDef("one"))
+        .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+        .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two","three"})))
+        .addNode(new DecisionNodeDef("two", dummyConf, TestDecisionNodeHandler.class, Arrays.asList(new String[]{"four","three"})))
+        .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j", "end"))
+        .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "j", "end"))
+        .addNode(new JoinNodeDef("j", "end"))
+        .addNode(new EndNodeDef("end"));
+
+        try {
+            invokeForkJoin(parser, def);
+            fail();
+        } catch (Exception ex) {
+            WorkflowException we = (WorkflowException) ex.getCause();
+            assertEquals(ErrorCode.E0734, we.getErrorCode());
+            // Make sure the message contains the nodes involved in the invalid transition
+            assertTrue(we.getMessage().contains("two"));
+            assertTrue(we.getMessage().contains("three"));
+        }
+    }
+
+    /*
+     * 1->decision node->{f1, f2}  (two sibling forks behind a decision, each with its own join)
+     * f1->(2,3)
+     * f2->(4,5)
+     * (2,3)->j1
+     * (4,5)->j2
+     * j1->end
+     * j2->end                     (validation is expected to pass)
+     */
+    public void testDecisionMultipleForks() throws WorkflowException{
+        LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+                LiteWorkflowStoreService.LiteDecisionHandler.class,
+                LiteWorkflowStoreService.LiteActionHandler.class);
+        LiteWorkflowApp def = new LiteWorkflowApp("name", "def", new StartNodeDef("one"))
+        .addNode(new DecisionNodeDef("one", dummyConf, TestDecisionNodeHandler.class, Arrays.asList(new String[]{"f1","f2"})))
+        .addNode(new ForkNodeDef("f1", Arrays.asList(new String[]{"two","three"})))
+        .addNode(new ForkNodeDef("f2", Arrays.asList(new String[]{"four","five"})))
+        .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "j1", "end"))
+        .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j1", "end"))
+        .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "j2", "end"))
+        .addNode(new ActionNodeDef("five", dummyConf, TestActionNodeHandler.class, "j2", "end"))
+        .addNode(new JoinNodeDef("j1", "end"))
+        .addNode(new JoinNodeDef("j2", "end"))
+        .addNode(new EndNodeDef("end"));
+
+        try {
+            invokeForkJoin(parser, def);
+        } catch (Exception e) {
+            e.printStackTrace();
+            fail();
+        }
+    }
+
+    // Run the parser's private validate + validateForkJoin via reflection; wrapped failures surface through getCause()
+    private void invokeForkJoin(LiteWorkflowAppParser parser, LiteWorkflowApp def) throws Exception {
+        Class<? extends LiteWorkflowAppParser> c = parser.getClass();
+        Class d = Class.forName("org.apache.oozie.workflow.lite.LiteWorkflowAppParser$VisitStatus");
+        Object visiting = Enum.valueOf(d, "VISITING");
+        Map traversed = new HashMap();
+        traversed.put(def.getNode(StartNodeDef.START).getName(), visiting);
+        Method validate = c.getDeclaredMethod("validate", LiteWorkflowApp.class, NodeDef.class, Map.class);
+        validate.setAccessible(true);
+        // invoke validate with the start node marked VISITING (the enum constant itself) to populate the fork and join list
+        validate.invoke(parser, def, def.getNode(StartNodeDef.START), traversed);
+        Method validateForkJoin = c.getDeclaredMethod("validateForkJoin", LiteWorkflowApp.class);
+        validateForkJoin.setAccessible(true);
+        // invoke validateForkJoin
+        validateForkJoin.invoke(parser, def);
+    }
+
 }

Modified: incubator/oozie/trunk/core/src/test/resources/wf-schema-valid.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/resources/wf-schema-valid.xml?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/resources/wf-schema-valid.xml (original)
+++ incubator/oozie/trunk/core/src/test/resources/wf-schema-valid.xml Wed Feb  8 20:41:31 2012
@@ -21,7 +21,7 @@
         <switch>
             <case to="b">true</case>
             <case to="c">false</case>
-            <default to="d"/>
+            <default to="c"/>
         </switch>
     </decision>
     <kill name="b">
@@ -59,7 +59,7 @@
             <file>/tmp</file>
             <archive>/tmp</archive>
         </map-reduce>
-        <ok to="e"/>
+        <ok to="f"/>
         <error to="b"/>
     </action>
 
@@ -86,11 +86,11 @@
             <file>/tmp</file>
             <file>/tmp</file>
         </pig>
-        <ok to="z"/>
+        <ok to="f"/>
         <error to="b"/>
     </action>
 
-    <join name="f" to="g"/>
+    <join name="f" to="z"/>
 
     <end name="z"/>
 </workflow-app>

Modified: incubator/oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Wed Feb  8 20:41:31 2012
@@ -1,18 +1,19 @@
 -- Oozie 3.2.0 release
 
+OOZIE-636 Validate fork-join (virag via tucu)
 OOZIE-685 Update License file with 3rd party license information. (Mohammad)
 OOZIE-678 Update NOTICE.txt to reflect the workcount binaries into oozie src(Mohammad)
 OOZIE-667 Change the way Oozie brings in Hadoop JARs into the build (tucu)
-OOZIE-673: Offset and len option not working as expected.(Virag via Mohammad)
+OOZIE-673 Offset and len option not working as expected.(Virag via Mohammad)
 OOZIE-668 Adding license header into minitest/pom.xml.(Mohammad)
 OOZIE-665 Shell action doesn't capture multiple key-value pairs.(Mohammad)
 OOZIE-666 Oozie's Tomcat admin port is hardcoded. (tucu)
 OOZIE-662 Unit test failing: TestHostnameFilter. (tucu)
-OOZIE-651: Coordinator rerun fails due to NPE in some cases.(Virag via Mohammad)
-OOZIE-655: Information added to Oozie help.(Virag via Mohammad)
-OOZIE-642: Year support in dateOffset() El function.(Virag via Mohammad)
+OOZIE-651 Coordinator rerun fails due to NPE in some cases.(Virag via Mohammad)
+OOZIE-655 Information added to Oozie help.(Virag via Mohammad)
+OOZIE-642 Year support in dateOffset() El function.(Virag via Mohammad)
 OOZIE-652 Add proxyuser capabilities to Oozie HTTP API. (tucu)
-OOZIE-591: Oozie continues to materialize new actions after end date modification (Mohamed Battisha vis Angelo)
+OOZIE-591 Oozie continues to materialize new actions after end date modification (Mohamed Battisha vis Angelo)
 OOZIE-639 Hive sharelib POM must exclude hadoop-core. (tucu)
 OOZIE-635 ShellMain closes the STD/ERR stream while shell is processing. (tucu)
 OOZIE-629 Oozie server to prevent usage of dataset initial-instance earlier than the system-defined default value.(Mona via Mohammad)
@@ -27,10 +28,10 @@ OOZIE-578 Support shell action in Oozie 
 OOZIE-620 POMs clean/tune up before 3.2 release. (tucu)
 OOZIE-613 Update documentation for execution order.(Mohammad)
 OOZIE-589 Make the command requeue interval configurable.(Mohammad)
-OOZIE-156. Add support for a SQOOP action. (tucu)
-OOZIE-77. Oozie should support Kerberos authentication on its HTTP REST API. (tucu)
-OOZIE-622. Remove system sharelib tests from TestLiteWorkflowAppService. (tucu)
-OOZIE-68 Add Hive action. (tucu)
+OOZIE-156 Add support for a SQOOP action. (tucu)
+OOZIE-77  Oozie should support Kerberos authentication on its HTTP REST API. (tucu)
+OOZIE-622 Remove system sharelib tests from TestLiteWorkflowAppService. (tucu)
+OOZIE-68  Add Hive action. (tucu)
 OOZIE-608 Fix test failure for testCoordChangeXCommand, testCoordChangeEndTime Unit
 OOZIE-610 Oozie system share lib should have jars per action type. (tucu)
 OOZIE-565 Make Oozie compile against Hadoop 0.23. (tucu)