You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by tu...@apache.org on 2012/08/03 06:59:55 UTC
svn commit: r1368795 [1/2] - in /incubator/oozie/trunk: ./ core/
core/src/main/java/org/apache/oozie/action/control/
core/src/main/java/org/apache/oozie/action/hadoop/
core/src/main/java/org/apache/oozie/command/wf/
core/src/main/java/org/apache/oozie/...
Author: tucu
Date: Fri Aug 3 04:59:53 2012
New Revision: 1368795
URL: http://svn.apache.org/viewvc?rev=1368795&view=rev
Log:
OOZIE-243 Workflow nodes START/END/KILL/FORK/JOIN should create rows in the action DB table (tucu)
Added:
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ControlNodeActionExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/EndActionExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ForkActionExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/JoinActionExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/KillActionExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/StartActionExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeDef.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
Modified:
incubator/oozie/trunk/core/pom.xml
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/ActionService.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/DBLiteWorkflowStoreService.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/DBLiteWorkflowLib.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/EndNodeDef.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ForkNodeDef.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/JoinNodeDef.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/KillNodeDef.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/StartNodeDef.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagELFunctions.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagEngine.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/TestActionFailover.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopELFunctions.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestPurgeXCommand.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestBulkUpdateInsertJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowIdGetForExternalIdJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/executor/jpa/TestWorkflowJobsGetForPurgeJPAExecutor.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestActionCheckerService.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestLiteWorkflowAppService.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/store/TestDBWorkflowStore.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
incubator/oozie/trunk/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowLib.java
incubator/oozie/trunk/release-log.txt
Modified: incubator/oozie/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/pom.xml?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/pom.xml (original)
+++ incubator/oozie/trunk/core/pom.xml Fri Aug 3 04:59:53 2012
@@ -39,7 +39,7 @@
<dependency>
<groupId>org.apache.oozie</groupId>
<artifactId>oozie-hadoop-test</artifactId>
- <scope>test</scope>
+ <scope>provided</scope>
</dependency>
<dependency>
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ControlNodeActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ControlNodeActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ControlNodeActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ControlNodeActionExecutor.java Fri Aug 3 04:59:53 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.action.hadoop.FsActionExecutor;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
+import org.apache.oozie.workflow.lite.ControlNodeHandler;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.Namespace;
+
+import java.util.List;
+
+/**
+ * Base action executor for control nodes: START/END/KILL/FORK/JOIN
+ * <p/>
+ * This action executor, similar to {@link FsActionExecutor}, is completed during the
+ * {@link #start(Context, WorkflowAction)}.
+ * <p/>
+ * By hooking control nodes to an action executor, control nodes get WF action entries in the DB.
+ */
+public abstract class ControlNodeActionExecutor extends ActionExecutor {
+
+ public ControlNodeActionExecutor(String type) {
+ super(type);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void start(Context context, WorkflowAction action) throws ActionExecutorException {
+ context.setStartData("-", "-", "-");
+ context.setExecutionData("OK", null);
+ }
+
+ public void end(Context context, WorkflowAction action) throws ActionExecutorException {
+ context.setEndData(WorkflowAction.Status.OK, getActionSignal(WorkflowAction.Status.OK));
+ }
+
+ public void check(Context context, WorkflowAction action) throws ActionExecutorException {
+ throw new UnsupportedOperationException();
+ }
+
+ public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isCompleted(String externalStatus) {
+ return true;
+ }
+
+}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/EndActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/EndActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/EndActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/EndActionExecutor.java Fri Aug 3 04:59:53 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+/**
+ * Action executor for END control node.
+ */
+public class EndActionExecutor extends ControlNodeActionExecutor {
+ public static final String TYPE = ":END:";
+
+ public EndActionExecutor() {
+ super(TYPE);
+ }
+
+}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ForkActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ForkActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ForkActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/ForkActionExecutor.java Fri Aug 3 04:59:53 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+/**
+ * Action executor for FORK control node.
+ */
+public class ForkActionExecutor extends ControlNodeActionExecutor {
+ public static final String TYPE = ":FORK:";
+
+ public ForkActionExecutor() {
+ super(TYPE);
+ }
+
+}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/JoinActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/JoinActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/JoinActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/JoinActionExecutor.java Fri Aug 3 04:59:53 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+/**
+ * Action executor for JOIN control node.
+ */
+public class JoinActionExecutor extends ControlNodeActionExecutor {
+ public static final String TYPE = ":JOIN:";
+
+ public JoinActionExecutor() {
+ super(TYPE);
+ }
+
+}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/KillActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/KillActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/KillActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/KillActionExecutor.java Fri Aug 3 04:59:53 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+/**
+ * Action executor for KILL control node.
+ */
+public class KillActionExecutor extends ControlNodeActionExecutor {
+ public static final String TYPE = ":KILL:";
+
+ public KillActionExecutor() {
+ super(TYPE);
+ }
+
+}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/StartActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/StartActionExecutor.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/StartActionExecutor.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/control/StartActionExecutor.java Fri Aug 3 04:59:53 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.control;
+
+/**
+ * Action executor for START control node.
+ */
+public class StartActionExecutor extends ControlNodeActionExecutor {
+ public static final String TYPE = ":START:";
+
+ public StartActionExecutor() {
+ super(TYPE);
+ }
+
+}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java Fri Aug 3 04:59:53 2012
@@ -156,7 +156,7 @@ public class JavaActionExecutor extends
ActionExecutorException.ErrorType.NON_TRANSIENT, "JA004");
registerError(org.apache.hadoop.hdfs.server.namenode.SafeModeException.class.getName(),
ActionExecutorException.ErrorType.NON_TRANSIENT, "JA005");
- registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA006");
+ registerError(ConnectException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, " JA006");
registerError(JDOMException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA007");
registerError(FileNotFoundException.class.getName(), ActionExecutorException.ErrorType.ERROR, "JA008");
registerError(IOException.class.getName(), ActionExecutorException.ErrorType.TRANSIENT, "JA009");
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java Fri Aug 3 04:59:53 2012
@@ -27,6 +27,7 @@ import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.action.control.ControlNodeActionExecutor;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
@@ -126,8 +127,15 @@ public class ActionEndXCommand extends A
LOG.debug("STARTED ActionEndXCommand for action " + actionId);
Configuration conf = wfJob.getWorkflowInstance().getConf();
- int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
- long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
+
+ int maxRetries = 0;
+ long retryInterval = 0;
+
+ if (!(executor instanceof ControlNodeActionExecutor)) {
+ maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
+ retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
+ }
+
executor.setMaxRetries(maxRetries);
executor.setRetryInterval(retryInterval);
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java Fri Aug 3 04:59:53 2012
@@ -28,6 +28,7 @@ import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.action.control.ControlNodeActionExecutor;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
@@ -135,8 +136,14 @@ public class ActionStartXCommand extends
LOG.debug("STARTED ActionStartXCommand for wf actionId=" + actionId);
Configuration conf = wfJob.getWorkflowInstance().getConf();
- int maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
- long retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
+ int maxRetries = 0;
+ long retryInterval = 0;
+
+ if (!(executor instanceof ControlNodeActionExecutor)) {
+ maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
+ retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
+ }
+
executor.setMaxRetries(maxRetries);
executor.setRetryInterval(retryInterval);
@@ -153,11 +160,13 @@ public class ActionStartXCommand extends
}
context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
try {
- String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
- String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
- wfAction.setConf(actionConf);
- LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
- .getType(), actionConf);
+ if (!(executor instanceof ControlNodeActionExecutor)) {
+ String tmpActionConf = XmlUtils.removeComments(wfAction.getConf());
+ String actionConf = context.getELEvaluator().evaluate(tmpActionConf, String.class);
+ wfAction.setConf(actionConf);
+ LOG.debug("Start, name [{0}] type [{1}] configuration{E}{E}{2}{E}", wfAction.getName(), wfAction
+ .getType(), actionConf);
+ }
}
catch (ELEvaluationException ex) {
throw new ActionExecutorException(ActionExecutorException.ErrorType.TRANSIENT, EL_EVAL_ERROR, ex
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/ActionService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/ActionService.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/ActionService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/ActionService.java Fri Aug 3 04:59:53 2012
@@ -19,6 +19,11 @@ package org.apache.oozie.service;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.action.control.EndActionExecutor;
+import org.apache.oozie.action.control.ForkActionExecutor;
+import org.apache.oozie.action.control.JoinActionExecutor;
+import org.apache.oozie.action.control.KillActionExecutor;
+import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
@@ -45,8 +50,12 @@ public class ActionService implements Se
ActionExecutor.resetInitInfo();
ActionExecutor.disableInit();
executors = new HashMap<String, Class<? extends ActionExecutor>>();
- Class<? extends ActionExecutor>[] classes =
- (Class<? extends ActionExecutor>[]) services.getConf().getClasses(CONF_ACTION_EXECUTOR_CLASSES);
+
+ Class<? extends ActionExecutor>[] classes = new Class[] { StartActionExecutor.class,
+ EndActionExecutor.class, KillActionExecutor.class, ForkActionExecutor.class, JoinActionExecutor.class };
+ registerExecutors(classes);
+
+ classes = (Class<? extends ActionExecutor>[]) services.getConf().getClasses(CONF_ACTION_EXECUTOR_CLASSES);
registerExecutors(classes);
classes = (Class<? extends ActionExecutor>[]) services.getConf().getClasses(CONF_ACTION_EXECUTOR_EXT_CLASSES);
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/DBLiteWorkflowStoreService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/DBLiteWorkflowStoreService.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/DBLiteWorkflowStoreService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/DBLiteWorkflowStoreService.java Fri Aug 3 04:59:53 2012
@@ -134,7 +134,8 @@ public class DBLiteWorkflowStoreService
private WorkflowLib getWorkflowLib(Connection conn) {
javax.xml.validation.Schema schema = Services.get().get(SchemaService.class).getSchema(SchemaName.WORKFLOW);
- return new DBLiteWorkflowLib(schema, LiteDecisionHandler.class, LiteActionHandler.class, conn);
+ return new DBLiteWorkflowLib(schema, LiteControlNodeHandler.class,
+ LiteDecisionHandler.class, LiteActionHandler.class, conn);
}
@Override
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java Fri Aug 3 04:59:53 2012
@@ -19,6 +19,11 @@ package org.apache.oozie.service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
+import org.apache.oozie.action.control.EndActionExecutor;
+import org.apache.oozie.action.control.ForkActionExecutor;
+import org.apache.oozie.action.control.JoinActionExecutor;
+import org.apache.oozie.action.control.KillActionExecutor;
+import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.command.wf.ReRunXCommand;
import org.apache.oozie.client.WorkflowAction;
@@ -28,10 +33,17 @@ import org.apache.oozie.ErrorCode;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.ActionNodeHandler;
+import org.apache.oozie.workflow.lite.ControlNodeHandler;
import org.apache.oozie.workflow.lite.DecisionNodeHandler;
+import org.apache.oozie.workflow.lite.EndNodeDef;
+import org.apache.oozie.workflow.lite.ForkNodeDef;
+import org.apache.oozie.workflow.lite.JoinNodeDef;
+import org.apache.oozie.workflow.lite.KillNodeDef;
+import org.apache.oozie.workflow.lite.NodeDef;
import org.apache.oozie.workflow.lite.NodeHandler;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
+import org.apache.oozie.workflow.lite.StartNodeDef;
import org.jdom.Element;
import org.jdom.JDOMException;
@@ -59,10 +71,11 @@ public abstract class LiteWorkflowStoreS
* necessary information to create ActionExecutors.
*
* @param context NodeHandler context.
+ * @param actionType the action type.
* @throws WorkflowException thrown if there was an error parsing the action configuration.
*/
@SuppressWarnings("unchecked")
- protected static void liteExecute(NodeHandler.Context context) throws WorkflowException {
+ protected static void liteExecute(NodeHandler.Context context, String actionType) throws WorkflowException {
XLog log = XLog.getLog(LiteWorkflowStoreService.class);
String jobId = context.getProcessInstance().getId();
String nodeName = context.getNodeDef().getName();
@@ -79,16 +92,16 @@ public abstract class LiteWorkflowStoreS
String nodeConf = context.getNodeDef().getConf();
String executionPath = context.getExecutionPath();
- String actionType;
- try {
- Element element = XmlUtils.parseXml(nodeConf);
- actionType = element.getName();
- nodeConf = XmlUtils.prettyPrint(element).toString();
- }
- catch (JDOMException ex) {
- throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
+ if (actionType == null) {
+ try {
+ Element element = XmlUtils.parseXml(nodeConf);
+ actionType = element.getName();
+ nodeConf = XmlUtils.prettyPrint(element).toString();
+ }
+ catch (JDOMException ex) {
+ throw new WorkflowException(ErrorCode.E0700, ex.getMessage(), ex);
+ }
}
-
log.debug(" Creating action for node [{0}]", nodeName);
action.setType(actionType);
action.setExecutionPath(executionPath);
@@ -236,7 +249,7 @@ public abstract class LiteWorkflowStoreS
@Override
public void start(Context context) throws WorkflowException {
- liteExecute(context);
+ liteExecute(context, null);
}
@Override
@@ -259,7 +272,7 @@ public abstract class LiteWorkflowStoreS
@Override
public void start(Context context) throws WorkflowException {
- liteExecute(context);
+ liteExecute(context, null);
}
@Override
@@ -276,4 +289,34 @@ public abstract class LiteWorkflowStoreS
liteFail(context);
}
}
+
+ // wires workflow lib control nodes with Oozie Dag
+ public static class LiteControlNodeHandler extends ControlNodeHandler {
+
+ @Override
+ public void touch(Context context) throws WorkflowException {
+ Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+ String nodeType;
+ if (nodeClass.equals(StartNodeDef.class)) {
+ nodeType = StartActionExecutor.TYPE;
+ }
+ else if (nodeClass.equals(EndNodeDef.class)) {
+ nodeType = EndActionExecutor.TYPE;
+ }
+ else if (nodeClass.equals(KillNodeDef.class)) {
+ nodeType = KillActionExecutor.TYPE;
+ }
+ else if (nodeClass.equals(ForkNodeDef.class)) {
+ nodeType = ForkActionExecutor.TYPE;
+ }
+ else if (nodeClass.equals(JoinNodeDef.class)) {
+ nodeType = JoinActionExecutor.TYPE;
+ } else {
+ throw new IllegalStateException("Invalid node type: " + nodeClass);
+ }
+
+ liteExecute(context, nodeType);
+ }
+
+ }
}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeDef.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeDef.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeDef.java Fri Aug 3 04:59:53 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.workflow.lite;
+
+import java.util.List;
+
+/**
+ * Node definition for control nodes: START/END/KILL/FORK/JOIN.
+ */
+public abstract class ControlNodeDef extends NodeDef {
+
+ ControlNodeDef() {
+ }
+
+ @SuppressWarnings("unchecked")
+ public ControlNodeDef(String name, String conf, Class<? extends ControlNodeHandler> controlHandlerClass,
+ List <String> transitions) {
+ super(name, conf, controlHandlerClass, transitions);
+ }
+
+}
Added: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java?rev=1368795&view=auto
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java (added)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java Fri Aug 3 04:59:53 2012
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.workflow.lite;
+
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.workflow.WorkflowException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Node handler that provides the necessary workflow logic for control nodes: START/END/KILL/FORK/JOIN.
+ */
+public abstract class ControlNodeHandler extends NodeHandler {
+
+ public static final String FORK_COUNT_PREFIX = "workflow.fork.";
+
+ /**
+ * Called by {@link #enter(Context)} when returning TRUE.
+ *
+ * @param context workflow context
+ * @throws WorkflowException thrown if an error occurred.
+ */
+ public abstract void touch(Context context) throws WorkflowException;
+
+ @Override
+ public boolean enter(Context context) throws WorkflowException {
+ boolean doTouch;
+ Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+ if (nodeClass.equals(StartNodeDef.class)) {
+ if (!context.getSignalValue().equals(StartNodeDef.START)) {
+ throw new WorkflowException(ErrorCode.E0715, context.getSignalValue());
+ }
+ doTouch = true;
+ }
+ else if (nodeClass.equals(EndNodeDef.class)) {
+ doTouch = true;
+ }
+ else if (nodeClass.equals(KillNodeDef.class)) {
+ doTouch = true;
+ }
+ else if (nodeClass.equals(ForkNodeDef.class)) {
+ doTouch = true;
+ }
+ else if (nodeClass.equals(JoinNodeDef.class)) {
+ String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
+ String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
+ if (forkCount == null) {
+ throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
+ }
+ int count = Integer.parseInt(forkCount) - 1;
+ if (count > 0) {
+ context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, "" + count);
+ context.deleteExecutionPath();
+ }
+ else {
+ context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, null);
+ }
+ doTouch = (count == 0);
+ }
+ else {
+ throw new IllegalStateException("Invalid node type: " + nodeClass);
+ }
+ if (doTouch) {
+ touch(context);
+ }
+ return false;
+ }
+
+ @Override
+ public String exit(Context context) throws WorkflowException {
+ Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+ if (nodeClass.equals(StartNodeDef.class)) {
+ return context.getNodeDef().getTransitions().get(0);
+ }
+ else if (nodeClass.equals(EndNodeDef.class)) {
+ context.completeJob();
+ return null;
+ }
+ else if (nodeClass.equals(KillNodeDef.class)) {
+ context.killJob();
+ return null;
+ }
+ else if (nodeClass.equals(ForkNodeDef.class)) {
+ throw new UnsupportedOperationException();
+ }
+ else if (nodeClass.equals(JoinNodeDef.class)) {
+ throw new UnsupportedOperationException();
+ }
+ else {
+ throw new IllegalStateException("Invalid node type: " + nodeClass);
+ }
+ }
+
+ @Override
+ public void loopDetection(Context context)
+ throws WorkflowException {
+ Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+ if (nodeClass.equals(StartNodeDef.class)) {
+ }
+ else if (nodeClass.equals(EndNodeDef.class)) {
+ }
+ else if (nodeClass.equals(KillNodeDef.class)) {
+ }
+ else if (nodeClass.equals(ForkNodeDef.class)) {
+ }
+ else if (nodeClass.equals(JoinNodeDef.class)) {
+ String flag = getLoopFlag(context.getNodeDef().getName());
+ if (context.getVar(flag) != null) {
+ throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());
+ }
+ String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
+ String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath);
+ if (forkCount == null) {
+ throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
+ }
+ int count = Integer.parseInt(forkCount) - 1;
+ if (count == 0) {
+ context.setVar(flag, "true");
+ }
+
+ }
+ else {
+ throw new IllegalStateException("Invalid node type: " + nodeClass);
+ }
+ }
+
+ @Override
+ public List<String> multiExit(Context context)
+ throws WorkflowException {
+ Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+ if (nodeClass.equals(StartNodeDef.class)) {
+ return super.multiExit(context);
+ }
+ else if (nodeClass.equals(EndNodeDef.class)) {
+ return super.multiExit(context);
+ }
+ else if (nodeClass.equals(KillNodeDef.class)) {
+ return super.multiExit(context);
+ }
+ else if (nodeClass.equals(ForkNodeDef.class)) {
+ List<String> transitions = context.getNodeDef().getTransitions();
+ context.setVar(FORK_COUNT_PREFIX + context.getExecutionPath(), "" + transitions.size());
+
+ List<String> fullTransitions = new ArrayList<String>(transitions.size());
+
+ for (String transition : transitions) {
+ String childExecutionPath = context.createExecutionPath(transition);
+ String fullTransition = context.createFullTransition(childExecutionPath, transition);
+ fullTransitions.add(fullTransition);
+ }
+ return fullTransitions;
+ }
+ else if (nodeClass.equals(JoinNodeDef.class)) {
+ String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
+ // NOW we delete..
+ context.deleteExecutionPath();
+
+ String transition = context.getNodeDef().getTransitions().get(0);
+ String fullTransition = context.createFullTransition(parentExecutionPath, transition);
+ List<String> transitions = new ArrayList<String>(1);
+ transitions.add(fullTransition);
+ return transitions;
+ }
+ else {
+ throw new IllegalStateException("Invalid node type: " + nodeClass);
+ }
+ }
+
+ @Override
+ public void kill(Context context) {
+ Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+ if (nodeClass.equals(StartNodeDef.class)) {
+ //NOP
+ }
+ else if (nodeClass.equals(EndNodeDef.class)) {
+ //NOP
+ }
+ else if (nodeClass.equals(KillNodeDef.class)) {
+ //NOP
+ }
+ else if (nodeClass.equals(ForkNodeDef.class)) {
+ //NOP
+ }
+ else if (nodeClass.equals(JoinNodeDef.class)) {
+ //NOP
+ }
+ else {
+ throw new IllegalStateException("Invalid node type: " + nodeClass);
+ }
+ }
+
+ @Override
+ public void fail(Context context) {
+ Class<? extends NodeDef> nodeClass = context.getNodeDef().getClass();
+ if (nodeClass.equals(StartNodeDef.class)) {
+ //NOP
+ }
+ else if (nodeClass.equals(EndNodeDef.class)) {
+ //NOP
+ }
+ else if (nodeClass.equals(KillNodeDef.class)) {
+ //NOP
+ }
+ else if (nodeClass.equals(ForkNodeDef.class)) {
+ //NOP
+ }
+ else if (nodeClass.equals(JoinNodeDef.class)) {
+ //NOP
+ }
+ else {
+ throw new IllegalStateException("Invalid node type: " + nodeClass);
+ }
+ }
+}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/DBLiteWorkflowLib.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/DBLiteWorkflowLib.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/DBLiteWorkflowLib.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/DBLiteWorkflowLib.java Fri Aug 3 04:59:53 2012
@@ -35,9 +35,11 @@ import org.apache.oozie.ErrorCode;
public class DBLiteWorkflowLib extends LiteWorkflowLib {
private final Connection connection;
- public DBLiteWorkflowLib(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
+ public DBLiteWorkflowLib(Schema schema,
+ Class<? extends ControlNodeHandler> controlNodeHandler,
+ Class<? extends DecisionNodeHandler> decisionHandlerClass,
Class<? extends ActionNodeHandler> actionHandlerClass, Connection connection) {
- super(schema, decisionHandlerClass, actionHandlerClass);
+ super(schema, controlNodeHandler, decisionHandlerClass, actionHandlerClass);
this.connection = connection;
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/EndNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/EndNodeDef.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/EndNodeDef.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/EndNodeDef.java Fri Aug 3 04:59:53 2012
@@ -19,34 +19,17 @@ package org.apache.oozie.workflow.lite;
import java.util.Collections;
-//TODO javadoc
-public class EndNodeDef extends NodeDef {
+/**
+ * Node definition for END control node.
+ */
+public class EndNodeDef extends ControlNodeDef {
EndNodeDef() {
}
@SuppressWarnings("unchecked")
- public EndNodeDef(String name) {
- super(name, null, EndNodeHandler.class, Collections.EMPTY_LIST);
- }
-
- public static class EndNodeHandler extends NodeHandler {
-
- public boolean enter(Context context) {
- return true;
- }
-
- public String exit(Context context) {
- context.completeJob();
- return null;
- }
-
- public void kill(Context context) {
- }
-
- public void fail(Context context) {
- }
-
+ public EndNodeDef(String name, Class<? extends ControlNodeHandler> klass) {
+ super(name, "", klass, Collections.EMPTY_LIST);
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ForkNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ForkNodeDef.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ForkNodeDef.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/ForkNodeDef.java Fri Aug 3 04:59:53 2012
@@ -17,52 +17,19 @@
*/
package org.apache.oozie.workflow.lite;
-import java.util.ArrayList;
import java.util.List;
-//TODO javadoc
-public class ForkNodeDef extends NodeDef {
-
- public static final String FORK_COUNT_PREFIX = "workflow.fork.";
+/**
+ * Node definition for FORK control node.
+ */
+public class ForkNodeDef extends ControlNodeDef {
ForkNodeDef() {
}
- public ForkNodeDef(String name, List<String> transitions) {
- super(name, null, ForkNodeHandler.class, transitions);
- }
-
- public static class ForkNodeHandler extends NodeHandler {
-
- public boolean enter(Context context) {
- return true;
- }
-
- // the return list contains (parentExecutionPath/transition#transition)+
- public List<String> multiExit(Context context) {
- List<String> transitions = context.getNodeDef().getTransitions();
- context.setVar(FORK_COUNT_PREFIX + context.getExecutionPath(), "" + transitions.size());
-
- List<String> fullTransitions = new ArrayList<String>(transitions.size());
-
- for (String transition : transitions) {
- String childExecutionPath = context.createExecutionPath(transition);
- String fullTransition = context.createFullTransition(childExecutionPath, transition);
- fullTransitions.add(fullTransition);
- }
- return fullTransitions;
- }
-
- public String exit(Context context) {
- throw new UnsupportedOperationException();
- }
-
- public void kill(Context context) {
- }
-
- public void fail(Context context) {
- }
-
+ public ForkNodeDef(String name, Class<? extends ControlNodeHandler> klass,
+ List<String> transitions) {
+ super(name, "", klass, transitions);
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/JoinNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/JoinNodeDef.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/JoinNodeDef.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/JoinNodeDef.java Fri Aug 3 04:59:53 2012
@@ -17,79 +17,18 @@
*/
package org.apache.oozie.workflow.lite;
-import org.apache.oozie.workflow.WorkflowException;
-import org.apache.oozie.ErrorCode;
-
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
-//TODO javadoc
-public class JoinNodeDef extends NodeDef {
+/**
+ * Node definition for JOIN control node.
+ */
+public class JoinNodeDef extends ControlNodeDef {
JoinNodeDef() {
}
- public JoinNodeDef(String name, String transition) {
- super(name, null, JoinNodeHandler.class, Arrays.asList(transition));
- }
-
- public static class JoinNodeHandler extends NodeHandler {
-
- public void loopDetection(Context context) throws WorkflowException {
- String flag = getLoopFlag(context.getNodeDef().getName());
- if (context.getVar(flag) != null) {
- throw new WorkflowException(ErrorCode.E0709, context.getNodeDef().getName());
- }
- String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
- String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath);
- if (forkCount == null) {
- throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
- }
- int count = Integer.parseInt(forkCount) - 1;
- if (count == 0) {
- context.setVar(flag, "true");
- }
- }
-
- public boolean enter(Context context) throws WorkflowException {
- String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
- String forkCount = context.getVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath);
- if (forkCount == null) {
- throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName());
- }
- int count = Integer.parseInt(forkCount) - 1;
- if (count > 0) {
- context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, "" + count);
- context.deleteExecutionPath();
- }
- else {
- context.setVar(ForkNodeDef.FORK_COUNT_PREFIX + parentExecutionPath, null);
- }
- return (count == 0);
- }
-
- public List<String> multiExit(Context context) {
- String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath());
- // NOW we delete..
- context.deleteExecutionPath();
-
- String transition = context.getNodeDef().getTransitions().get(0);
- String fullTransition = context.createFullTransition(parentExecutionPath, transition);
- List<String> transitions = new ArrayList<String>(1);
- transitions.add(fullTransition);
- return transitions;
- }
-
- public String exit(Context context) {
- throw new UnsupportedOperationException();
- }
-
- public void kill(Context context) {
- }
-
- public void fail(Context context) {
- }
+ public JoinNodeDef(String name, Class<? extends ControlNodeHandler> klass, String transition) {
+ super(name, "", klass, Arrays.asList(transition));
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/KillNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/KillNodeDef.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/KillNodeDef.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/KillNodeDef.java Fri Aug 3 04:59:53 2012
@@ -19,33 +19,17 @@ package org.apache.oozie.workflow.lite;
import java.util.Collections;
-//TODO javadoc
-public class KillNodeDef extends NodeDef {
+/**
+ * Node definition for KILL control node.
+ */
+public class KillNodeDef extends ControlNodeDef {
KillNodeDef() {
}
@SuppressWarnings("unchecked")
- public KillNodeDef(String name, String message) {
- super(name, message, KillNodeHandler.class, Collections.EMPTY_LIST);
- }
-
- public static class KillNodeHandler extends NodeHandler {
-
- public boolean enter(Context context) {
- return true;
- }
-
- public String exit(Context context) {
- context.killJob();
- return null;
- }
-
- public void kill(Context context) {
- }
-
- public void fail(Context context) {
- }
+ public KillNodeDef(String name, String message, Class<? extends ControlNodeHandler> klass) {
+ super(name, message, klass, Collections.EMPTY_LIST);
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java Fri Aug 3 04:59:53 2012
@@ -41,8 +41,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
/**
* Class to parse and validate workflow xml
@@ -81,6 +79,7 @@ public class LiteWorkflowAppParser {
public static final String VALIDATE_FORK_JOIN = "oozie.validate.ForkJoin";
private Schema schema;
+ private Class<? extends ControlNodeHandler> controlNodeHandler;
private Class<? extends DecisionNodeHandler> decisionHandlerClass;
private Class<? extends ActionNodeHandler> actionHandlerClass;
@@ -91,9 +90,12 @@ public class LiteWorkflowAppParser {
private List<String> forkList = new ArrayList<String>();
private List<String> joinList = new ArrayList<String>();
- public LiteWorkflowAppParser(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
+ public LiteWorkflowAppParser(Schema schema,
+ Class<? extends ControlNodeHandler> controlNodeHandler,
+ Class<? extends DecisionNodeHandler> decisionHandlerClass,
Class<? extends ActionNodeHandler> actionHandlerClass) throws WorkflowException {
this.schema = schema;
+ this.controlNodeHandler = controlNodeHandler;
this.decisionHandlerClass = decisionHandlerClass;
this.actionHandlerClass = actionHandlerClass;
}
@@ -260,15 +262,16 @@ public class LiteWorkflowAppParser {
for (Element eNode : (List<Element>) root.getChildren()) {
if (eNode.getName().equals(START_E)) {
def = new LiteWorkflowApp(root.getAttributeValue(NAME_A), strDef,
- new StartNodeDef(eNode.getAttributeValue(TO_A)));
+ new StartNodeDef(controlNodeHandler, eNode.getAttributeValue(TO_A)));
}
else {
if (eNode.getName().equals(END_E)) {
- def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A)));
+ def.addNode(new EndNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler));
}
else {
if (eNode.getName().equals(KILL_E)) {
- def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A), eNode.getChildText(KILL_MESSAGE_E, ns)));
+ def.addNode(new KillNodeDef(eNode.getAttributeValue(NAME_A),
+ eNode.getChildText(KILL_MESSAGE_E, ns), controlNodeHandler));
}
else {
if (eNode.getName().equals(FORK_E)) {
@@ -276,11 +279,12 @@ public class LiteWorkflowAppParser {
for (Element tran : (List<Element>) eNode.getChildren(FORK_PATH_E, ns)) {
paths.add(tran.getAttributeValue(FORK_START_A));
}
- def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), paths));
+ def.addNode(new ForkNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler, paths));
}
else {
if (eNode.getName().equals(JOIN_E)) {
- def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), eNode.getAttributeValue(TO_A)));
+ def.addNode(new JoinNodeDef(eNode.getAttributeValue(NAME_A), controlNodeHandler,
+ eNode.getAttributeValue(TO_A)));
}
else {
if (eNode.getName().equals(DECISION_E)) {
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java Fri Aug 3 04:59:53 2012
@@ -222,7 +222,10 @@ public class LiteWorkflowInstance implem
// TEST THIS
if (last >= 0) {
String transitionTo = getTransitionNode(fullTransitions.get(last));
-
+ if (nodeDef instanceof ForkNodeDef) {
+ transitionTo = "*"; // WF action cannot hold all transitions for a fork.
+ // transitions are hardcoded in the WF app.
+ }
persistentVars.put(nodeDef.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + TRANSITION_TO,
transitionTo);
}
@@ -299,6 +302,7 @@ public class LiteWorkflowInstance implem
}
else {
List<String> killedNodes = terminateNodes(Status.KILLED);
+
if (killedNodes.size() > 1) {
log.warn(XLog.STD, "Workflow completed [{0}], killing [{1}] running nodes", status, killedNodes
.size());
@@ -428,21 +432,23 @@ public class LiteWorkflowInstance implem
for (Map.Entry<String, NodeInstance> entry : executionPaths.entrySet()) {
if (entry.getValue().started) {
NodeDef nodeDef = def.getNode(entry.getValue().nodeName);
- NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
- try {
- if (endStatus == Status.KILLED) {
- nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
- }
- else {
- if (endStatus == Status.FAILED) {
- nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
+ if (!(nodeDef instanceof ControlNodeDef)) {
+ NodeHandler nodeHandler = newInstance(nodeDef.getHandlerClass());
+ try {
+ if (endStatus == Status.KILLED) {
+ nodeHandler.kill(new Context(nodeDef, entry.getKey(), null));
}
+ else {
+ if (endStatus == Status.FAILED) {
+ nodeHandler.fail(new Context(nodeDef, entry.getKey(), null));
+ }
+ }
+ endNodes.add(nodeDef.getName());
+ }
+ catch (Exception ex) {
+ log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
+ nodeDef.getName(), ex);
}
- endNodes.add(nodeDef.getName());
- }
- catch (Exception ex) {
- log.warn(XLog.STD, "Error Changing node state to [{0}] for Node [{1}]", endStatus.toString(),
- nodeDef.getName(), ex);
}
}
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java Fri Aug 3 04:59:53 2012
@@ -34,12 +34,16 @@ import java.io.StringReader;
//TODO javadoc
public abstract class LiteWorkflowLib implements WorkflowLib {
private Schema schema;
+ private Class<? extends ControlNodeHandler> controlHandlerClass;
private Class<? extends DecisionNodeHandler> decisionHandlerClass;
private Class<? extends ActionNodeHandler> actionHandlerClass;
- public LiteWorkflowLib(Schema schema, Class<? extends DecisionNodeHandler> decisionHandlerClass,
+ public LiteWorkflowLib(Schema schema,
+ Class<? extends ControlNodeHandler> controlNodeHandler,
+ Class<? extends DecisionNodeHandler> decisionHandlerClass,
Class<? extends ActionNodeHandler> actionHandlerClass) {
this.schema = schema;
+ this.controlHandlerClass = controlNodeHandler;
this.decisionHandlerClass = decisionHandlerClass;
this.actionHandlerClass = actionHandlerClass;
}
@@ -47,7 +51,7 @@ public abstract class LiteWorkflowLib im
@Override
public WorkflowApp parseDef(String appXml) throws WorkflowException {
ParamChecker.notEmpty(appXml, "appXml");
- return new LiteWorkflowAppParser(schema, decisionHandlerClass, actionHandlerClass)
+ return new LiteWorkflowAppParser(schema, controlHandlerClass, decisionHandlerClass, actionHandlerClass)
.validateAndParse(new StringReader(appXml));
}
Modified: incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/StartNodeDef.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/StartNodeDef.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/StartNodeDef.java (original)
+++ incubator/oozie/trunk/core/src/main/java/org/apache/oozie/workflow/lite/StartNodeDef.java Fri Aug 3 04:59:53 2012
@@ -18,22 +18,19 @@
package org.apache.oozie.workflow.lite;
-import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.util.ParamChecker;
-import org.apache.oozie.ErrorCode;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Arrays;
/**
* Workflow lite start node definition.
*/
-public class StartNodeDef extends NodeDef {
+public class StartNodeDef extends ControlNodeDef {
/**
* Reserved name fo the start node. <p/> It is an invalid token, it will never match an application node name.
*/
- public static final String START = "::start::";
+ public static final String START = ":start:";
/**
* Default constructor.
@@ -44,39 +41,11 @@ public class StartNodeDef extends NodeDe
/**
* Create a start node definition.
*
+ * @param klass control node handler class.
* @param transitionTo transition on workflow start.
*/
- public StartNodeDef(String transitionTo) {
- super(START, null, StartNodeHandler.class, createList(ParamChecker.notEmpty(transitionTo, "transitionTo")));
- }
-
- private static List<String> createList(String transition) {
- List<String> list = new ArrayList<String>();
- list.add(transition);
- return list;
- }
-
- /**
- * Start node handler. <p/> It does an immediate transition to the transitionTo node.
- */
- public static class StartNodeHandler extends NodeHandler {
-
- public boolean enter(Context context) throws WorkflowException {
- if (!context.getSignalValue().equals(StartNodeDef.START)) {
- throw new WorkflowException(ErrorCode.E0715, context.getSignalValue());
- }
- return true;
- }
-
- public String exit(Context context) {
- return context.getNodeDef().getTransitions().get(0);
- }
-
- public void kill(Context context) {
- }
-
- public void fail(Context context) {
- }
+ public StartNodeDef(Class<? extends ControlNodeHandler> klass, String transitionTo) {
+ super(START, "", klass, Arrays.asList(ParamChecker.notEmpty(transitionTo, "transitionTo")));
}
}
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagELFunctions.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagELFunctions.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagELFunctions.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagELFunctions.java Fri Aug 3 04:59:53 2012
@@ -19,6 +19,7 @@ package org.apache.oozie;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
@@ -53,7 +54,9 @@ public class TestDagELFunctions extends
conf.set(OozieClient.USER_NAME, "user");
conf.set("a", "A");
LiteWorkflowApp def =
- new LiteWorkflowApp("name", "<workflow-app/>", new StartNodeDef("end")).addNode(new EndNodeDef("end"));
+ new LiteWorkflowApp("name", "<workflow-app/>",
+ new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
+ addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
LiteWorkflowInstance job = new LiteWorkflowInstance(def, conf, "wfId");
WorkflowJobBean wf = new WorkflowJobBean();
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagEngine.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagEngine.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagEngine.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/TestDagEngine.java Fri Aug 3 04:59:53 2012
@@ -127,21 +127,21 @@ public class TestDagEngine extends XTest
assertEquals("CCCC", conf.get("e"));
assertEquals("CCCCCCCC", conf.get("f"));
- waitFor(5000, new Predicate() {
+ waitFor(10000, new Predicate() {
public boolean evaluate() throws Exception {
WorkflowJobBean bean = Services.get().get(WorkflowStoreService.class).create().getWorkflow(jobId1, false);
return bean.getWorkflowInstance().getStatus().isEndState();
}
});
assertEquals(WorkflowJob.Status.KILLED, engine.getJob(jobId1).getStatus());
- waitFor(5000, new Predicate() {
+ waitFor(10000, new Predicate() {
public boolean evaluate() throws Exception {
return CallbackServlet.JOB_ID != null;
}
});
assertEquals(wf.getId(), CallbackServlet.JOB_ID);
- assertEquals("a", CallbackServlet.NODE_NAME);
- assertEquals("T:kill", CallbackServlet.STATUS);
+ assertEquals("kill", CallbackServlet.NODE_NAME);
+ assertEquals("T:null", CallbackServlet.STATUS);
}
public void testJobDefinition() throws Exception {
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/TestActionFailover.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/TestActionFailover.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/TestActionFailover.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/TestActionFailover.java Fri Aug 3 04:59:53 2012
@@ -27,6 +27,8 @@ import java.io.Writer;
import java.util.Properties;
import org.apache.hadoop.fs.Path;
+import org.apache.oozie.action.control.StartActionExecutor;
+import org.apache.oozie.action.hadoop.FsActionExecutor;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.OozieClient;
@@ -87,7 +89,7 @@ public class TestActionFailover extends
return getFileSystem().exists(target);
}
});
- assertTrue(getFileSystem().exists(target));
+ assertFalse(getFileSystem().exists(target));
waitFor(10 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
@@ -101,8 +103,10 @@ public class TestActionFailover extends
WorkflowStore store = Services.get().get(WorkflowStoreService.class).create();
List<WorkflowActionBean> actions = store.getActionsForWorkflow(jobId1, false);
+ assertEquals(1, actions.size());
WorkflowActionBean action = actions.get(0);
assertEquals(WorkflowAction.Status.PREP, action.getStatus());
+ assertEquals(StartActionExecutor.TYPE, action.getType());
setSystemProperty(FaultInjection.FAULT_INJECTION, "false");
setSystemProperty(SkipCommitFaultInjection.ACTION_FAILOVER_FAULT_INJECTION, "false");
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java Fri Aug 3 04:59:53 2012
@@ -27,6 +27,7 @@ import org.apache.oozie.client.OozieClie
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.service.CallbackService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
@@ -226,8 +227,10 @@ public abstract class ActionExecutorTest
content += "<end name='end' /></workflow-app>";
writeToFile(content, getAppPath(), "workflow.xml");
- WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", new StartNodeDef("end"))
- .addNode(new EndNodeDef("end"));
+ WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>",
+ new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ "end"))
+ .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
XConfiguration wfConf = new XConfiguration();
wfConf.set(OozieClient.USER_NAME, getTestUser());
wfConf.set(OozieClient.APP_PATH, appUri.toString());
@@ -259,8 +262,9 @@ public abstract class ActionExecutorTest
writeToFile(wfxml, getAppPath(), "workflow.xml");
- WorkflowApp app = new LiteWorkflowApp("test-wf-cred", wfxml, new StartNodeDef("start")).addNode(new EndNodeDef(
- "end"));
+ WorkflowApp app = new LiteWorkflowApp("test-wf-cred", wfxml,
+ new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "start")).
+ addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
XConfiguration wfConf = new XConfiguration();
wfConf.set(OozieClient.USER_NAME, getTestUser());
wfConf.set(OozieClient.APP_PATH, appUri.toString());
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestFsELFunctions.java Fri Aug 3 04:59:53 2012
@@ -27,6 +27,7 @@ import org.apache.oozie.client.OozieClie
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.DagELFunctions;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
@@ -74,7 +75,9 @@ public class TestFsELFunctions extends X
conf.set("dir", dir);
LiteWorkflowApp def =
- new LiteWorkflowApp("name", "<workflow-app/>", new StartNodeDef("end")).addNode(new EndNodeDef("end"));
+ new LiteWorkflowApp("name", "<workflow-app/>",
+ new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
+ addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
LiteWorkflowInstance job = new LiteWorkflowInstance(def, conf, "wfId");
WorkflowJobBean wf = new WorkflowJobBean();
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopELFunctions.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopELFunctions.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopELFunctions.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopELFunctions.java Fri Aug 3 04:59:53 2012
@@ -22,6 +22,7 @@ import org.apache.oozie.WorkflowActionBe
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.command.wf.ActionXCommand;
import org.apache.oozie.service.ELService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.UUIDService.ApplicationType;
@@ -49,8 +50,9 @@ public class TestHadoopELFunctions exten
WorkflowJobBean workflow = new WorkflowJobBean();
workflow.setProtoActionConf("<configuration/>");
- LiteWorkflowApp wfApp = new LiteWorkflowApp("x", "<workflow-app/>", new StartNodeDef("a"));
- wfApp.addNode(new EndNodeDef("a"));
+ LiteWorkflowApp wfApp = new LiteWorkflowApp("x", "<workflow-app/>",
+ new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "a"));
+ wfApp.addNode(new EndNodeDef("a", LiteWorkflowStoreService.LiteControlNodeHandler.class));
WorkflowInstance wi = new LiteWorkflowInstance(wfApp, new XConfiguration(), "1");
workflow.setWorkflowInstance(wi);
@@ -93,8 +95,9 @@ public class TestHadoopELFunctions exten
WorkflowJobBean workflow = new WorkflowJobBean();
workflow.setProtoActionConf("<configuration/>");
- LiteWorkflowApp wfApp = new LiteWorkflowApp("x", "<workflow-app/>", new StartNodeDef("a"));
- wfApp.addNode(new EndNodeDef("a"));
+ LiteWorkflowApp wfApp = new LiteWorkflowApp("x", "<workflow-app/>",
+ new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "a"));
+ wfApp.addNode(new EndNodeDef("a", LiteWorkflowStoreService.LiteControlNodeHandler.class));
WorkflowInstance wi = new LiteWorkflowInstance(wfApp, new XConfiguration(), "1");
workflow.setWorkflowInstance(wi);
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java Fri Aug 3 04:59:53 2012
@@ -50,6 +50,7 @@ import org.apache.oozie.client.OozieClie
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
@@ -794,8 +795,9 @@ public class TestJavaActionExecutor exte
}
private WorkflowJobBean addRecordToWfJobTable(String wfId, String wfxml) throws Exception {
- WorkflowApp app = new LiteWorkflowApp("testApp", wfxml, new StartNodeDef("start"))
- .addNode(new EndNodeDef("end"));
+ WorkflowApp app = new LiteWorkflowApp("testApp", wfxml,
+ new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "start")).
+ addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
Configuration conf = Services.get().get(HadoopAccessorService.class).
createJobConf(new URI(getNameNodeUri()).getAuthority());
conf.set(OozieClient.APP_PATH, getNameNodeUri() + "/testPath");
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java Fri Aug 3 04:59:53 2012
@@ -27,6 +27,7 @@ import org.apache.oozie.client.WorkflowJ
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.oozie.workflow.lite.NodeHandler;
import java.io.File;
import java.io.StringReader;
@@ -229,7 +230,7 @@ public class TestSubWorkflowActionExecut
writer.close();
XConfiguration protoConf = getBaseProtoConf();
- WorkflowJobBean workflow = createBaseWorkflow(protoConf, "W");
+ final WorkflowJobBean workflow = createBaseWorkflow(protoConf, "W");
String defaultConf = workflow.getConf();
XConfiguration newConf = new XConfiguration(new StringReader(defaultConf));
String actionConf = "<sub-workflow xmlns='uri:oozie:workflow:0.1' name='subwf'>" +
@@ -246,13 +247,20 @@ public class TestSubWorkflowActionExecut
action.setConf(actionConf);
// negative test
- SubWorkflowActionExecutor subWorkflow = new SubWorkflowActionExecutor();
+ final SubWorkflowActionExecutor subWorkflow = new SubWorkflowActionExecutor();
workflow.setConf(newConf.toXmlString());
subWorkflow.start(new Context(workflow, action), action);
OozieClient oozieClient = subWorkflow.getWorkflowClient(new Context(workflow, action),
SubWorkflowActionExecutor.LOCAL);
+ waitFor(5000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ subWorkflow.check(new Context(workflow, action), action);
+ return action.getStatus() == WorkflowActionBean.Status.DONE;
+ }
+ });
subWorkflow.check(new Context(workflow, action), action);
subWorkflow.end(new Context(workflow, action), action);
@@ -267,7 +275,7 @@ public class TestSubWorkflowActionExecut
// positive test
newConf.set(OozieClient.GROUP_NAME, getTestGroup());
workflow.setConf(newConf.toXmlString());
- WorkflowActionBean action1 = new WorkflowActionBean();
+ final WorkflowActionBean action1 = new WorkflowActionBean();
action1.setConf(actionConf);
action1.setId("W1");
@@ -276,6 +284,14 @@ public class TestSubWorkflowActionExecut
oozieClient = subWorkflow.getWorkflowClient(new Context(workflow, action1),
SubWorkflowActionExecutor.LOCAL);
+ waitFor(5000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ subWorkflow.check(new Context(workflow, action1), action1);
+ return action1.getStatus() == WorkflowActionBean.Status.DONE;
+ }
+ });
+
subWorkflow.check(new Context(workflow, action1), action1);
subWorkflow.end(new Context(workflow, action1), action1);
Modified: incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
URL: http://svn.apache.org/viewvc/incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java?rev=1368795&r1=1368794&r2=1368795&view=diff
==============================================================================
--- incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java (original)
+++ incubator/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java Fri Aug 3 04:59:53 2012
@@ -30,6 +30,7 @@ import org.apache.oozie.DagEngine;
import org.apache.oozie.ForTestingActionExecutor;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.control.KillActionExecutor;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
@@ -278,9 +279,16 @@ public class TestActionErrors extends XD
List<WorkflowActionBean> actions = jpaService.execute(wfActionsGetCmd);
int n = actions.size();
- WorkflowActionBean action = actions.get(n - 1);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ assertNotNull(action);
assertEquals("TEST_ERROR", action.getErrorCode());
- assertEquals("[end]", action.getErrorMessage());
+ assertEquals("end", action.getErrorMessage());
assertEquals(WorkflowAction.Status.ERROR, action.getStatus());
}
@@ -456,7 +464,14 @@ public class TestActionErrors extends XD
store.beginTrx();
while (retryCount <= maxRetries) {
List<WorkflowActionBean> actions = store.getActionsForWorkflow(jobId, false);
- WorkflowActionBean action = actions.get(0);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ assertNotNull(action);
aId = action.getId();
assertEquals(expectedStatus, action.getStatus());
assertEquals(expectedRetryCount, action.getRetries());
@@ -534,7 +549,7 @@ public class TestActionErrors extends XD
store.commitTrx();
store.closeTrx();
}
-
+
/**
* Provides functionality to test user retry
*
@@ -562,18 +577,31 @@ public class TestActionErrors extends XD
final JPAService jpaService = Services.get().get(JPAService.class);
final WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(jobId);
-
+
final WorkflowActionsGetForJobJPAExecutor actionsGetExecutor = new WorkflowActionsGetForJobJPAExecutor(jobId);
waitFor(5000, new Predicate() {
public boolean evaluate() throws Exception {
- List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
- WorkflowActionBean action = actions.get(0);
- return (action.getUserRetryCount() == 2);
+ List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ return (action != null && action.getUserRetryCount() == 2);
}
});
-
+
List<WorkflowActionBean> actions = jpaService.execute(actionsGetExecutor);
- WorkflowActionBean action = actions.get(0);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ assertNotNull(action);
assertEquals(2, action.getUserRetryCount());
}
@@ -620,7 +648,14 @@ public class TestActionErrors extends XD
assertEquals(WorkflowJob.Status.FAILED, engine.getJob(jobId).getStatus());
List<WorkflowActionBean> actions = store2.getActionsForWorkflow(jobId, false);
- WorkflowActionBean action = actions.get(0);
+ WorkflowActionBean action = null;
+ for (WorkflowActionBean bean : actions) {
+ if (bean.getType().equals("test")) {
+ action = bean;
+ break;
+ }
+ }
+ assertNotNull(action);
assertEquals(expActionErrorCode, action.getErrorCode());
store2.commitTrx();
store2.closeTrx();