You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by tu...@apache.org on 2012/08/03 06:59:55 UTC

svn commit: r1368795 [1/2] - in /incubator/oozie/trunk: ./ core/ core/src/main/java/org/apache/oozie/action/control/ core/src/main/java/org/apache/oozie/action/hadoop/ core/src/main/java/org/apache/oozie/command/wf/ core/src/main/java/org/apache/oozie/...

Author: tucu
Date: Fri Aug  3 04:59:53 2012
New Revision: 1368795

URL: http://svn.apache.org/viewvc?rev=1368795&view=rev
Log:
OOZIE-243 Workflow nodes START/END/KILL/FORK/JOIN should create rows in the action DB table (tucu)

Added:
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ControlNodeActionExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/EndActionExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ForkActionExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/JoinActionExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/KillActionExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/StartActionExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeDef.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
Modified:
    incubator/oozie/trunk/core/pom.xml
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/ActionService.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/DBLiteWorkflowStoreService.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/DBLiteWorkflowLib.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/EndNodeDef.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ForkNodeDef.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/JoinNodeDef.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/KillNodeDef.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
    incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/StartNodeDef.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagELFunctions.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagEngine.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/TestActionFailover.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopELFunctions.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestPurgeXCommand.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowIdGetForExternalIdJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetForPurgeJPAExecutor.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestActionCheckerService.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/store/TestDBWorkflowStore.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
    incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowLib.java
    incubator/oozie/trunk/release-log.txt

Modified: incubator/oozie/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/pom.xml?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/pom.xml (original)
+++ incubator/oozie/trunk/core/pom.xml Fri Aug  3 04:59:53 2012
@@ -39,7 +39,7 @@
         <dependency>
             <groupId>org.apache.oozie</groupId>
             <artifactId>oozie-hadoop-test</artifactId>
-            <scope>test</scope>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ControlNodeActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ControlNodeActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ControlNodeActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ControlNodeActionExecutor.java Fri Aug  3 04:59:53 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.action.hadoop.FsActionExecutor;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
+import org.apache.oozie.workflow.lite.ControlNodeHandler;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.Namespace;
+
+import java.util.List;
+
+/**
+ * Base action executor for control nodes: START/END/KILL/FORK/JOIN
+ * <p/>
+ * This action executor, similar to {@link FsActionExecutor}, is completed during the
+ * {@link #start(Context, WorkflowAction)}.
+ * <p/>
+ * By hooking control nodes to an action executor, control nodes get WF action entries in the DB.
+ */
+public abstract class ControlNodeActionExecutor extends ActionExecutor {
+
+    public ControlNodeActionExecutor(String type) {
+        super(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
+        context.setStartData("-", "-", "-");
+        context.setExecutionData("OK", null);
+    }
+
+    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
+        context.setEndData(WorkflowAction.Status.OK, getActionSignal(WorkflowAction.Status.OK));
+    }
+
+    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
+        throw new UnsupportedOperationException();
+    }
+
+    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean isCompleted(String externalStatus) {
+        return true;
+    }
+
+}

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/EndActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/EndActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/EndActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/EndActionExecutor.java Fri Aug  3 04:59:53 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+/**
+ * Action executor for END control node.
+ */
+public class EndActionExecutor extends ControlNodeActionExecutor {
+    public static final String TYPE = ":END:";
+
+    public EndActionExecutor() {
+        super(TYPE);
+    }
+
+}

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ForkActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ForkActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ForkActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ForkActionExecutor.java Fri Aug  3 04:59:53 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+/**
+ * Action executor for FORK control node.
+ */
+public class ForkActionExecutor extends ControlNodeActionExecutor {
+    public static final String TYPE = ":FORK:";
+
+    public ForkActionExecutor() {
+        super(TYPE);
+    }
+
+}

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/JoinActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/JoinActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/JoinActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/JoinActionExecutor.java Fri Aug  3 04:59:53 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+/**
+ * Action executor for JOIN control node.
+ */
+public class JoinActionExecutor extends ControlNodeActionExecutor {
+    public static final String TYPE = ":JOIN:";
+
+    public JoinActionExecutor() {
+        super(TYPE);
+    }
+
+}

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/KillActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/KillActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/KillActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/KillActionExecutor.java Fri Aug  3 04:59:53 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+/**
+ * Action executor for KILL control node.
+ */
+public class KillActionExecutor extends ControlNodeActionExecutor {
+    public static final String TYPE = ":KILL:";
+
+    public KillActionExecutor() {
+        super(TYPE);
+    }
+
+}

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/StartActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/StartActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/StartActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/StartActionExecutor.java Fri Aug  3 04:59:53 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+/**
+ * Action executor for START control node.
+ */
+public class StartActionExecutor extends ControlNodeActionExecutor {
+    public static final String TYPE = ":START:";
+
+    public StartActionExecutor() {
+        super(TYPE);
+    }
+
+}

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java Fri Aug  3 04:59:53 2012
@@ -156,7 +156,7 @@ public class JavaActionExecutor extends 
                     ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
             registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
                     ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
-            registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA006");
+            registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "  JA006");
             registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
             registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
             registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java Fri Aug  3 04:59:53 2012
@@ -27,6 +27,7 @@ import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.action.control.ControlNodeActionExecutor;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
@@ -126,8 +127,15 @@ public class ActionEndXCommand extends A
         LOG.debug("STARTED ActionEndXCommand for action " + actionId);
 
         Configuration conf = wfJob.getWorkflowInstance().getConf();
-        int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
-        long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
+
+        int maxRetries = 0;
+        long retryInterval = 0;
+
+        if (!(executor instanceof ControlNodeActionExecutor)) {
+            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
+            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
+        }
+
         executor.setMaxRetries(maxRetries);
         executor.setRetryInterval(retryInterval);
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Fri Aug  3 04:59:53 2012
@@ -28,6 +28,7 @@ import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.action.control.ControlNodeActionExecutor;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
@@ -135,8 +136,14 @@ public class ActionStartXCommand extends
         LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
         Configuration conf = wfJob.getWorkflowInstance().getConf();
 
-        int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
-        long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
+        int maxRetries = 0;
+        long retryInterval = 0;
+
+        if (!(executor instanceof ControlNodeActionExecutor)) {
+            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
+            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
+        }
+
         executor.setMaxRetries(maxRetries);
         executor.setRetryInterval(retryInterval);
 
