You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by tu...@apache.org on 2012/02/08 21:41:32 UTC
svn commit: r1242082 - in /incubator/oozie/trunk: ./
core/src/main/java/org/apache/oozie/
core/src/main/java/org/apache/oozie/workflow/lite/ core/src/main/resources/
core/src/test/java/org/apache/oozie/service/
core/src/test/java/org/apache/oozie/workf...
Author: tucu
Date: Wed Feb 8 20:41:31 2012
New Revision: 1242082
URL: http://svn.apache.org/viewvc?rev=1242082&view=rev
Log:
OOZIE-636 Validate fork-join (virag via tucu)
Modified:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
incubator/oozie/trunk/core/src/test/resources/wf-schema-valid.xml
incubator/oozie/trunk/release-log.txt
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/ErrorCode.java Wed Feb 8 20:41:31 2012
@@ -133,6 +133,11 @@ public enum ErrorCode {
E0727(XLog.STD, "Workflow Job can not be suspended as its not in running state, {0}, Status: {1}"),
E0728(XLog.STD, "Coordinator Job can not be suspended as job finished or failed or killed, id : {0}, status : {1}"),
E0729(XLog.OPS, "Kill node message [{0}]"),
+ E0730(XLog.STD, "Fork/Join not in pair"),
+ E0731(XLog.STD, "Fork node [{0}] cannot have less than two paths"),
+ E0732(XLog.STD, "Fork [{0}]/Join [{1}] not in pair"),
+ E0733(XLog.STD, "Fork [{0}] without a join"),
+ E0734(XLog.STD, "Invalid transition from node [{0}] to node [{1}] while using fork/join"),
E0800(XLog.STD, "Action it is not running its in [{1}] state, action [{0}]"),
E0801(XLog.STD, "Workflow already running, workflow [{0}]"),
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java Wed Feb 8 20:41:31 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -38,8 +38,11 @@ import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
/**
* Class to parse and validate workflow xml
@@ -74,6 +77,7 @@ public class LiteWorkflowAppParser {
private static final String DECISION_DEFAULT_E = "default";
private static final String KILL_MESSAGE_E = "message";
+ public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
private Schema schema;
private Class<? extends DecisionNodeHandler> decisionHandlerClass;
@@ -83,8 +87,8 @@ public class LiteWorkflowAppParser {
VISITING, VISITED
}
- ;
-
+ private List<String> forkList = new ArrayList<String>();
+ private List<String> joinList = new ArrayList<String>();
public LiteWorkflowAppParser(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
@@ -116,6 +120,10 @@ public class LiteWorkflowAppParser {
Map<String, VisitStatus> traversed = new HashMap<String, VisitStatus>();
traversed.put(app.getNode(StartNodeDef.START).getName(), VisitStatus.VISITING);
validate(app, app.getNode(StartNodeDef.START), traversed);
+ //Validate whether fork/join are in pair or not
+ if (Services.get().getConf().getBoolean(VALIDATE_FORK_JOIN, true)) {
+ validateForkJoin(app);
+ }
return app;
}
catch (JDOMException ex) {
@@ -130,6 +138,90 @@ public class LiteWorkflowAppParser {
}
/**
+      * Validate whether fork/join are in pair or not.
+      * <p>
+      * Consumes the node names accumulated in {@code forkList} and {@code joinList}. Both lists
+      * are always emptied before this method returns, including when validation fails, so
+      * leftover names cannot leak into a later validation run on the same parser instance.
+      *
+      * @param app LiteWorkflowApp whose fork/join structure is checked
+      * @throws WorkflowException E0730 if the number of forks and joins differ, or the error
+      *         raised by {@code validateFork} for an individual fork
+      */
+     private void validateForkJoin(LiteWorkflowApp app) throws WorkflowException {
+         try {
+             // Make sure the number of forks and joins in wf are equal
+             if (forkList.size() != joinList.size()) {
+                 throw new WorkflowException(ErrorCode.E0730);
+             }
+
+             while (!forkList.isEmpty()) {
+                 // Make sure each of the fork node has a corresponding join; start with the root fork Node first.
+                 // Nested forks are removed from forkList by validateFork as it resolves them.
+                 validateFork(app.getNode(forkList.remove(0)), app);
+             }
+         }
+         finally {
+             // Drop whatever is left after a failure so the lists start empty next time
+             forkList.clear();
+             joinList.clear();
+         }
+     }
+
+     /**
+      * Test whether the fork node has a corresponding join.
+      * <p>
+      * Scans every node reachable from the fork through decision branches, action "ok-to"
+      * transitions and nested fork/join pairs. Each join reached for the first time is removed
+      * from {@code joinList} and remembered; nested forks are removed from {@code forkList} and
+      * resolved recursively.
+      *
+      * @param forkNode - the fork node
+      * @param app - the WorkflowApp
+      * @return the join node paired with {@code forkNode}
+      * @throws WorkflowException E0734 for a transition to a node already on another path,
+      *         E0733 if no unclaimed join is found for the fork, E0732 if a path ends at a
+      *         different join than the one already recorded, E0730 if a path reaches a node
+      *         that is not a decision, action, fork or join
+      */
+     private NodeDef validateFork(NodeDef forkNode, LiteWorkflowApp app) throws WorkflowException {
+         // Work list of node names; it grows while it is being scanned, hence the index loop
+         List<String> transitions = new ArrayList<String>(forkNode.getTransitions());
+         String joinNode = null;
+         for (int i = 0; i < transitions.size(); i++) {
+             NodeDef node = app.getNode(transitions.get(i));
+             if (node instanceof DecisionNodeDef) {
+                 // A set is used so a decision listing the same target twice is not reported as a clash
+                 Set<String> decisionSet = new HashSet<String>(node.getTransitions());
+                 for (String ds : decisionSet) {
+                     if (transitions.contains(ds)) {
+                         throw new WorkflowException(ErrorCode.E0734, node.getName(), ds);
+                     } else {
+                         transitions.add(ds);
+                     }
+                 }
+             } else if (node instanceof ActionNodeDef) {
+                 // Get only the "ok-to" transition of node
+                 String okToTransition = node.getTransitions().get(0);
+                 // Make sure the transition is valid
+                 validateTransition(transitions, app, okToTransition, node);
+                 transitions.add(okToTransition);
+             } else if (node instanceof ForkNodeDef) {
+                 forkList.remove(node.getName());
+                 // Make a recursive call to resolve this fork node
+                 NodeDef joinNd = validateFork(node, app);
+                 // Continue the scan from whatever follows the nested fork's join
+                 String okToTransition = joinNd.getTransitions().get(0);
+                 // Make sure the transition is valid
+                 validateTransition(transitions, app, okToTransition, node);
+                 transitions.add(okToTransition);
+             } else if (node instanceof JoinNodeDef) {
+                 // If joinNode encountered for the first time, remove it from the joinList and remember it
+                 String currentJoin = node.getName();
+                 if (joinList.contains(currentJoin)) {
+                     joinList.remove(currentJoin);
+                     joinNode = currentJoin;
+                 } else {
+                     // Make sure this join is the same as the join seen from the first time
+                     if (joinNode == null) {
+                         // Report the fork by name, consistent with the E0734 messages
+                         throw new WorkflowException(ErrorCode.E0733, forkNode.getName());
+                     }
+                     if (!joinNode.equals(currentJoin)) {
+                         throw new WorkflowException(ErrorCode.E0732, forkNode.getName(), joinNode);
+                     }
+                 }
+             } else {
+                 throw new WorkflowException(ErrorCode.E0730);
+             }
+         }
+         if (joinNode == null) {
+             // No join was recorded; fail here instead of handing null to the caller
+             throw new WorkflowException(ErrorCode.E0733, forkNode.getName());
+         }
+         return app.getNode(joinNode);
+     }
+
+     /**
+      * Reject a transition to a node that is already on the scanned list, unless the target is a join.
+      *
+      * @param transitions node names collected so far for the fork being validated
+      * @param app the WorkflowApp used to look up the target node
+      * @param okToTransition name of the node being transitioned to
+      * @param node the node the transition starts from
+      * @throws WorkflowException E0734 naming both nodes when the transition is invalid
+      */
+     private void validateTransition(List<String> transitions, LiteWorkflowApp app, String okToTransition, NodeDef node)
+             throws WorkflowException {
+         // A target that has not been seen yet is always acceptable
+         if (!transitions.contains(okToTransition)) {
+             return;
+         }
+         // Several paths are expected to converge on the same join
+         if (app.getNode(okToTransition) instanceof JoinNodeDef) {
+             return;
+         }
+         throw new WorkflowException(ErrorCode.E0734, node.getName(), okToTransition);
+     }
+
+ /**
* Parse xml to {@link LiteWorkflowApp}
*
* @param strDef
@@ -201,11 +293,11 @@ public class LiteWorkflowAppParser {
}
}
}
-
+
String credStr = eNode.getAttributeValue(CRED_A);
String userRetryMaxStr = eNode.getAttributeValue(USER_RETRY_MAX_A);
String userRetryIntervalStr = eNode.getAttributeValue(USER_RETRY_INTERVAL_A);
-
+
String actionConf = XmlUtils.prettyPrint(eActionConf).toString();
def.addNode(new ActionNodeDef(eNode.getAttributeValue(NAME_A), actionConf, actionHandlerClass,
transitions[0], transitions[1], credStr,
@@ -259,6 +351,14 @@ public class LiteWorkflowAppParser {
}
}
+ if(node instanceof ForkNodeDef){
+ forkList.add(node.getName());
+ }
+
+ if(node instanceof JoinNodeDef){
+ joinList.add(node.getName());
+ }
+
if (node instanceof EndNodeDef) {
traversed.put(node.getName(), VisitStatus.VISITED);
return;
Modified: incubator/oozie/trunk/core/src/main/resources/oozie-default.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/resources/oozie-default.xml?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/resources/oozie-default.xml (original)
+++ incubator/oozie/trunk/core/src/main/resources/oozie-default.xml Wed Feb 8 20:41:31 2012
@@ -1522,4 +1522,13 @@
</description>
</property>
+ <!-- ForkJoin validation -->
+ <property>
+ <name>oozie.validate.ForkJoin</name>
+ <value>true</value>
+ <description>
+ If true, fork and join should be validated at wf submission time.
+ </description>
+ </property>
+
</configuration>
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java Wed Feb 8 20:41:31 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -244,21 +244,21 @@ public class TestLiteWorkflowAppService
assertEquals("a", app.getNode("::start::").getTransitions().get(0));
assertEquals("b", app.getNode("a").getTransitions().get(0));
assertEquals("c", app.getNode("a").getTransitions().get(1));
- assertEquals("d", app.getNode("a").getTransitions().get(2));
+ assertEquals("c", app.getNode("a").getTransitions().get(2));
assertTrue(app.getNode("b").getConf().contains("kill"));
assertEquals("d", app.getNode("c").getTransitions().get(0));
assertEquals("e", app.getNode("c").getTransitions().get(1));
assertEquals(2, app.getNode("c").getTransitions().size());
- assertEquals("e", app.getNode("d").getTransitions().get(0));
+ assertEquals("f", app.getNode("d").getTransitions().get(0));
assertEquals("b", app.getNode("d").getTransitions().get(1));
assertTrue(app.getNode("d").getConf().startsWith("<map-reduce"));
- assertEquals("z", app.getNode("e").getTransitions().get(0));
+ assertEquals("f", app.getNode("e").getTransitions().get(0));
assertEquals("b", app.getNode("e").getTransitions().get(1));
assertTrue(app.getNode("e").getConf().startsWith("<pig"));
- assertEquals("g", app.getNode("f").getTransitions().get(0));
+ assertEquals("z", app.getNode("f").getTransitions().get(0));
assertNotNull(app.getNode("z"));
}
@@ -357,7 +357,7 @@ public class TestLiteWorkflowAppService
services.destroy();
}
}
-
+
public void testCreateprotoConfWithMulipleLibPath() throws Exception {
Services services = new Services();
try {
@@ -376,7 +376,7 @@ public class TestLiteWorkflowAppService
createTestCaseSubDir("libx");
writer = new FileWriter(getTestCaseDir() + "/libx/maputil_x.jar");
writer.write("bla bla");
- writer.close();
+ writer.close();
createTestCaseSubDir("liby");
writer = new FileWriter(getTestCaseDir() + "/liby/maputil_y1.jar");
writer.write("bla bla");
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java Wed Feb 8 20:41:31 2012
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,21 +17,34 @@
*/
package org.apache.oozie.workflow.lite;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.workflow.WorkflowException;
+import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestActionNodeHandler;
+import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestDecisionNodeHandler;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.ErrorCode;
public class TestLiteWorkflowAppParser extends XTestCase {
+ public static String dummyConf = "<java></java>";
+ @Override
protected void setUp() throws Exception {
super.setUp();
new Services().init();
}
+ @Override
protected void tearDown() throws Exception {
Services.get().destroy();
super.tearDown();
@@ -89,4 +102,297 @@ public class TestLiteWorkflowAppParser e
}
}
+ /*
+ * 1->ok->2
+ * 2->ok->end
+ */
+ public void testWfNoForkJoin() throws WorkflowException {
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+
+ LiteWorkflowApp def = new LiteWorkflowApp("name", "def", new StartNodeDef("one"))
+ .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "two", "three"))
+ .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "end", "end"))
+ .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "end", "end"))
+ .addNode(new EndNodeDef( "end"));
+
+ try {
+ invokeForkJoin(parser, def);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ /*
+ f->(2,3)
+ (2,3)->j
+ */
+ public void testSimpleForkJoin() throws WorkflowException {
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+
+ LiteWorkflowApp def = new LiteWorkflowApp("wf", "<worklfow-app/>", new StartNodeDef("one"))
+ .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f", "end"))
+ .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two", "three"})))
+ .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "j", "end"))
+ .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j", "end"))
+ .addNode(new JoinNodeDef("j", "four"))
+ .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "end", "end"))
+ .addNode(new EndNodeDef("end"));
+
+ try {
+ invokeForkJoin(parser, def);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ /*
+ f->(2,3)
+ 2->f2
+ 3->j
+ f2->(4,5,6)
+ (4,5,6)->j2
+ j2->7
+ 7->j
+ */
+ public void testNestedForkJoin() throws WorkflowException{
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+
+ LiteWorkflowApp def = new LiteWorkflowApp("testWf", "<worklfow-app/>", new StartNodeDef("one"))
+ .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+ .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two", "three"})))
+ .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "f2","end"))
+ .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j","end"))
+ .addNode(new ForkNodeDef("f2", Arrays.asList(new String[]{"four", "five", "six"})))
+ .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "j2","end"))
+ .addNode(new ActionNodeDef("five", dummyConf, TestActionNodeHandler.class, "j2","end"))
+ .addNode(new ActionNodeDef("six", dummyConf, TestActionNodeHandler.class,"j2", "end"))
+ .addNode(new JoinNodeDef("j2", "seven"))
+ .addNode(new ActionNodeDef("seven", dummyConf, TestActionNodeHandler.class, "j","end"))
+ .addNode(new JoinNodeDef("j", "end"))
+ .addNode(new EndNodeDef("end"));
+
+ try {
+ invokeForkJoin(parser, def);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ /*
+ f->(2,3)
+ 2->j
+ 3->end
+ */
+ public void testForkJoinFailure() throws WorkflowException{
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+
+ LiteWorkflowApp def = new LiteWorkflowApp("testWf", "<worklfow-app/>", new StartNodeDef("one"))
+ .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+ .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two", "three"})))
+ .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "j","end"))
+ .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "end","end"))
+ .addNode(new JoinNodeDef("j", "end"))
+ .addNode(new EndNodeDef("end"));
+
+ try {
+ invokeForkJoin(parser, def);
+ fail();
+ } catch (Exception ex) {
+ WorkflowException we = (WorkflowException) ex.getCause();
+ assertEquals(ErrorCode.E0730, we.getErrorCode());
+ }
+ }
+
+ /*
+ f->(2,3,4)
+ 2->j
+ 3->j
+ 4->f2
+ f2->(5,6)
+ 5->j2
+ 6->j2
+ j->j2
+ j2->end
+ */
+ public void testNestedForkJoinFailure() throws WorkflowException {
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+
+ LiteWorkflowApp def = new LiteWorkflowApp("testWf", "<worklfow-app/>", new StartNodeDef("one"))
+ .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+ .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"four", "three", "two"})))
+ .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "j","end"))
+ .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j","end"))
+ .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "f2","end"))
+ .addNode(new ForkNodeDef("f2", Arrays.asList(new String[]{"five", "six"})))
+ .addNode(new ActionNodeDef("five", dummyConf, TestActionNodeHandler.class, "j2","end"))
+ .addNode(new ActionNodeDef("six", dummyConf, TestActionNodeHandler.class, "j2","end"))
+ .addNode(new JoinNodeDef("j", "j2"))
+ .addNode(new JoinNodeDef("j2", "end"))
+ .addNode(new EndNodeDef("end"));
+
+ try {
+ invokeForkJoin(parser, def);
+ fail();
+ } catch (Exception ex) {
+ WorkflowException we = (WorkflowException) ex.getCause();
+ assertEquals(ErrorCode.E0730, we.getErrorCode());
+ }
+ }
+
+ /*
+ f->(2,3)
+ 2->ok->3
+ 2->fail->j
+ 3->ok->j
+ 3->fail->end
+ */
+ public void testTransitionFailure() throws WorkflowException{
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+
+ LiteWorkflowApp def = new LiteWorkflowApp("name", "def", new StartNodeDef("one"))
+ .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+ .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two","three"})))
+ .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "three", "j"))
+ .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j", "end"))
+ .addNode(new JoinNodeDef("j", "end"))
+ .addNode(new EndNodeDef("end"));
+
+ try {
+ invokeForkJoin(parser, def);
+ fail();
+ } catch (Exception ex) {
+ WorkflowException we = (WorkflowException) ex.getCause();
+ assertEquals(ErrorCode.E0734, we.getErrorCode());
+ // Make sure the message contains the nodes involved in the invalid transition
+ assertTrue(we.getMessage().contains("two"));
+ assertTrue(we.getMessage().contains("three"));
+ }
+
+ }
+
+ /*
+ f->(2,3)
+ 2->decision node->{4,5,4}
+ 4->j
+ 5->j
+ 3->j
+ */
+ public void testDecisionForkJoin() throws WorkflowException{
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+ LiteWorkflowApp def = new LiteWorkflowApp("name", "def", new StartNodeDef("one"))
+ .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+ .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two","three"})))
+ .addNode(new DecisionNodeDef("two", dummyConf, TestDecisionNodeHandler.class, Arrays.asList(new String[]{"four","five","four"})))
+ .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "j", "end"))
+ .addNode(new ActionNodeDef("five", dummyConf, TestActionNodeHandler.class, "j", "end"))
+ .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j", "end"))
+ .addNode(new JoinNodeDef("j", "end"))
+ .addNode(new EndNodeDef("end"));
+
+ try {
+ invokeForkJoin(parser, def);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+ /*
+ *f->(2,3)
+ *2->decision node->{3,4}
+ *3->j
+ *4->j
+ */
+ public void testDecisionForkJoinFailure() throws WorkflowException{
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+ LiteWorkflowApp def = new LiteWorkflowApp("name", "def", new StartNodeDef("one"))
+ .addNode(new ActionNodeDef("one", dummyConf, TestActionNodeHandler.class, "f","end"))
+ .addNode(new ForkNodeDef("f", Arrays.asList(new String[]{"two","three"})))
+ .addNode(new DecisionNodeDef("two", dummyConf, TestDecisionNodeHandler.class, Arrays.asList(new String[]{"four","three"})))
+ .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j", "end"))
+ .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "j", "end"))
+ .addNode(new JoinNodeDef("j", "end"))
+ .addNode(new EndNodeDef("end"));
+
+ try {
+ invokeForkJoin(parser, def);
+ fail();
+ } catch (Exception ex) {
+ WorkflowException we = (WorkflowException) ex.getCause();
+ assertEquals(ErrorCode.E0734, we.getErrorCode());
+ // Make sure the message contains the nodes involved in the invalid transition
+ assertTrue(we.getMessage().contains("two"));
+ assertTrue(we.getMessage().contains("three"));
+ }
+ }
+
+ /*
+ * 1->decision node->{f1, f2}
+ * f1->(2,3)
+ * f2->(4,5)
+ * (2,3)->j1
+ * (4,5)->j2
+ * j1->end
+ * j2->end
+ */
+ public void testDecisionMultipleForks() throws WorkflowException{
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+ LiteWorkflowApp def = new LiteWorkflowApp("name", "def", new StartNodeDef("one"))
+ .addNode(new DecisionNodeDef("one", dummyConf, TestDecisionNodeHandler.class, Arrays.asList(new String[]{"f1","f2"})))
+ .addNode(new ForkNodeDef("f1", Arrays.asList(new String[]{"two","three"})))
+ .addNode(new ForkNodeDef("f2", Arrays.asList(new String[]{"four","five"})))
+ .addNode(new ActionNodeDef("two", dummyConf, TestActionNodeHandler.class, "j1", "end"))
+ .addNode(new ActionNodeDef("three", dummyConf, TestActionNodeHandler.class, "j1", "end"))
+ .addNode(new ActionNodeDef("four", dummyConf, TestActionNodeHandler.class, "j2", "end"))
+ .addNode(new ActionNodeDef("five", dummyConf, TestActionNodeHandler.class, "j2", "end"))
+ .addNode(new JoinNodeDef("j1", "end"))
+ .addNode(new JoinNodeDef("j2", "end"))
+ .addNode(new EndNodeDef("end"));
+
+ try {
+ invokeForkJoin(parser, def);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+
+     /**
+      * Invoke the private validate and validateForkJoin methods using the Reflection API.
+      *
+      * @param parser parser instance under test
+      * @param def workflow definition to validate
+      * @throws Exception an InvocationTargetException wrapping whatever the invoked method threw,
+      *         or any reflection lookup/access failure
+      */
+     private void invokeForkJoin(LiteWorkflowAppParser parser, LiteWorkflowApp def) throws Exception {
+         Class<? extends LiteWorkflowAppParser> c = parser.getClass();
+         Class<?> d = Class.forName("org.apache.oozie.workflow.lite.LiteWorkflowAppParser$VisitStatus");
+         Field f = d.getField("VISITING");
+         f.setAccessible(true);
+         Map<String, Object> traversed = new HashMap<String, Object>();
+         // Store the VISITING enum constant itself (static field, so no receiver), not the Field handle
+         traversed.put(def.getNode(StartNodeDef.START).getName(), f.get(null));
+         Method validate = c.getDeclaredMethod("validate", LiteWorkflowApp.class, NodeDef.class, Map.class);
+         validate.setAccessible(true);
+         // invoke validate method to populate the fork and join list
+         validate.invoke(parser, def, def.getNode(StartNodeDef.START), traversed);
+         Method validateForkJoin = c.getDeclaredMethod("validateForkJoin", LiteWorkflowApp.class);
+         validateForkJoin.setAccessible(true);
+         // invoke validateForkJoin
+         validateForkJoin.invoke(parser, def);
+     }
+
}
Modified: incubator/oozie/trunk/core/src/test/resources/wf-schema-valid.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/resources/wf-schema-valid.xml?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/resources/wf-schema-valid.xml (original)
+++ incubator/oozie/trunk/core/src/test/resources/wf-schema-valid.xml Wed Feb 8 20:41:31 2012
@@ -21,7 +21,7 @@
<switch>
<case to="b">true</case>
<case to="c">false</case>
- <default to="d"/>
+ <default to="c"/>
</switch>
</decision>
<kill name="b">
@@ -59,7 +59,7 @@
<file>/tmp</file>
<archive>/tmp</archive>
</map-reduce>
- <ok to="e"/>
+ <ok to="f"/>
<error to="b"/>
</action>
@@ -86,11 +86,11 @@
<file>/tmp</file>
<file>/tmp</file>
</pig>
- <ok to="z"/>
+ <ok to="f"/>
<error to="b"/>
</action>
- <join name="f" to="g"/>
+ <join name="f" to="z"/>
<end name="z"/>
</workflow-app>
Modified: incubator/oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/release-log.txt?rev=1242082&r1=1242081&r2=1242082&view=diff
==============================================================================
--- incubator/oozie/trunk/release-log.txt (original)
+++ incubator/oozie/trunk/release-log.txt Wed Feb 8 20:41:31 2012
@@ -1,18 +1,19 @@
-- Oozie 3.2.0 release
+OOZIE-636 Validate fork-join (virag via tucu)
OOZIE-685 Update License file with 3rd party license information. (Mohammad)
OOZIE-678 Update NOTICE.txt to reflect the workcount binaries into oozie src(Mohammad)
OOZIE-667 Change the way Oozie brings in Hadoop JARs into the build (tucu)
-OOZIE-673: Offset and len option not working as expected.(Virag via Mohammad)
+OOZIE-673 Offset and len option not working as expected.(Virag via Mohammad)
OOZIE-668 Adding license header into minitest/pom.xml.(Mohammad)
OOZIE-665 Shell action doesn't capture multiple key-value pairs.(Mohammad)
OOZIE-666 Oozie's Tomcat admin port is hardcoded. (tucu)
OOZIE-662 Unit test failing: TestHostnameFilter. (tucu)
-OOZIE-651: Coordinator rerun fails due to NPE in some cases.(Virag via Mohammad)
-OOZIE-655: Information added to Oozie help.(Virag via Mohammad)
-OOZIE-642: Year support in dateOffset() El function.(Virag via Mohammad)
+OOZIE-651 Coordinator rerun fails due to NPE in some cases.(Virag via Mohammad)
+OOZIE-655 Information added to Oozie help.(Virag via Mohammad)
+OOZIE-642 Year support in dateOffset() El function.(Virag via Mohammad)
OOZIE-652 Add proxyuser capabilities to Oozie HTTP API. (tucu)
-OOZIE-591: Oozie continues to materialize new actions after end date modification (Mohamed Battisha vis Angelo)
+OOZIE-591 Oozie continues to materialize new actions after end date modification (Mohamed Battisha vis Angelo)
OOZIE-639 Hive sharelib POM must exclude hadoop-core. (tucu)
OOZIE-635 ShellMain closes the STD/ERR stream while shell is processing. (tucu)
OOZIE-629 Oozie server to prevent usage of dataset initial-instance earlier than the system-defined default value.(Mona via Mohammad)
@@ -27,10 +28,10 @@ OOZIE-578 Support shell action in Oozie
OOZIE-620 POMs clean/tune up before 3.2 release. (tucu)
OOZIE-613 Update documentation for execution order.(Mohammad)
OOZIE-589 Make the command requeue interval configurable.(Mohammad)
-OOZIE-156. Add support for a SQOOP action. (tucu)
-OOZIE-77. Oozie should support Kerberos authentication on its HTTP REST API. (tucu)
-OOZIE-622. Remove system sharelib tests from TestLiteWorkflowAppService. (tucu)
-OOZIE-68 Add Hive action. (tucu)
+OOZIE-156 Add support for a SQOOP action. (tucu)
+OOZIE-77 Oozie should support Kerberos authentication on its HTTP REST API. (tucu)
+OOZIE-622 Remove system sharelib tests from TestLiteWorkflowAppService. (tucu)
+OOZIE-68 Add Hive action. (tucu)
OOZIE-608 Fix test failure for testCoordChangeXCommand, testCoordChangeEndTime Unit
OOZIE-610 Oozie system share lib should have jars per action type. (tucu)
OOZIE-565 Make Oozie compile against Hadoop 0.23. (tucu)