You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2013/03/14 00:51:54 UTC
svn commit: r1456281 - in /oozie/trunk: ./
client/src/main/java/org/apache/oozie/client/
core/src/main/java/org/apache/oozie/command/wf/
core/src/main/java/org/apache/oozie/util/
core/src/test/java/org/apache/oozie/command/wf/ core/src/test/resources/ ...
Author: rkanter
Date: Wed Mar 13 23:51:54 2013
New Revision: 1456281
URL: http://svn.apache.org/r1456281
Log:
OOZIE-1245 Add ability to automatically suspend workflow at specified actions (rkanter)
Added:
oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
oozie/trunk/core/src/test/resources/wf-suspendpoints.xml
Modified:
oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/util/XConfiguration.java
oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
oozie/trunk/release-log.txt
Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java?rev=1456281&r1=1456280&r2=1456281&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java Wed Mar 13 23:51:54 2013
@@ -140,6 +140,8 @@ public class OozieClient {
public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath";
+ /**
+  * Job property holding a comma delimited list of workflow node names at which the workflow is automatically suspended;
+  * the single value <code>*</code> means every node.
+  */
+ public static final String OOZIE_SUSPEND_ON_NODES = "oozie.suspend.on.nodes";
+
public static enum SYSTEM_MODE {
NORMAL, NOWEBSERVICE, SAFEMODE
};
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1456281&r1=1456280&r2=1456281&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Wed Mar 13 23:51:54 2013
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.command.wf;
+import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.SLAEvent.SlaAppType;
@@ -60,6 +61,7 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
+import org.apache.oozie.client.OozieClient;
public class SignalXCommand extends WorkflowXCommand<Void> {
@@ -288,6 +290,7 @@ public class SignalXCommand extends Work
queue(new SignalXCommand(jobId, oldAction.getId()));
}
else {
+ checkForSuspendNode(newAction);
newAction.setPending();
String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
.getDefinition(), wfJob.getConf());
@@ -392,4 +395,30 @@ public class SignalXCommand extends Work
}
+ /**
+  * Queue a {@link SuspendXCommand} for this job when the given node is one of the configured suspend points.
+  * <p/>
+  * The suspend points are read from the {@link OozieClient#OOZIE_SUSPEND_ON_NODES} property of the workflow job
+  * configuration. The property is either the single value <code>*</code>, which matches every node, or a comma delimited
+  * list of node names (whitespace around each name is ignored). If the job configuration cannot be read, a warning is
+  * logged and no suspend is queued.
+  *
+  * @param newAction the action that is about to be started; its name is compared against the suspend points.
+  */
+ private void checkForSuspendNode(WorkflowActionBean newAction) {
+     String[] values;
+     try {
+         XConfiguration wfjobConf = new XConfiguration(new StringReader(wfJob.getConf()));
+         values = wfjobConf.getTrimmedStrings(OozieClient.OOZIE_SUSPEND_ON_NODES);
+     }
+     catch (IOException ex) {
+         // Best effort: an unreadable configuration must not break signalling. The exception is passed as the last
+         // argument so that the logger records the cause (and its stack trace) instead of only the message.
+         LOG.warn("Error reading " + OozieClient.OOZIE_SUSPEND_ON_NODES + ", ignoring [{0}]", ex.getMessage(), ex);
+         return;
+     }
+     if (values == null) {
+         // Property not set: nothing to suspend on
+         return;
+     }
+
+     String nodeName = newAction.getName();
+     // A lone "*" is a wildcard for every node; otherwise the node name has to be listed explicitly
+     boolean suspend = values.length == 1 && values[0].equals("*");
+     for (int i = 0; !suspend && i < values.length; i++) {
+         suspend = values[i].equals(nodeName);
+     }
+     if (suspend) {
+         LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", nodeName, wfJob.getId());
+         queue(new SuspendXCommand(jobId));
+     }
+ }
+
}
Modified: oozie/trunk/core/src/main/java/org/apache/oozie/util/XConfiguration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/util/XConfiguration.java?rev=1456281&r1=1456280&r2=1456281&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/util/XConfiguration.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/util/XConfiguration.java Wed Mar 13 23:51:54 2013
@@ -355,4 +355,20 @@ public class XConfiguration extends Conf
return xml;
}
+ /**
+  * Return the comma delimited values of the <code>name</code> property as an array in which every element has had its
+  * leading and trailing whitespace removed.
+  *
+  * @param name property name.
+  * @return the trimmed values of the property, or <code>null</code> if {@link #getStrings(String)} returns
+  * <code>null</code> for it (no such property specified).
+  */
+ public String[] getTrimmedStrings(String name) {
+     String[] trimmed = getStrings(name);
+     if (trimmed == null) {
+         return null;
+     }
+     // Trim in place; the elements are independent so the iteration order does not matter
+     for (int idx = trimmed.length - 1; idx >= 0; idx--) {
+         trimmed[idx] = trimmed[idx].trim();
+     }
+     return trimmed;
+ }
}
Added: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java?rev=1456281&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java Wed Mar 13 23:51:54 2013
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.wf;
+
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.util.Properties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.IOUtils;
+
+ /**
+  * Tests that a workflow submitted with the {@link OozieClient#OOZIE_SUSPEND_ON_NODES} property is suspended each time it
+  * reaches one of the listed nodes, and that resuming it lets it run up to the next suspend point.
+  * <p/>
+  * Both tests run the <code>wf-suspendpoints.xml</code> workflow against a {@link LocalOozie} instance.
+  */
+ public class TestSignalXCommand extends XDataTestCase {
+
+     /** Name of the test resource holding the workflow definition used by every test. */
+     private static final String WORKFLOW_RESOURCE = "wf-suspendpoints.xml";
+
+     @Override
+     protected void setUp() throws Exception {
+         super.setUp();
+         LocalOozie.start();
+     }
+
+     @Override
+     protected void tearDown() throws Exception {
+         try {
+             LocalOozie.stop();
+         }
+         finally {
+             // Always clean up the base test case, even if stopping LocalOozie failed
+             super.tearDown();
+         }
+     }
+
+     /**
+      * Suspend on an explicit list of nodes. The list contains a name that matches no node and names padded with
+      * whitespace; neither must prevent the other entries from being honored.
+      */
+     public void testSuspendPoints() throws Exception {
+         final OozieClient oc = LocalOozie.getClient();
+         final String jobId = submitSuspendPointsJob(oc,
+                 "action1,nonexistant_action_name,decision1, action3,join1 ,fork1,action4b");
+
+         WorkflowJob wf = oc.getJobInfo(jobId);
+         assertEquals(WorkflowJob.Status.PREP, wf.getStatus());
+
+         oc.start(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"action1"},
+                 new String[]{":start:"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"decision1"},
+                 new String[]{":start:", "action1", "action2"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"action3"},
+                 new String[]{":start:", "action1", "action2", "decision1"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"fork1"},
+                 new String[]{":start:", "action1", "action2", "decision1", "action3"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"action4a", "action4b", "action4c"},
+                 new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"join1"},
+                 new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b",
+                     "action4c"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUCCEEDED,
+                 new String[]{},
+                 new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b",
+                     "action4c", "join1", "end"});
+     }
+
+     /**
+      * Suspend on every node by using the <code>*</code> wildcard.
+      */
+     public void testSuspendPointsAll() throws Exception {
+         final OozieClient oc = LocalOozie.getClient();
+         final String jobId = submitSuspendPointsJob(oc, "*");
+
+         WorkflowJob wf = oc.getJobInfo(jobId);
+         assertEquals(WorkflowJob.Status.PREP, wf.getStatus());
+
+         oc.start(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{":start:"},
+                 new String[]{});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"action1"},
+                 new String[]{":start:"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"action2"},
+                 new String[]{":start:", "action1"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"decision1"},
+                 new String[]{":start:", "action1", "action2"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"action3"},
+                 new String[]{":start:", "action1", "action2", "decision1"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"fork1"},
+                 new String[]{":start:", "action1", "action2", "decision1", "action3"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"action4a", "action4b", "action4c"},
+                 new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"join1"},
+                 new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b",
+                     "action4c"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                 new String[]{"end"},
+                 new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b",
+                     "action4c", "join1"});
+
+         oc.resume(jobId);
+         checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUCCEEDED,
+                 new String[]{},
+                 new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b",
+                     "action4c", "join1", "end"});
+     }
+
+     /**
+      * Copy the test workflow into the test case directory of the test file system and submit it, without starting it.
+      *
+      * @param oc client used to submit the job.
+      * @param suspendOnNodes value for the {@link OozieClient#OOZIE_SUSPEND_ON_NODES} job property.
+      * @return the id of the submitted job.
+      * @throws Exception if the workflow cannot be copied or submitted.
+      */
+     private String submitSuspendPointsJob(OozieClient oc, String suspendOnNodes) throws Exception {
+         FileSystem fs = getFileSystem();
+         Path appPath = new Path(getFsTestCaseDir(), "app");
+         fs.mkdirs(appPath);
+         // Build the location with Path rather than File.separator: the latter is the separator of the local platform,
+         // which is not necessarily the "/" that a Hadoop path requires
+         Path workflowXml = new Path(appPath, "workflow.xml");
+
+         Reader reader = IOUtils.getResourceAsReader(WORKFLOW_RESOURCE, -1);
+         try {
+             Writer writer = new OutputStreamWriter(fs.create(workflowXml));
+             try {
+                 IOUtils.copyCharStream(reader, writer);
+             }
+             finally {
+                 writer.close();
+             }
+         }
+         finally {
+             reader.close();
+         }
+
+         Properties conf = oc.createConfiguration();
+         conf.setProperty(OozieClient.APP_PATH, workflowXml.toString());
+         conf.setProperty(OozieClient.USER_NAME, getTestUser());
+         conf.setProperty(OozieClient.OOZIE_SUSPEND_ON_NODES, suspendOnNodes);
+
+         String jobId = oc.submit(conf);
+         assertNotNull(jobId);
+         return jobId;
+     }
+
+     /**
+      * Wait up to 30 seconds for the job to reach <code>status</code> and then verify the state of each of its actions:
+      * every action of the job must be listed in exactly one of the two name arrays and have the matching status, and
+      * every listed name must correspond to an action of the job.
+      *
+      * @param wf ignored; the job is always fetched again through <code>oc</code>.
+      * @param oc client used to query the job.
+      * @param jobId id of the job to check.
+      * @param status status the job is expected to reach.
+      * @param prepActions names of the actions expected to be in PREP status.
+      * @param okActions names of the actions expected to be in OK status.
+      * @throws Exception if the job cannot be queried.
+      */
+     private void checkSuspendActions(WorkflowJob wf, final OozieClient oc, final String jobId, final WorkflowJob.Status status,
+             String[] prepActions, String[] okActions) throws Exception {
+         // Wait for the WF to transition to status
+         waitFor(30 * 1000, new Predicate() {
+             public boolean evaluate() throws Exception {
+                 return oc.getJobInfo(jobId).getStatus() == status;
+             }
+         });
+         wf = oc.getJobInfo(jobId);
+         assertEquals(status, wf.getStatus());
+
+         // Check the actions' statuses
+         int numPrep = 0;
+         int numOK = 0;
+         for (WorkflowAction action : wf.getActions()) {
+             String actionName = action.getName();
+             if (contains(prepActions, actionName)) {
+                 assertEquals("action [" + actionName + "] had incorrect status",
+                         WorkflowAction.Status.PREP, action.getStatus());
+                 numPrep++;
+             }
+             else if (contains(okActions, actionName)) {
+                 assertEquals("action [" + actionName + "] had incorrect status",
+                         WorkflowAction.Status.OK, action.getStatus());
+                 numOK++;
+             }
+             else {
+                 fail("Unexpected action [" + actionName + "] with status [" + action.getStatus() + "]");
+             }
+         }
+         // Every expected action must actually have been reported by the job
+         assertEquals(prepActions.length, numPrep);
+         assertEquals(okActions.length, numOK);
+     }
+
+     /**
+      * Tell whether <code>names</code> contains an element equal to <code>name</code>.
+      */
+     private static boolean contains(String[] names, String name) {
+         for (String candidate : names) {
+             if (candidate.equals(name)) {
+                 return true;
+             }
+         }
+         return false;
+     }
+ }
Added: oozie/trunk/core/src/test/resources/wf-suspendpoints.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/resources/wf-suspendpoints.xml?rev=1456281&view=auto
==============================================================================
--- oozie/trunk/core/src/test/resources/wf-suspendpoints.xml (added)
+++ oozie/trunk/core/src/test/resources/wf-suspendpoints.xml Wed Mar 13 23:51:54 2013
@@ -0,0 +1,72 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.4" name="wf-suspendpoints">
+ <start to="action1"/>
+ <action name="action1">
+ <fs>
+ </fs>
+ <ok to="action2"/>
+ <error to="kill"/>
+ </action>
+ <action name="action2">
+ <fs>
+ </fs>
+ <ok to="decision1"/>
+ <error to="kill"/>
+ </action>
+ <decision name="decision1">
+ <switch>
+ <case to="action3">true</case>
+ <default to="kill"/>
+ </switch>
+ </decision>
+ <action name="action3">
+ <fs>
+ </fs>
+ <ok to="fork1"/>
+ <error to="kill"/>
+ </action>
+ <fork name="fork1">
+ <path start="action4a"/>
+ <path start="action4b"/>
+ <path start="action4c"/>
+ </fork>
+ <action name="action4a">
+ <fs>
+ </fs>
+ <ok to="join1"/>
+ <error to="kill"/>
+ </action>
+ <action name="action4b">
+ <fs>
+ </fs>
+ <ok to="join1"/>
+ <error to="kill"/>
+ </action>
+ <action name="action4c">
+ <fs>
+ </fs>
+ <ok to="join1"/>
+ <error to="kill"/>
+ </action>
+ <join name="join1" to="end"/>
+ <kill name="kill">
+ <message>killed</message>
+ </kill>
+ <end name="end"/>
+</workflow-app>
Modified: oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
URL: http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki?rev=1456281&r1=1456280&r2=1456281&view=diff
==============================================================================
--- oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki (original)
+++ oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki Wed Mar 13 23:51:54 2013
@@ -2367,6 +2367,21 @@ Example of a global element:
...
</verbatim>
+---++ 20 Suspend On Nodes
+
+Setting =oozie.suspend.on.nodes= in a job.properties file lets users specify a comma-separated list of workflow nodes
+(actions as well as control nodes such as fork, join and decision) that will cause Oozie to
+automatically suspend the workflow upon reaching any of them, like breakpoints in a debugger. To continue running the
+workflow, the user simply uses the
+[[DG_CommandLineTool#Resuming_a_Workflow_Coordinator_or_Bundle_Job][-resume command from the Oozie CLI]]. Specifying a * will
+cause Oozie to suspend the workflow on all nodes.
+
+For example:
+<verbatim>
+oozie.suspend.on.nodes=mr-node,my-pig-action,my-fork
+</verbatim>
+Specifying the above in a job.properties file will cause Oozie to suspend the workflow when any of those three nodes is about
+to be executed.
+
---++ Appendixes
Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1456281&r1=1456280&r2=1456281&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Mar 13 23:51:54 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1245 Add ability to automatically suspend workflow at specified actions (rkanter)
OOZIE-894 support for hive in Oozie CLI (bowenzhangusa via tucu)
OOZIE-1239 Bump up trunk to 4.1.0-SNAPSHOT (virag)