@@ -153,11 +160,13 @@ public class ActionStartXCommand extends
             }
             context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
             try {
-                String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
-                String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
-                wfAction.setConf(actionConf);
-                LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
-                        .getType(), actionConf);
+                if (!(executor instanceof ControlNodeActionExecutor)) {
+                    String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
+                    String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
+                    wfAction.setConf(actionConf);
+                    LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
+                            .getType(), actionConf);
+                }
             }
             catch (ELEvaluationException ex) {
                 throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/ActionService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/ActionService.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/ActionService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/ActionService.java Fri Aug  3 04:59:53 2012
@@ -19,6 +19,11 @@ package org.apache.oozie.service;
 
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.action.control.EndActionExecutor;
+import org.apache.oozie.action.control.ForkActionExecutor;
+import org.apache.oozie.action.control.JoinActionExecutor;
+import org.apache.oozie.action.control.KillActionExecutor;
+import org.apache.oozie.action.control.StartActionExecutor;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
@@ -45,8 +50,12 @@ public class ActionService implements Se
         ActionExecutor.resetInitInfo();
         ActionExecutor.disableInit();
         executors = new HashMap<String, Class<? extends ActionExecutor>>();
-        Class<? extends ActionExecutor>[] classes =
-                (Class<? extends ActionExecutor>[]) services.getConf().getClasses(CONF_ACTION_EXECUTOR_CLASSES);
+
+        Class<? extends ActionExecutor>[] classes = new Class[] { StartActionExecutor.class,
+            EndActionExecutor.class, KillActionExecutor.class,  ForkActionExecutor.class, JoinActionExecutor.class };
+        registerExecutors(classes);
+
+        classes = (Class<? extends ActionExecutor>[]) services.getConf().getClasses(CONF_ACTION_EXECUTOR_CLASSES);
         registerExecutors(classes);
 
         classes = (Class<? extends ActionExecutor>[]) services.getConf().getClasses(CONF_ACTION_EXECUTOR_EXT_CLASSES);

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/DBLiteWorkflowStoreService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/DBLiteWorkflowStoreService.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/DBLiteWorkflowStoreService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/DBLiteWorkflowStoreService.java Fri Aug  3 04:59:53 2012
@@ -134,7 +134,8 @@ public class DBLiteWorkflowStoreService 
 
     private WorkflowLib getWorkflowLib(Connection conn) {
         javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
-        return new DBLiteWorkflowLib(schema, LiteDecisionHandler.class, LiteActionHandler.class, conn);
+        return new DBLiteWorkflowLib(schema, LiteControlNodeHandler.class,
+                                     LiteDecisionHandler.class, LiteActionHandler.class, conn);
     }
 
     @Override

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java Fri Aug  3 04:59:53 2012
@@ -19,6 +19,11 @@ package org.apache.oozie.service;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.oozie.action.control.EndActionExecutor;
+import org.apache.oozie.action.control.ForkActionExecutor;
+import org.apache.oozie.action.control.JoinActionExecutor;
+import org.apache.oozie.action.control.KillActionExecutor;
+import org.apache.oozie.action.control.StartActionExecutor;
 import org.apache.oozie.command.wf.ReRunXCommand;
 
 import org.apache.oozie.client.WorkflowAction;
@@ -28,10 +33,17 @@ import org.apache.oozie.ErrorCode;
 import org.apache.oozie.workflow.WorkflowException;
 import org.apache.oozie.workflow.WorkflowInstance;
 import org.apache.oozie.workflow.lite.ActionNodeHandler;
+import org.apache.oozie.workflow.lite.ControlNodeHandler;
 import org.apache.oozie.workflow.lite.DecisionNodeHandler;
+import org.apache.oozie.workflow.lite.EndNodeDef;
+import org.apache.oozie.workflow.lite.ForkNodeDef;
+import org.apache.oozie.workflow.lite.JoinNodeDef;
+import org.apache.oozie.workflow.lite.KillNodeDef;
+import org.apache.oozie.workflow.lite.NodeDef;
 import org.apache.oozie.workflow.lite.NodeHandler;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
+import org.apache.oozie.workflow.lite.StartNodeDef;
 import org.jdom.Element;
 import org.jdom.JDOMException;
 
@@ -59,10 +71,11 @@ public abstract class LiteWorkflowStoreS
      * necessary information to create ActionExecutors.
      *
      * @param context NodeHandler context.
+     * @param actionType the action type.
      * @throws WorkflowException thrown if there was an error parsing the action configuration.
      */
     @SuppressWarnings("unchecked")
-    protected static void liteExecute(NodeHandler.Context context) throws WorkflowException {
+    protected static void liteExecute(NodeHandler.Context context, String actionType) throws WorkflowException {
         XLog log = XLog.getLog(LiteWorkflowStoreService.class);
         String jobId = context.getProcessInstance().getId();
         String nodeName = context.getNodeDef().getName();
@@ -79,16 +92,16 @@ public abstract class LiteWorkflowStoreS
             String nodeConf = context.getNodeDef().getConf();
             String executionPath = context.getExecutionPath();
 
-            String actionType;
-            try {
-                Element element = XmlUtils.parseXml(nodeConf);
-                actionType = element.getName();
-                nodeConf = XmlUtils.prettyPrint(element).toString();
-            }
-            catch (JDOMException ex) {
-                throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
+            if (actionType == null) {
+                try {
+                    Element element = XmlUtils.parseXml(nodeConf);
+                    actionType = element.getName();
+                    nodeConf = XmlUtils.prettyPrint(element).toString();
+                }
+                catch (JDOMException ex) {
+                    throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
+                }
             }
-
             log.debug(" Creating action for node [{0}]", nodeName);
             action.setType(actionType);
             action.setExecutionPath(executionPath);
@@ -236,7 +249,7 @@ public abstract class LiteWorkflowStoreS
 
         @Override
         public void start(Context context) throws WorkflowException {
-            liteExecute(context);
+            liteExecute(context, null);
         }
 
         @Override
@@ -259,7 +272,7 @@ public abstract class LiteWorkflowStoreS
 
         @Override
         public void start(Context context) throws WorkflowException {
-            liteExecute(context);
+            liteExecute(context, null);
         }
 
         @Override
@@ -276,4 +289,34 @@ public abstract class LiteWorkflowStoreS
             liteFail(context);
         }
     }
+
+    // wires workflow lib control nodes with Oozie Dag
+    public static class LiteControlNodeHandler extends ControlNodeHandler {
+
+      @Override
+      public void touch(Context context) throws WorkflowException {
+          Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+          String nodeType;
+          if (nodeClass.equals(StartNodeDef.class)) {
+            nodeType = StartActionExecutor.TYPE;
+          }
+          else if (nodeClass.equals(EndNodeDef.class)) {
+              nodeType = EndActionExecutor.TYPE;
+          }
+          else if (nodeClass.equals(KillNodeDef.class)) {
+              nodeType = KillActionExecutor.TYPE;
+          }
+          else if (nodeClass.equals(ForkNodeDef.class)) {
+              nodeType = ForkActionExecutor.TYPE;
+          }
+          else if (nodeClass.equals(JoinNodeDef.class)) {
+              nodeType = JoinActionExecutor.TYPE;
+          } else {
+            throw new IllegalStateException("Invalid node type: " + nodeClass);
+          }
+
+          liteExecute(context, nodeType);
+      }
+
+    }
 }

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeDef.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeDef.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeDef.java Fri Aug  3 04:59:53 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.workflow.lite;
+
+import java.util.List;
+
+/**
+ * Node definition for control nodes: START/END/KILL/FORK/JOIN.
+ */
+public abstract class ControlNodeDef extends NodeDef {
+
+    ControlNodeDef() {
+    }
+
+    @SuppressWarnings("unchecked")
+    public ControlNodeDef(String name, String conf, Class<? extends ControlNodeHandler> controlHandlerClass,
+                          List <String> transitions) {
+        super(name, conf, controlHandlerClass, transitions);
+    }
+
+}

Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java Fri Aug  3 04:59:53 2012
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.workflow.lite;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.workflow.WorkflowException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Node handler that provides the necessary workflow logic for control nodes: START/END/KILL/FORK/JOIN.
+ */
+public abstract class ControlNodeHandler extends NodeHandler {
+
+    public static final String FORK_COUNT_PREFIX = "workflow.fork.";
+
+    /**
+     * Called by {@link #enter(Context)} when returning TRUE.
+     *
+     * @param context workflow context
+     * @throws WorkflowException thrown if an error occurred.
+     */
+    public abstract void touch(Context context) throws WorkflowException;
+
+    @Override
+    public boolean enter(Context context) throws WorkflowException {
+        boolean doTouch;
+        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+        if (nodeClass.equals(StartNodeDef.class)) {
+            if (!context.getSignalValue().equals(StartNodeDef.START)) {
+                throw new WorkflowException(ErrorCode.E0715, context.getSignalValue());
+            }
+            doTouch = true;
+        }
+        else if (nodeClass.equals(EndNodeDef.class)) {
+            doTouch = true;
+        }
+        else if (nodeClass.equals(KillNodeDef.class)) {
+            doTouch = true;
+        }
+        else if (nodeClass.equals(ForkNodeDef.class)) {
+            doTouch = true;
+        }
+        else if (nodeClass.equals(JoinNodeDef.class)) {
+            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
+            String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
+            if (forkCount == null) {
+                throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
+            }
+            int count = Integer.parseInt(forkCount) - 1;
+            if (count > 0) {
+                context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, "" + count);
+                context.deleteExecutionPath();
+            }
+            else {
+                context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, null);
+            }
+            doTouch = (count == 0);
+        }
+        else {
+            throw new IllegalStateException("Invalid node type: " + nodeClass);
+        }
+        if (doTouch) {
+            touch(context);
+        }
+        return false;
+    }
+
+    @Override
+    public String exit(Context context) throws WorkflowException {
+        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+        if (nodeClass.equals(StartNodeDef.class)) {
+            return context.getNodeDef().getTransitions().get(0);
+        }
+        else if (nodeClass.equals(EndNodeDef.class)) {
+            context.completeJob();
+            return null;
+        }
+        else if (nodeClass.equals(KillNodeDef.class)) {
+            context.killJob();
+            return null;
+        }
+        else if (nodeClass.equals(ForkNodeDef.class)) {
+            throw new UnsupportedOperationException();
+        }
+        else if (nodeClass.equals(JoinNodeDef.class)) {
+            throw new UnsupportedOperationException();
+        }
+        else {
+            throw new IllegalStateException("Invalid node type: " + nodeClass);
+        }
+    }
+
+    @Override
+    public void loopDetection(Context context)
+        throws WorkflowException {
+        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+        if (nodeClass.equals(StartNodeDef.class)) {
+        }
+        else if (nodeClass.equals(EndNodeDef.class)) {
+        }
+        else if (nodeClass.equals(KillNodeDef.class)) {
+        }
+        else if (nodeClass.equals(ForkNodeDef.class)) {
+        }
+        else if (nodeClass.equals(JoinNodeDef.class)) {
+            String flag = getLoopFlag(context.getNodeDef().getName());
+            if (context.getVar(flag) != null) {
+                throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());
+            }
+            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
+            String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
+            if (forkCount == null) {
+                throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
+            }
+            int count = Integer.parseInt(forkCount) - 1;
+            if (count == 0) {
+                context.setVar(flag, "true");
+            }
+
+        }
+        else {
+            throw new IllegalStateException("Invalid node type: " + nodeClass);
+        }
+    }
+
+    @Override
+    public List<String> multiExit(Context context)
+        throws WorkflowException {
+        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+        if (nodeClass.equals(StartNodeDef.class)) {
+            return super.multiExit(context);
+        }
+        else if (nodeClass.equals(EndNodeDef.class)) {
+            return super.multiExit(context);
+        }
+        else if (nodeClass.equals(KillNodeDef.class)) {
+            return super.multiExit(context);
+        }
+        else if (nodeClass.equals(ForkNodeDef.class)) {
+            List<String> transitions = context.getNodeDef().getTransitions();
+            context.setVar(FORK_COUNT_PREFIX + context.getExecutionPath(), "" + transitions.size());
+
+            List<String> fullTransitions = new ArrayList<String>(transitions.size());
+
+            for (String transition : transitions) {
+                String childExecutionPath = context.createExecutionPath(transition);
+                String fullTransition = context.createFullTransition(childExecutionPath, transition);
+                fullTransitions.add(fullTransition);
+            }
+            return fullTransitions;
+        }
+        else if (nodeClass.equals(JoinNodeDef.class)) {
+            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
+            // NOW we delete..
+            context.deleteExecutionPath();
+
+            String transition = context.getNodeDef().getTransitions().get(0);
+            String fullTransition = context.createFullTransition(parentExecutionPath, transition);
+            List<String> transitions = new ArrayList<String>(1);
+            transitions.add(fullTransition);
+            return transitions;
+        }
+        else {
+            throw new IllegalStateException("Invalid node type: " + nodeClass);
+        }
+    }
+
+    @Override
+    public void kill(Context context) {
+        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+        if (nodeClass.equals(StartNodeDef.class)) {
+            //NOP
+        }
+        else if (nodeClass.equals(EndNodeDef.class)) {
+            //NOP
+        }
+        else if (nodeClass.equals(KillNodeDef.class)) {
+            //NOP
+        }
+        else if (nodeClass.equals(ForkNodeDef.class)) {
+            //NOP
+        }
+        else if (nodeClass.equals(JoinNodeDef.class)) {
+            //NOP
+        }
+        else {
+            throw new IllegalStateException("Invalid node type: " + nodeClass);
+        }
+    }
+
+    @Override
+    public void fail(Context context) {
+        Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+        if (nodeClass.equals(StartNodeDef.class)) {
+            //NOP
+        }
+        else if (nodeClass.equals(EndNodeDef.class)) {
+            //NOP
+        }
+        else if (nodeClass.equals(KillNodeDef.class)) {
+            //NOP
+        }
+        else if (nodeClass.equals(ForkNodeDef.class)) {
+            //NOP
+        }
+        else if (nodeClass.equals(JoinNodeDef.class)) {
+            //NOP
+        }
+        else {
+            throw new IllegalStateException("Invalid node type: " + nodeClass);
+        }
+    }
+}

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/DBLiteWorkflowLib.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/DBLiteWorkflowLib.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/DBLiteWorkflowLib.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/DBLiteWorkflowLib.java Fri Aug  3 04:59:53 2012
@@ -35,9 +35,11 @@ import org.apache.oozie.ErrorCode;
 public class DBLiteWorkflowLib extends LiteWorkflowLib {
     private final Connection connection;
 
-    public DBLiteWorkflowLib(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
+    public DBLiteWorkflowLib(Schema schema,
+                             Class<? extends ControlNodeHandler> controlNodeHandler,
+                             Class<? extends DecisionNodeHandler> decisionHandlerClass,
                              Class<? extends ActionNodeHandler> actionHandlerClass, Connection connection) {
-        super(schema, decisionHandlerClass, actionHandlerClass);
+        super(schema, controlNodeHandler, decisionHandlerClass, actionHandlerClass);
         this.connection = connection;
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/EndNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/EndNodeDef.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/EndNodeDef.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/EndNodeDef.java Fri Aug  3 04:59:53 2012
@@ -19,34 +19,17 @@ package org.apache.oozie.workflow.lite;
 
 import java.util.Collections;
 
-//TODO javadoc
-public class EndNodeDef extends NodeDef {
+/**
+ * Node definition for END control node.
+ */
+public class EndNodeDef extends ControlNodeDef {
 
     EndNodeDef() {
     }
 
     @SuppressWarnings("unchecked")
-    public EndNodeDef(String name) {
-        super(name, null, EndNodeHandler.class, Collections.EMPTY_LIST);
-    }
-
-    public static class EndNodeHandler extends NodeHandler {
-
-        public boolean enter(Context context) {
-            return true;
-        }
-
-        public String exit(Context context) {
-            context.completeJob();
-            return null;
-        }
-
-        public void kill(Context context) {
-        }
-
-        public void fail(Context context) {
-        }
-
+    public EndNodeDef(String name, Class<? extends ControlNodeHandler> klass) {
+        super(name, "", klass, Collections.EMPTY_LIST);
     }
 
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ForkNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ForkNodeDef.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ForkNodeDef.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ForkNodeDef.java Fri Aug  3 04:59:53 2012
@@ -17,52 +17,19 @@
  */
 package org.apache.oozie.workflow.lite;
 
-import java.util.ArrayList;
 import java.util.List;
 
-//TODO javadoc
-public class ForkNodeDef extends NodeDef {
-
-    public static final String FORK_COUNT_PREFIX = "workflow.fork.";
+/**
+ * Node definition for FORK control node.
+ */
+public class ForkNodeDef extends ControlNodeDef {
 
     ForkNodeDef() {
     }
 
-    public ForkNodeDef(String name, List<String> transitions) {
-        super(name, null, ForkNodeHandler.class, transitions);
-    }
-
-    public static class ForkNodeHandler extends NodeHandler {
-
-        public boolean enter(Context context) {
-            return true;
-        }
-
-        // the return list contains (parentExecutionPath/transition#transition)+
-        public List<String> multiExit(Context context) {
-            List<String> transitions = context.getNodeDef().getTransitions();
-            context.setVar(FORK_COUNT_PREFIX + context.getExecutionPath(), "" + transitions.size());
-
-            List<String> fullTransitions = new ArrayList<String>(transitions.size());
-
-            for (String transition : transitions) {
-                String childExecutionPath = context.createExecutionPath(transition);
-                String fullTransition = context.createFullTransition(childExecutionPath, transition);
-                fullTransitions.add(fullTransition);
-            }
-            return fullTransitions;
-        }
-
-        public String exit(Context context) {
-            throw new UnsupportedOperationException();
-        }
-
-        public void kill(Context context) {
-        }
-
-        public void fail(Context context) {
-        }
-
+    public ForkNodeDef(String name, Class<? extends ControlNodeHandler> klass,
+                       List<String> transitions) {
+        super(name, "", klass, transitions);
     }
 
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/JoinNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/JoinNodeDef.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/JoinNodeDef.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/JoinNodeDef.java Fri Aug  3 04:59:53 2012
@@ -17,79 +17,18 @@
  */
 package org.apache.oozie.workflow.lite;
 
-import org.apache.oozie.workflow.WorkflowException;
-import org.apache.oozie.ErrorCode;
-
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
-//TODO javadoc
-public class JoinNodeDef extends NodeDef {
+/**
+ * Node definition for JOIN control node.
+ */
+public class JoinNodeDef extends ControlNodeDef {
 
     JoinNodeDef() {
     }
 
-    public JoinNodeDef(String name, String transition) {
-        super(name, null, JoinNodeHandler.class, Arrays.asList(transition));
-    }
-
-    public static class JoinNodeHandler extends NodeHandler {
-
-        public void loopDetection(Context context) throws WorkflowException {
-            String flag = getLoopFlag(context.getNodeDef().getName());
-            if (context.getVar(flag) != null) {
-                throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());
-            }
-            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
-            String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath);
-            if (forkCount == null) {
-                throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
-            }
-            int count = Integer.parseInt(forkCount) - 1;
-            if (count == 0) {
-                context.setVar(flag, "true");
-            }
-        }
-
-        public boolean enter(Context context) throws WorkflowException {
-            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
-            String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath);
-            if (forkCount == null) {
-                throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
-            }
-            int count = Integer.parseInt(forkCount) - 1;
-            if (count > 0) {
-                context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, "" + count);
-                context.deleteExecutionPath();
-            }
-            else {
-                context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, null);
-            }
-            return (count == 0);
-        }
-
-        public List<String> multiExit(Context context) {
-            String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
-            // NOW we delete..
-            context.deleteExecutionPath();
-
-            String transition = context.getNodeDef().getTransitions().get(0);
-            String fullTransition = context.createFullTransition(parentExecutionPath, transition);
-            List<String> transitions = new ArrayList<String>(1);
-            transitions.add(fullTransition);
-            return transitions;
-        }
-
-        public String exit(Context context) {
-            throw new UnsupportedOperationException();
-        }
-
-        public void kill(Context context) {
-        }
-
-        public void fail(Context context) {
-        }
+    public JoinNodeDef(String name, Class<? extends ControlNodeHandler> klass, String transition) {
+        super(name, "", klass, Arrays.asList(transition));
     }
 
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/KillNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/KillNodeDef.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/KillNodeDef.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/KillNodeDef.java Fri Aug  3 04:59:53 2012
@@ -19,33 +19,17 @@ package org.apache.oozie.workflow.lite;
 
 import java.util.Collections;
 
-//TODO javadoc
-public class KillNodeDef extends NodeDef {
+/**
+ * Node definition for KILL control node.
+ */
+public class KillNodeDef extends ControlNodeDef {
 
     KillNodeDef() {
     }
 
     @SuppressWarnings("unchecked")
-    public KillNodeDef(String name, String message) {
-        super(name, message, KillNodeHandler.class, Collections.EMPTY_LIST);
-    }
-
-    public static class KillNodeHandler extends NodeHandler {
-
-        public boolean enter(Context context) {
-            return true;
-        }
-
-        public String exit(Context context) {
-            context.killJob();
-            return null;
-        }
-
-        public void kill(Context context) {
-        }
-
-        public void fail(Context context) {
-        }
+    public KillNodeDef(String name, String message, Class<? extends ControlNodeHandler> klass) {
+        super(name, message, klass, Collections.EMPTY_LIST);
     }
 
 }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java Fri Aug  3 04:59:53 2012
@@ -41,8 +41,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
 
 /**
  * Class to parse and validate workflow xml
@@ -81,6 +79,7 @@ public class LiteWorkflowAppParser {
     public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
 
     private Schema schema;
+    private Class<? extends ControlNodeHandler> controlNodeHandler;
     private Class<? extends DecisionNodeHandler> decisionHandlerClass;
     private Class<? extends ActionNodeHandler> actionHandlerClass;
 
@@ -91,9 +90,12 @@ public class LiteWorkflowAppParser {
     private List<String> forkList = new ArrayList<String>();
     private List<String> joinList = new ArrayList<String>();
 
-    public LiteWorkflowAppParser(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
+    public LiteWorkflowAppParser(Schema schema,
+                                 Class<? extends ControlNodeHandler> controlNodeHandler,
+                                 Class<? extends DecisionNodeHandler> decisionHandlerClass,
                                  Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
         this.schema = schema;
+        this.controlNodeHandler = controlNodeHandler;
         this.decisionHandlerClass = decisionHandlerClass;
         this.actionHandlerClass = actionHandlerClass;
     }
@@ -260,15 +262,16 @@ public class LiteWorkflowAppParser {
         for (Element eNode : (List<Element>) root.getChildren()) {
             if (eNode.getName().equals(START_E)) {
                 def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
-                                          new StartNodeDef(eNode.getAttributeValue(TO_A)));
+                                          new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A)));
             }
             else {
                 if (eNode.getName().equals(END_E)) {
-                    def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A)));
+                    def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler));
                 }
                 else {
                     if (eNode.getName().equals(KILL_E)) {
-                        def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), eNode.getChildText(KILL_MESSAGE_E, ns)));
+                        def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A),
+                                                    eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler));
                     }
                     else {
                         if (eNode.getName().equals(FORK_E)) {
@@ -276,11 +279,12 @@ public class LiteWorkflowAppParser {
                             for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
                                 paths.add(tran.getAttributeValue(FORK_START_A));
                             }
-                            def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), paths));
+                            def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths));
                         }
                         else {
                             if (eNode.getName().equals(JOIN_E)) {
-                                def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), eNode.getAttributeValue(TO_A)));
+                                def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler,
+                                                            eNode.getAttributeValue(TO_A)));
                             }
                             else {
                                 if (eNode.getName().equals(DECISION_E)) {

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java Fri Aug  3 04:59:53 2012
@@ -222,7 +222,10 @@ public class LiteWorkflowInstance implem
                     // TEST THIS
                     if (last >= 0) {
                         String transitionTo = getTransitionNode(fullTransitions.get(last));
-
+                        if (nodeDef instanceof ForkNodeDef) {
+                            transitionTo = "*"; // WF action cannot hold all transitions for a fork.
+                                                // transitions are hardcoded in the WF app.
+                        }
                         persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
                                            transitionTo);
                     }
@@ -299,6 +302,7 @@ public class LiteWorkflowInstance implem
             }
             else {
                 List<String> killedNodes = terminateNodes(Status.KILLED);
+
                 if (killedNodes.size() > 1) {
                     log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
                             .size());
@@ -428,21 +432,23 @@ public class LiteWorkflowInstance implem
         for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
             if (entry.getValue().started) {
                 NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
-                NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
-                try {
-                    if (endStatus == Status.KILLED) {
-                        nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
-                    }
-                    else {
-                        if (endStatus == Status.FAILED) {
-                            nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
+                if (!(nodeDef instanceof ControlNodeDef)) {
+                    NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
+                    try {
+                        if (endStatus == Status.KILLED) {
+                            nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
                         }
+                        else {
+                            if (endStatus == Status.FAILED) {
+                                nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
+                            }
+                        }
+                        endNodes.add(nodeDef.getName());
+                    }
+                    catch (Exception ex) {
+                        log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
+                                 nodeDef.getName(), ex);
                     }
-                    endNodes.add(nodeDef.getName());
-                }
-                catch (Exception ex) {
-                    log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
-                             nodeDef.getName(), ex);
                 }
             }
         }

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java Fri Aug  3 04:59:53 2012
@@ -34,12 +34,16 @@ import java.io.StringReader;
 //TODO javadoc
 public abstract class LiteWorkflowLib implements WorkflowLib {
     private Schema schema;
+    private Class<? extends ControlNodeHandler> controlHandlerClass;
     private Class<? extends DecisionNodeHandler> decisionHandlerClass;
     private Class<? extends ActionNodeHandler> actionHandlerClass;
 
-    public LiteWorkflowLib(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
+    public LiteWorkflowLib(Schema schema,
+                           Class<? extends ControlNodeHandler> controlNodeHandler,
+                           Class<? extends DecisionNodeHandler> decisionHandlerClass,
                            Class<? extends ActionNodeHandler> actionHandlerClass) {
         this.schema = schema;
+        this.controlHandlerClass = controlNodeHandler;
         this.decisionHandlerClass = decisionHandlerClass;
         this.actionHandlerClass = actionHandlerClass;
     }
@@ -47,7 +51,7 @@ public abstract class LiteWorkflowLib im
     @Override
     public WorkflowApp parseDef(String appXml) throws WorkflowException {
         ParamChecker.notEmpty(appXml, "appXml");
-        return new LiteWorkflowAppParser(schema, decisionHandlerClass, actionHandlerClass)
+        return new LiteWorkflowAppParser(schema, controlHandlerClass, decisionHandlerClass, actionHandlerClass)
                 .validateAndParse(new StringReader(appXml));
     }
 

Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/StartNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/StartNodeDef.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/StartNodeDef.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/StartNodeDef.java Fri Aug  3 04:59:53 2012
@@ -18,22 +18,19 @@
 package org.apache.oozie.workflow.lite;
 
 
-import org.apache.oozie.workflow.WorkflowException;
 import org.apache.oozie.util.ParamChecker;
-import org.apache.oozie.ErrorCode;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Arrays;
 
 /**
  * Workflow lite start node definition.
  */
-public class StartNodeDef extends NodeDef {
+public class StartNodeDef extends ControlNodeDef {
 
     /**
      * Reserved name fo the start node. <p/> It is an invalid token, it will never match an application node name.
      */
-    public static final String START = "::start::";
+    public static final String START = ":start:";
 
     /**
      * Default constructor.
@@ -44,39 +41,11 @@ public class StartNodeDef extends NodeDe
     /**
      * Create a start node definition.
      *
+     * @param klass control node handler class.
      * @param transitionTo transition on workflow start.
      */
-    public StartNodeDef(String transitionTo) {
-        super(START, null, StartNodeHandler.class, createList(ParamChecker.notEmpty(transitionTo, "transitionTo")));
-    }
-
-    private static List<String> createList(String transition) {
-        List<String> list = new ArrayList<String>();
-        list.add(transition);
-        return list;
-    }
-
-    /**
-     * Start node handler. <p/> It does an immediate transition to the transitionTo node.
-     */
-    public static class StartNodeHandler extends NodeHandler {
-
-        public boolean enter(Context context) throws WorkflowException {
-            if (!context.getSignalValue().equals(StartNodeDef.START)) {
-                throw new WorkflowException(ErrorCode.E0715, context.getSignalValue());
-            }
-            return true;
-        }
-
-        public String exit(Context context) {
-            return context.getNodeDef().getTransitions().get(0);
-        }
-
-        public void kill(Context context) {
-        }
-
-        public void fail(Context context) {
-        }
+    public StartNodeDef(Class<? extends ControlNodeHandler> klass, String transitionTo) {
+        super(START, "", klass, Arrays.asList(ParamChecker.notEmpty(transitionTo, "transitionTo")));
     }
 
 }

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagELFunctions.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagELFunctions.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagELFunctions.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagELFunctions.java Fri Aug  3 04:59:53 2012
@@ -19,6 +19,7 @@ package org.apache.oozie;
 
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.workflow.lite.EndNodeDef;
 import org.apache.oozie.workflow.lite.LiteWorkflowApp;
@@ -53,7 +54,9 @@ public class TestDagELFunctions extends 
         conf.set(OozieClient.USER_NAME, "user");
         conf.set("a", "A");
         LiteWorkflowApp def =
-                new LiteWorkflowApp("name", "<workflow-app/>", new StartNodeDef("end")).addNode(new EndNodeDef("end"));
+                new LiteWorkflowApp("name", "<workflow-app/>",
+                    new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
+                        addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
         LiteWorkflowInstance job = new LiteWorkflowInstance(def, conf, "wfId");
 
         WorkflowJobBean wf = new WorkflowJobBean();

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagEngine.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagEngine.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagEngine.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagEngine.java Fri Aug  3 04:59:53 2012
@@ -127,21 +127,21 @@ public class TestDagEngine extends XTest
         assertEquals("CCCC", conf.get("e"));
         assertEquals("CCCCCCCC", conf.get("f"));
 
-        waitFor(5000, new Predicate() {
+        waitFor(10000, new Predicate() {
             public boolean evaluate() throws Exception {
                 WorkflowJobBean bean = Services.get().get(WorkflowStoreService.class).create().getWorkflow(jobId1, false);
                 return bean.getWorkflowInstance().getStatus().isEndState();
             }
         });
         assertEquals(WorkflowJob.Status.KILLED, engine.getJob(jobId1).getStatus());
-        waitFor(5000, new Predicate() {
+        waitFor(10000, new Predicate() {
             public boolean evaluate() throws Exception {
                 return CallbackServlet.JOB_ID != null;
             }
         });
         assertEquals(wf.getId(), CallbackServlet.JOB_ID);
-        assertEquals("a", CallbackServlet.NODE_NAME);
-        assertEquals("T:kill", CallbackServlet.STATUS);
+        assertEquals("kill", CallbackServlet.NODE_NAME);
+        assertEquals("T:null", CallbackServlet.STATUS);
     }
 
     public void testJobDefinition() throws Exception {

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/TestActionFailover.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/TestActionFailover.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/TestActionFailover.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/TestActionFailover.java Fri Aug  3 04:59:53 2012
@@ -27,6 +27,8 @@ import java.io.Writer;
 import java.util.Properties;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.oozie.action.control.StartActionExecutor;
+import org.apache.oozie.action.hadoop.FsActionExecutor;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.OozieClient;
@@ -87,7 +89,7 @@ public class TestActionFailover extends 
                 return getFileSystem().exists(target);
             }
         });
-        assertTrue(getFileSystem().exists(target));
+        assertFalse(getFileSystem().exists(target));
 
         waitFor(10 * 1000, new Predicate() {
             public boolean evaluate() throws Exception {
@@ -101,8 +103,10 @@ public class TestActionFailover extends 
         WorkflowStore store = Services.get().get(WorkflowStoreService.class).create();
 
         List<WorkflowActionBean> actions = store.getActionsForWorkflow(jobId1, false);
+        assertEquals(1, actions.size());
         WorkflowActionBean action = actions.get(0);
         assertEquals(WorkflowAction.Status.PREP, action.getStatus());
+        assertEquals(StartActionExecutor.TYPE, action.getType());
 
         setSystemProperty(FaultInjection.FAULT_INJECTION, "false");
         setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "false");

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java Fri Aug  3 04:59:53 2012
@@ -27,6 +27,7 @@ import org.apache.oozie.client.OozieClie
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.service.CallbackService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.service.WorkflowAppService;
@@ -226,8 +227,10 @@ public abstract class ActionExecutorTest
         content += "<end name='end' /></workflow-app>";
         writeToFile(content, getAppPath(), "workflow.xml");
 
-        WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", new StartNodeDef("end"))
-                .addNode(new EndNodeDef("end"));
+        WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>",
+                                              new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class,
+                                                               "end"))
+                .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
         XConfiguration wfConf = new XConfiguration();
         wfConf.set(OozieClient.USER_NAME, getTestUser());
         wfConf.set(OozieClient.APP_PATH, appUri.toString());
@@ -259,8 +262,9 @@ public abstract class ActionExecutorTest
 
         writeToFile(wfxml, getAppPath(), "workflow.xml");
 
-        WorkflowApp app = new LiteWorkflowApp("test-wf-cred", wfxml, new StartNodeDef("start")).addNode(new EndNodeDef(
-                "end"));
+        WorkflowApp app = new LiteWorkflowApp("test-wf-cred", wfxml,
+            new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "start")).
+            addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
         XConfiguration wfConf = new XConfiguration();
         wfConf.set(OozieClient.USER_NAME, getTestUser());
         wfConf.set(OozieClient.APP_PATH, appUri.toString());

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java Fri Aug  3 04:59:53 2012
@@ -27,6 +27,7 @@ import org.apache.oozie.client.OozieClie
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.DagELFunctions;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.workflow.lite.EndNodeDef;
 import org.apache.oozie.workflow.lite.LiteWorkflowApp;
 import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
@@ -74,7 +75,9 @@ public class TestFsELFunctions extends X
         conf.set("dir", dir);
 
         LiteWorkflowApp def =
-                new LiteWorkflowApp("name", "<workflow-app/>", new StartNodeDef("end")).addNode(new EndNodeDef("end"));
+                new LiteWorkflowApp("name", "<workflow-app/>",
+                                    new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
+                    addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
         LiteWorkflowInstance job = new LiteWorkflowInstance(def, conf, "wfId");
 
         WorkflowJobBean wf = new WorkflowJobBean();

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopELFunctions.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopELFunctions.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopELFunctions.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopELFunctions.java Fri Aug  3 04:59:53 2012
@@ -22,6 +22,7 @@ import org.apache.oozie.WorkflowActionBe
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.command.wf.ActionXCommand;
 import org.apache.oozie.service.ELService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.service.UUIDService.ApplicationType;
@@ -49,8 +50,9 @@ public class TestHadoopELFunctions exten
 
         WorkflowJobBean workflow = new WorkflowJobBean();
         workflow.setProtoActionConf("<configuration/>");
-        LiteWorkflowApp wfApp = new LiteWorkflowApp("x", "<workflow-app/>", new StartNodeDef("a"));
-        wfApp.addNode(new EndNodeDef("a"));
+        LiteWorkflowApp wfApp = new LiteWorkflowApp("x", "<workflow-app/>",
+            new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "a"));
+        wfApp.addNode(new EndNodeDef("a", LiteWorkflowStoreService.LiteControlNodeHandler.class));
         WorkflowInstance wi = new LiteWorkflowInstance(wfApp, new XConfiguration(), "1");
 
         workflow.setWorkflowInstance(wi);
@@ -93,8 +95,9 @@ public class TestHadoopELFunctions exten
 
         WorkflowJobBean workflow = new WorkflowJobBean();
         workflow.setProtoActionConf("<configuration/>");
-        LiteWorkflowApp wfApp = new LiteWorkflowApp("x", "<workflow-app/>", new StartNodeDef("a"));
-        wfApp.addNode(new EndNodeDef("a"));
+        LiteWorkflowApp wfApp = new LiteWorkflowApp("x", "<workflow-app/>",
+            new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "a"));
+        wfApp.addNode(new EndNodeDef("a", LiteWorkflowStoreService.LiteControlNodeHandler.class));
         WorkflowInstance wi = new LiteWorkflowInstance(wfApp, new XConfiguration(), "1");
 
         workflow.setWorkflowInstance(wi);

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java Fri Aug  3 04:59:53 2012
@@ -50,6 +50,7 @@ import org.apache.oozie.client.OozieClie
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.service.WorkflowAppService;
@@ -794,8 +795,9 @@ public class TestJavaActionExecutor exte
     }
 
     private WorkflowJobBean addRecordToWfJobTable(String wfId, String wfxml) throws Exception {
-        WorkflowApp app = new LiteWorkflowApp("testApp", wfxml, new StartNodeDef("start"))
-                .addNode(new EndNodeDef("end"));
+        WorkflowApp app = new LiteWorkflowApp("testApp", wfxml,
+            new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "start")).
+                addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
         Configuration conf = Services.get().get(HadoopAccessorService.class).
             createJobConf(new URI(getNameNodeUri()).getAuthority());
         conf.set(OozieClient.APP_PATH, getNameNodeUri() + "/testPath");

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java Fri Aug  3 04:59:53 2012
@@ -27,6 +27,7 @@ import org.apache.oozie.client.WorkflowJ
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.oozie.workflow.lite.NodeHandler;
 
 import java.io.File;
 import java.io.StringReader;
@@ -229,7 +230,7 @@ public class TestSubWorkflowActionExecut
         writer.close();
 
         XConfiguration protoConf = getBaseProtoConf();
-        WorkflowJobBean workflow = createBaseWorkflow(protoConf, "W");
+        final WorkflowJobBean workflow = createBaseWorkflow(protoConf, "W");
         String defaultConf = workflow.getConf();
         XConfiguration newConf = new XConfiguration(new StringReader(defaultConf));
         String actionConf = "<sub-workflow xmlns='uri:oozie:workflow:0.1' name='subwf'>" +
@@ -246,13 +247,20 @@ public class TestSubWorkflowActionExecut
         action.setConf(actionConf);
 
         // negative test
-        SubWorkflowActionExecutor subWorkflow = new SubWorkflowActionExecutor();
+        final SubWorkflowActionExecutor subWorkflow = new SubWorkflowActionExecutor();
         workflow.setConf(newConf.toXmlString());
 
         subWorkflow.start(new Context(workflow, action), action);
 
         OozieClient oozieClient = subWorkflow.getWorkflowClient(new Context(workflow, action),
                                                                       SubWorkflowActionExecutor.LOCAL);
+        waitFor(5000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                subWorkflow.check(new Context(workflow, action), action);
+                return action.getStatus() == WorkflowActionBean.Status.DONE;
+            }
+        });
 
         subWorkflow.check(new Context(workflow, action), action);
         subWorkflow.end(new Context(workflow, action), action);
@@ -267,7 +275,7 @@ public class TestSubWorkflowActionExecut
         // positive test
         newConf.set(OozieClient.GROUP_NAME, getTestGroup());
         workflow.setConf(newConf.toXmlString());
-        WorkflowActionBean action1 = new WorkflowActionBean();
+        final WorkflowActionBean action1 = new WorkflowActionBean();
         action1.setConf(actionConf);
         action1.setId("W1");
 
@@ -276,6 +284,14 @@ public class TestSubWorkflowActionExecut
         oozieClient = subWorkflow.getWorkflowClient(new Context(workflow, action1),
                                                                       SubWorkflowActionExecutor.LOCAL);
 
+        waitFor(5000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                subWorkflow.check(new Context(workflow, action1), action1);
+                return action1.getStatus() == WorkflowActionBean.Status.DONE;
+            }
+        });
+
         subWorkflow.check(new Context(workflow, action1), action1);
         subWorkflow.end(new Context(workflow, action1), action1);
 

Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java Fri Aug  3 04:59:53 2012
@@ -30,6 +30,7 @@ import org.apache.oozie.DagEngine;
 import org.apache.oozie.ForTestingActionExecutor;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.control.KillActionExecutor;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.OozieClient;
@@ -278,9 +279,16 @@ public class TestActionErrors extends XD
         List<WorkflowActionBean> actions = jpaService.execute(wfActionsGetCmd);
 
         int n = actions.size();
-        WorkflowActionBean action = actions.get(n - 1);
+        WorkflowActionBean action = null;
+        for (WorkflowActionBean bean : actions) {
+            if (bean.getType().equals("test")) {
+                action = bean;
+                break;
+            }
+        }
+        assertNotNull(action);
         assertEquals("TEST_ERROR", action.getErrorCode());
-        assertEquals("[end]", action.getErrorMessage());
+        assertEquals("end", action.getErrorMessage());
         assertEquals(WorkflowAction.Status.ERROR, action.getStatus());
     }
 
@@ -456,7 +464,14 @@ public class TestActionErrors extends XD
         store.beginTrx();
         while (retryCount <= maxRetries) {
             List<WorkflowActionBean> actions = store.getActionsForWorkflow(jobId, false);
-            WorkflowActionBean action = actions.get(0);
+            WorkflowActionBean action = null;
+            for (WorkflowActionBean bean : actions) {
+                if (bean.getType().equals("test")) {
+                    action = bean;
+                    break;
+                }
+            }
+            assertNotNull(action);
             aId = action.getId();
             assertEquals(expectedStatus, action.getStatus());
             assertEquals(expectedRetryCount, action.getRetries());
@@ -534,7 +549,7 @@ public class TestActionErrors extends XD
         store.commitTrx();
         store.closeTrx();
     }
-    
+
     /**
      * Provides functionality to test user retry
      *
@@ -562,18 +577,31 @@ public class TestActionErrors extends XD
 
         final JPAService jpaService = Services.get().get(JPAService.class);
         final WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(jobId);
-    
+
         final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
         waitFor(5000, new Predicate() {
             public boolean evaluate() throws Exception {
-            	List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
-                WorkflowActionBean action = actions.get(0);
-                return (action.getUserRetryCount() == 2);
+                List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+                WorkflowActionBean action = null;
+                for (WorkflowActionBean bean : actions) {
+                    if (bean.getType().equals("test")) {
+                        action = bean;
+                        break;
+                    }
+                }
+                return (action != null && action.getUserRetryCount() == 2);
             }
         });
-        
+
         List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
-        WorkflowActionBean action = actions.get(0);
+        WorkflowActionBean action = null;
+        for (WorkflowActionBean bean : actions) {
+            if (bean.getType().equals("test")) {
+                action = bean;
+                break;
+            }
+        }
+        assertNotNull(action);
         assertEquals(2, action.getUserRetryCount());
     }
 
@@ -620,7 +648,14 @@ public class TestActionErrors extends XD
         assertEquals(WorkflowJob.Status.FAILED, engine.getJob(jobId).getStatus());
 
         List<WorkflowActionBean> actions = store2.getActionsForWorkflow(jobId, false);
-        WorkflowActionBean action = actions.get(0);
+        WorkflowActionBean action = null;
+        for (WorkflowActionBean bean : actions) {
+            if (bean.getType().equals("test")) {
+                action = bean;
+                break;
+            }
+        }
+        assertNotNull(action);
         assertEquals(expActionErrorCode, action.getErrorCode());
         store2.commitTrx();
         store2.closeTrx();