You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2014/02/26 01:13:34 UTC

git commit: OOZIE-1680 Add a check for a maximum frequency of 5 min on Coord jobs (rkanter)

Repository: oozie
Updated Branches:
  refs/heads/master 872db60c8 -> b3b75189e


OOZIE-1680 Add a check for a maximum frequency of 5 min on Coord jobs (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b3b75189
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b3b75189
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b3b75189

Branch: refs/heads/master
Commit: b3b75189e0c1d336d2b3ba7c41a0efea4b8b67de
Parents: 872db60
Author: Robert Kanter <rk...@cloudera.com>
Authored: Tue Feb 25 16:05:00 2014 -0800
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Tue Feb 25 16:05:00 2014 -0800

----------------------------------------------------------------------
 .../command/coord/CoordSubmitXCommand.java      |  17 ++-
 core/src/main/resources/oozie-default.xml       |  10 ++
 .../oozie/TestCoordinatorEngineStreamLog.java   | 109 ++++++++++---------
 .../command/coord/TestCoordSubmitXCommand.java  |  52 +++++++++
 release-log.txt                                 |   1 +
 5 files changed, 134 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
index 712fe51..9e6a3d5 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
@@ -131,9 +131,10 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
      */
     public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
 
-
     public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
 
+    public static final String CONF_CHECK_MAX_FREQUENCY = Service.CONF_PREFIX + "coord.check.maximum.frequency";
+
     private ELEvaluator evalFreq = null;
     private ELEvaluator evalNofuncs = null;
     private ELEvaluator evalData = null;
@@ -314,9 +315,19 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
             throw new IllegalArgumentException("Coordinator Start Time must be earlier than End Time.");
         }
 
