You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2013/03/14 00:51:54 UTC

svn commit: r1456281 - in /oozie/trunk: ./ client/src/main/java/org/apache/oozie/client/ core/src/main/java/org/apache/oozie/command/wf/ core/src/main/java/org/apache/oozie/util/ core/src/test/java/org/apache/oozie/command/wf/ core/src/test/resources/ ...

Author: rkanter
Date: Wed Mar 13 23:51:54 2013
New Revision: 1456281

URL: http://svn.apache.org/r1456281
Log:
OOZIE-1245 Add ability to automatically suspend workflow at specified actions (rkanter)

Added:
    oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
    oozie/trunk/core/src/test/resources/wf-suspendpoints.xml
Modified:
    oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
    oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/util/XConfiguration.java
    oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
    oozie/trunk/release-log.txt

Modified: oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java
URL: http://svn.apache.org/viewvc/oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java?rev=1456281&r1=1456280&r2=1456281&view=diff
==============================================================================
--- oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java (original)
+++ oozie/trunk/client/src/main/java/org/apache/oozie/client/OozieClient.java Wed Mar 13 23:51:54 2013
@@ -140,6 +140,8 @@ public class OozieClient {
 
     public static final String USE_SYSTEM_LIBPATH = "oozie.use.system.libpath";
 
+    public static final String OOZIE_SUSPEND_ON_NODES = "oozie.suspend.on.nodes";
+
     public static enum SYSTEM_MODE {
         NORMAL, NOWEBSERVICE, SAFEMODE
     };

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1456281&r1=1456280&r2=1456281&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java Wed Mar 13 23:51:54 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.oozie.command.wf;
 
+import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
@@ -60,6 +61,7 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import org.apache.oozie.client.OozieClient;
 
 public class SignalXCommand extends WorkflowXCommand<Void> {
 
@@ -288,6 +290,7 @@ public class SignalXCommand extends Work
                         queue(new SignalXCommand(jobId, oldAction.getId()));
                     }
                     else {
+                        checkForSuspendNode(newAction);
                         newAction.setPending();
                         String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
                                 .getDefinition(), wfJob.getConf());
@@ -392,4 +395,30 @@ public class SignalXCommand extends Work
 
     }
 
+    private void checkForSuspendNode(WorkflowActionBean newAction) {
+        try {
+            XConfiguration wfjobConf = new XConfiguration(new StringReader(wfJob.getConf()));
+            String[] values = wfjobConf.getTrimmedStrings(OozieClient.OOZIE_SUSPEND_ON_NODES);
+            if (values != null) {
+                if (values.length == 1 && values[0].equals("*")) {
+                    LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(), wfJob.getId());
+                    queue(new SuspendXCommand(jobId));
+                }
+                else {
+                    for (String suspendPoint : values) {
+                        if (suspendPoint.equals(newAction.getName())) {
+                            LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(),
+                                    wfJob.getId());
+                            queue(new SuspendXCommand(jobId));
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        catch (IOException ex) {
+            LOG.warn("Error reading " + OozieClient.OOZIE_SUSPEND_ON_NODES + ", ignoring [{0}]", ex.getMessage());
+        }
+    }
+
 }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/util/XConfiguration.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/util/XConfiguration.java?rev=1456281&r1=1456280&r2=1456281&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/util/XConfiguration.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/util/XConfiguration.java Wed Mar 13 23:51:54 2013
@@ -355,4 +355,20 @@ public class XConfiguration extends Conf
         return xml;
     }
 
+    /**
+     * Get the comma delimited values of the name property as an array of trimmed Strings. If no such property is specified then
+     * null is returned.
+     *
+     * @param name property name.
+     * @return property value as an array of trimmed Strings, or null.
+     */
+    public String[] getTrimmedStrings(String name) {
+        String[] values = getStrings(name);
+        if (values != null) {
+            for (int i = 0; i < values.length; i++) {
+                values[i] = values[i].trim();
+            }
+        }
+        return values;
+    }
 }