-        // Check if a coord job with cron frequency will materialize actions
         try {
-            Integer.parseInt(coordJob.getFrequency());
+            // Check if a coord job with cron frequency will materialize actions
+            int freq = Integer.parseInt(coordJob.getFrequency());
+
+            // Check if the frequency is faster than 5 min if enabled
+            if (Services.get().getConf().getBoolean(CONF_CHECK_MAX_FREQUENCY, true)) {
+                CoordinatorJob.Timeunit unit = coordJob.getTimeUnit();
+                if (freq == 0 || (freq < 5 && unit == CoordinatorJob.Timeunit.MINUTE)) {
+                    throw new IllegalArgumentException("Coordinator job with frequency [" + freq +
+                            "] minutes is faster than allowed maximum of 5 minutes ("
+                            + CONF_CHECK_MAX_FREQUENCY + " is set to true)");
+                }
+            }
         } catch (NumberFormatException e) {
             Date start = coordJob.getStartTime();
             Calendar cal = Calendar.getInstance();

http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 0fd1796..34362aa 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -497,6 +497,16 @@
 		this factor X the total queue size.</description>
 	</property>
 
+    <property>
+        <name>oozie.service.coord.check.maximum.frequency</name>
+        <value>true</value>
+        <description>
+            When true, Oozie will reject any coordinators with a frequency faster than 5 minutes.  It is not recommended to disable
+            this check or submit coordinators with frequencies faster than 5 minutes: doing so can cause unintended behavior and
+            additional system stress.
+        </description>
+    </property>
+
 	<!-- ELService -->
     <!--  List of supported groups for ELService -->
 	<property>

http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
index 5a1cb9b..b4f161a 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
@@ -233,64 +233,69 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase {
     }
 
     private String runJobsImpl(final CoordinatorEngine ce, int count) throws Exception {
-        services.setService(DummyXLogStreamingService.class);
-        // need to re-define the parameters that are cleared upon the service
-        // reset:
-        new DagXLogInfoService().init(services);
-
-        Configuration conf = new XConfiguration();
-
-        final String appPath = getTestCaseFileUri("coordinator.xml");
-        final long now = System.currentTimeMillis();
-        final String start = DateUtils.formatDateOozieTZ(new Date(now));
-        long e = now + 1000 * 60 * count;
-        final String end = DateUtils.formatDateOozieTZ(new Date(e));
-
-        String wfXml = IOUtils.getResourceAsString("wf-no-op.xml", -1);
-        writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
-
-        String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(1)}\" start=\"" + start
-                + "\" end=\"" + end + "\" timezone=\"UTC\" " + "xmlns=\"uri:oozie:coordinator:0.1\"> " + "<controls> "
-                + "  <timeout>1</timeout> " + "  <concurrency>1</concurrency> " + "  <execution>LIFO</execution> "
-                + "</controls> " + "<action> " + "  <workflow> " + "  <app-path>" + getFsTestCaseDir()
-                + "/workflow.xml</app-path>"
-                + "  <configuration> <property> <name>inputA</name> <value>valueA</value> </property> "
-                + "  <property> <name>inputB</name> <value>valueB</value> " + "  </property></configuration> "
-                + "</workflow>" + "</action> " + "</coordinator-app>";
-        writeToFile(appXml, appPath);
-        conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
-        conf.set(OozieClient.USER_NAME, getTestUser());
-
-        final String jobId = ce.submitJob(conf, true);
-        waitFor(1000 * 60 * count, new Predicate() {
-            @Override
-            public boolean evaluate() throws Exception {
-                try {
-                    List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions();
-                    if (actions.size() < 1) {
-                        return false;
-                    }
-                    for (CoordinatorAction action : actions) {
-                        CoordinatorAction.Status actionStatus = action.getStatus();
-                        if (actionStatus != CoordinatorAction.Status.SUCCEEDED) {
+        try {
+            Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", false);
+            services.setService(DummyXLogStreamingService.class);
+            // need to re-define the parameters that are cleared upon the service
+            // reset:
+            new DagXLogInfoService().init(services);
+
+            Configuration conf = new XConfiguration();
+
+            final String appPath = getTestCaseFileUri("coordinator.xml");
+            final long now = System.currentTimeMillis();
+            final String start = DateUtils.formatDateOozieTZ(new Date(now));
+            long e = now + 1000 * 60 * count;
+            final String end = DateUtils.formatDateOozieTZ(new Date(e));
+
+            String wfXml = IOUtils.getResourceAsString("wf-no-op.xml", -1);
+            writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
+
+            String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(1)}\" start=\"" + start
+                    + "\" end=\"" + end + "\" timezone=\"UTC\" " + "xmlns=\"uri:oozie:coordinator:0.1\"> " + "<controls> "
+                    + "  <timeout>1</timeout> " + "  <concurrency>1</concurrency> " + "  <execution>LIFO</execution> "
+                    + "</controls> " + "<action> " + "  <workflow> " + "  <app-path>" + getFsTestCaseDir()
+                    + "/workflow.xml</app-path>"
+                    + "  <configuration> <property> <name>inputA</name> <value>valueA</value> </property> "
+                    + "  <property> <name>inputB</name> <value>valueB</value> " + "  </property></configuration> "
+                    + "</workflow>" + "</action> " + "</coordinator-app>";
+            writeToFile(appXml, appPath);
+            conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
+            conf.set(OozieClient.USER_NAME, getTestUser());
+
+            final String jobId = ce.submitJob(conf, true);
+            waitFor(1000 * 60 * count, new Predicate() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    try {
+                        List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions();
+                        if (actions.size() < 1) {
                             return false;
                         }
+                        for (CoordinatorAction action : actions) {
+                            CoordinatorAction.Status actionStatus = action.getStatus();
+                            if (actionStatus != CoordinatorAction.Status.SUCCEEDED) {
+                                return false;
+                            }
+                        }
+                        return true;
+                    }
+                    catch (Exception ex) {
+                        ex.printStackTrace();
+                        return false;
                     }
-                    return true;
-                }
-                catch (Exception ex) {
-                    ex.printStackTrace();
-                    return false;
                 }
+            });
+            // Assert all the actions are succeeded (useful for waitFor() timeout
+            // case):
+            final List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions();
+            for (CoordinatorAction action : actions) {
+                assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
             }
-        });
-        // Assert all the actions are succeeded (useful for waitFor() timeout
-        // case):
-        final List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions();
-        for (CoordinatorAction action : actions) {
-            assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
+            return jobId;
+        } finally {
+            Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", true);
         }
-        return jobId;
     }
 
     private void writeToFile(String content, Path appPath, String fileName) throws IOException {

http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
index 88756f6..8dc0c44 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
@@ -1066,6 +1066,58 @@ public class TestCoordSubmitXCommand extends XDataTestCase {
                 , job.getJobXml().contains(URI_TEMPLATE_INCLUDE_XML));
     }
 
+    /**
+     * Frequency faster/slower than than maximum
+     *
+     * @throws Exception
+     */
+    public void testCheckMaximumFrequency() throws Exception {
+        assertTrue(Services.get().getConf().getBoolean("oozie.service.coord.check.maximum.frequency", false));
+        _testCheckMaximumFrequencyHelper("5");
+        _testCheckMaximumFrequencyHelper("10");
+        _testCheckMaximumFrequencyHelper("${coord:hours(2)}");
+        _testCheckMaximumFrequencyHelper("${coord:days(3)}");
+        _testCheckMaximumFrequencyHelper("${coord:months(4)}");
+        try {
+            _testCheckMaximumFrequencyHelper("3");
+            fail();
+        } catch (CommandException ce) {
+            assertEquals(ErrorCode.E1003, ce.getErrorCode());
+            assertTrue(ce.getMessage().contains("Coordinator job with frequency [3] minutes is faster than allowed maximum of 5 "
+                    + "minutes"));
+        }
+        try {
+            Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", false);
+            _testCheckMaximumFrequencyHelper("5");
+            _testCheckMaximumFrequencyHelper("10");
+            _testCheckMaximumFrequencyHelper("${coord:hours(2)}");
+            _testCheckMaximumFrequencyHelper("${coord:days(3)}");
+            _testCheckMaximumFrequencyHelper("${coord:months(4)}");
+            _testCheckMaximumFrequencyHelper("3");
+        } finally {
+            Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", true);
+        }
+    }
+
+    private void _testCheckMaximumFrequencyHelper(String freq) throws Exception {
+        Configuration conf = new XConfiguration();
+        File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
+        String appXml = "<coordinator-app name=\"NAME\" frequency=\"" + freq + "\" start=\"2009-02-01T01:00Z\" "
+                + "end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
+                + "xmlns=\"uri:oozie:coordinator:0.2\"> "
+                + "<action> <workflow> <app-path>hdfs:///tmp/workflows/</app-path> "
+                + "<configuration> <property> <name>inputA</name> <value>blah</value> </property> "
+                + "</configuration> </workflow> </action> </coordinator-app>";
+        writeToFile(appXml, appPathFile);
+        conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        CoordSubmitXCommand sc = new CoordSubmitXCommand(conf);
+        String jobId = sc.call();
+
+        assertEquals(jobId.substring(jobId.length() - 2), "-C");
+        checkCoordJobs(jobId);
+    }
+
     private void _testConfigDefaults(boolean withDefaults) throws Exception {
         Configuration conf = new XConfiguration();
         File appPathFile = new File(getTestCaseDir(), "coordinator.xml");

http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index fa0540f..be20154 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1680 Add a check for a maximum frequency of 5 min on Coord jobs (rkanter)
 OOZIE-1699 Some of the commands submitted to Oozie internal queue are never executed (sriksun via virag)
 OOZIE-1671 add an option to limit # of coordinator actions for log retrieval (ryota)
 OOZIE-1629 EL function in <timeout> is not evaluated properly (ryota)