Added: oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java?rev=1456281&view=auto
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java (added)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/wf/TestSignalXCommand.java Wed Mar 13 23:51:54 2013
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.wf;
+
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.Reader;
+import java.io.Writer;
+import java.util.Properties;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.IOUtils;
+
+public class TestSignalXCommand extends XDataTestCase {
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        LocalOozie.start();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        LocalOozie.stop();
+        super.tearDown();
+    }
+
+    public void testSuspendPoints() throws Exception {
+        FileSystem fs = getFileSystem();
+        Path appPath = new Path(getFsTestCaseDir(), "app");
+        fs.mkdirs(appPath);
+        Reader reader = IOUtils.getResourceAsReader("wf-suspendpoints.xml", -1);
+        Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, "workflow.xml")));
+        IOUtils.copyCharStream(reader, writer);
+        writer.close();
+        reader.close();
+
+        final OozieClient oc = LocalOozie.getClient();
+
+        Properties conf = oc.createConfiguration();
+        conf.setProperty(OozieClient.APP_PATH, appPath.toString() + File.separator + "workflow.xml");
+        conf.setProperty(OozieClient.USER_NAME, getTestUser());
+        conf.setProperty("oozie.suspend.on.nodes", "action1,nonexistant_action_name,decision1, action3,join1 ,fork1,action4b");
+
+        final String jobId = oc.submit(conf);
+        assertNotNull(jobId);
+
+        WorkflowJob wf = oc.getJobInfo(jobId);
+        assertEquals(WorkflowJob.Status.PREP, wf.getStatus());
+
+        oc.start(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"action1"},
+                new String[]{":start:"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"decision1"},
+                new String[]{":start:", "action1", "action2"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"action3"},
+                new String[]{":start:", "action1", "action2", "decision1"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"fork1"},
+                new String[]{":start:", "action1", "action2", "decision1", "action3"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"action4a", "action4b", "action4c"},
+                new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"join1"},
+                new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b", "action4c"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUCCEEDED,
+                new String[]{},
+                new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b", "action4c",
+                             "join1", "end"});
+    }
+
+    public void testSuspendPointsAll() throws Exception {
+        FileSystem fs = getFileSystem();
+        Path appPath = new Path(getFsTestCaseDir(), "app");
+        fs.mkdirs(appPath);
+        Reader reader = IOUtils.getResourceAsReader("wf-suspendpoints.xml", -1);
+        Writer writer = new OutputStreamWriter(fs.create(new Path(appPath, "workflow.xml")));
+        IOUtils.copyCharStream(reader, writer);
+        writer.close();
+        reader.close();
+
+        final OozieClient oc = LocalOozie.getClient();
+
+        Properties conf = oc.createConfiguration();
+        conf.setProperty(OozieClient.APP_PATH, appPath.toString() + File.separator + "workflow.xml");
+        conf.setProperty(OozieClient.USER_NAME, getTestUser());
+        conf.setProperty("oozie.suspend.on.nodes", "*");
+
+        final String jobId = oc.submit(conf);
+        assertNotNull(jobId);
+
+        WorkflowJob wf = oc.getJobInfo(jobId);
+        assertEquals(WorkflowJob.Status.PREP, wf.getStatus());
+
+        oc.start(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{":start:"},
+                new String[]{});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"action1"},
+                new String[]{":start:"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"action2"},
+                new String[]{":start:", "action1"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"decision1"},
+                new String[]{":start:", "action1", "action2"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"action3"},
+                new String[]{":start:", "action1", "action2", "decision1"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"fork1"},
+                new String[]{":start:", "action1", "action2", "decision1", "action3"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"action4a", "action4b", "action4c"},
+                new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"join1"},
+                new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b", "action4c"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUSPENDED,
+                new String[]{"end"},
+                new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b", "action4c",
+                             "join1"});
+
+        oc.resume(jobId);
+        checkSuspendActions(wf, oc, jobId, WorkflowJob.Status.SUCCEEDED,
+                new String[]{},
+                new String[]{":start:", "action1", "action2", "decision1", "action3", "fork1", "action4a", "action4b", "action4c",
+                             "join1", "end"});
+    }
+
+    private void checkSuspendActions(WorkflowJob wf, final OozieClient oc, final String jobId, final WorkflowJob.Status status,
+            String[] prepActions, String[] okActions) throws Exception {
+        // Wait for the WF to transition to status
+        waitFor(30 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                WorkflowJob wf = oc.getJobInfo(jobId);
+                return wf.getStatus() == status;
+            }
+        });
+        wf = oc.getJobInfo(jobId);
+        assertEquals(status, wf.getStatus());
+
+        // Check the actions' statuses
+        int numPrep = 0;
+        int numOK = 0;
+        for (WorkflowAction action : wf.getActions()) {
+            boolean checked = false;
+            for (String name : prepActions) {
+                if (!checked && name.equals(action.getName())) {
+                    assertEquals("action [" + action.getName() + "] had incorrect status",
+                            WorkflowAction.Status.PREP, action.getStatus());
+                    numPrep++;
+                    checked = true;
+                }
+            }
+            if (!checked) {
+                for (String name : okActions) {
+                    if (!checked && name.equals(action.getName())) {
+                        assertEquals("action [" + action.getName() + "] had incorrect status",
+                                WorkflowAction.Status.OK, action.getStatus());
+                    numOK++;
+                    checked = true;
+                    }
+                }
+            }
+            if (!checked) {
+                fail("Unexpected action [" + action.getName() + "] with status [" + action.getStatus() + "]");
+            }
+        }
+        assertEquals(prepActions.length, numPrep);
+        assertEquals(okActions.length, numOK);
+    }
+}

Added: oozie/trunk/core/src/test/resources/wf-suspendpoints.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/resources/wf-suspendpoints.xml?rev=1456281&view=auto
==============================================================================
--- oozie/trunk/core/src/test/resources/wf-suspendpoints.xml (added)
+++ oozie/trunk/core/src/test/resources/wf-suspendpoints.xml Wed Mar 13 23:51:54 2013
@@ -0,0 +1,72 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.4" name="wf-suspendpoints">
+    <start to="action1"/>
+    <action name="action1">
+        <fs>
+        </fs>
+        <ok to="action2"/>
+        <error to="kill"/>
+    </action>
+    <action name="action2">
+        <fs>
+        </fs>
+        <ok to="decision1"/>
+        <error to="kill"/>
+    </action>
+    <decision name="decision1">
+        <switch>
+            <case to="action3">true</case>
+            <default to="kill"/>
+        </switch>
+    </decision>
+    <action name="action3">
+        <fs>
+        </fs>
+        <ok to="fork1"/>
+        <error to="kill"/>
+    </action>
+    <fork name="fork1">
+        <path start="action4a"/>
+        <path start="action4b"/>
+        <path start="action4c"/>
+    </fork>
+    <action name="action4a">
+        <fs>
+        </fs>
+        <ok to="join1"/>
+        <error to="kill"/>
+    </action>
+    <action name="action4b">
+        <fs>
+        </fs>
+        <ok to="join1"/>
+        <error to="kill"/>
+    </action>
+    <action name="action4c">
+        <fs>
+        </fs>
+        <ok to="join1"/>
+        <error to="kill"/>
+    </action>
+    <join name="join1" to="end"/>
+    <kill name="kill">
+        <message>killed</message>
+    </kill>
+    <end name="end"/>
+</workflow-app>

Modified: oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki
URL: http://svn.apache.org/viewvc/oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki?rev=1456281&r1=1456280&r2=1456281&view=diff
==============================================================================
--- oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki (original)
+++ oozie/trunk/docs/src/site/twiki/WorkflowFunctionalSpec.twiki Wed Mar 13 23:51:54 2013
@@ -2367,6 +2367,21 @@ Example of a global element:
 ...
 </verbatim>
 
+---++ 20 Suspend On Nodes
+
+Specifying =oozie.suspend.on.nodes= in a job.properties file lets users specify a list of actions that will cause Oozie to
+automatically suspend the workflow upon reaching any of those actions; like breakpoints in a debugger. To continue running the
+workflow, the user simply uses the
+[[DG_CommandLineTool#Resuming_a_Workflow_Coordinator_or_Bundle_Job][-resume command from the Oozie CLI]]. Specifying a * will
+cause Oozie to suspend the workflow on all nodes.
+
+For example:
+<verbatim>
+oozie.suspend.on.nodes=mr-node,my-pig-action,my-fork
+</verbatim>
+Specifying the above in a job.properties file will cause Oozie to suspend the workflow when any of those three actions are about
+to be executed.
+
 
 ---++ Appendixes
 

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1456281&r1=1456280&r2=1456281&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Mar 13 23:51:54 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1245 Add ability to automatically suspend workflow at specified actions (rkanter)
 OOZIE-894 support for hive in Oozie CLI (bowenzhangusa via tucu)
 OOZIE-1239 Bump up trunk to 4.1.0-SNAPSHOT (virag)