You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2014/02/26 01:13:34 UTC
git commit: OOZIE-1680 Add a check for a maximum frequency of 5 min
on Coord jobs (rkanter)
Repository: oozie
Updated Branches:
refs/heads/master 872db60c8 -> b3b75189e
OOZIE-1680 Add a check for a maximum frequency of 5 min on Coord jobs (rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b3b75189
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b3b75189
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b3b75189
Branch: refs/heads/master
Commit: b3b75189e0c1d336d2b3ba7c41a0efea4b8b67de
Parents: 872db60
Author: Robert Kanter <rk...@cloudera.com>
Authored: Tue Feb 25 16:05:00 2014 -0800
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Tue Feb 25 16:05:00 2014 -0800
----------------------------------------------------------------------
.../command/coord/CoordSubmitXCommand.java | 17 ++-
core/src/main/resources/oozie-default.xml | 10 ++
.../oozie/TestCoordinatorEngineStreamLog.java | 109 ++++++++++---------
.../command/coord/TestCoordSubmitXCommand.java | 52 +++++++++
release-log.txt | 1 +
5 files changed, 134 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
index 712fe51..9e6a3d5 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
@@ -131,9 +131,10 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
*/
public static final String CONF_DEFAULT_MAX_TIMEOUT = Service.CONF_PREFIX + "coord.default.max.timeout";
-
public static final String CONF_QUEUE_SIZE = Service.CONF_PREFIX + "CallableQueueService.queue.size";
+ public static final String CONF_CHECK_MAX_FREQUENCY = Service.CONF_PREFIX + "coord.check.maximum.frequency";
+
private ELEvaluator evalFreq = null;
private ELEvaluator evalNofuncs = null;
private ELEvaluator evalData = null;
@@ -314,9 +315,19 @@ public class CoordSubmitXCommand extends SubmitTransitionXCommand {
throw new IllegalArgumentException("Coordinator Start Time must be earlier than End Time.");
}
- // Check if a coord job with cron frequency will materialize actions
try {
- Integer.parseInt(coordJob.getFrequency());
+ // Check if a coord job with cron frequency will materialize actions
+ int freq = Integer.parseInt(coordJob.getFrequency());
+
+ // Check if the frequency is faster than 5 min if enabled
+ if (Services.get().getConf().getBoolean(CONF_CHECK_MAX_FREQUENCY, true)) {
+ CoordinatorJob.Timeunit unit = coordJob.getTimeUnit();
+ if (freq == 0 || (freq < 5 && unit == CoordinatorJob.Timeunit.MINUTE)) {
+ throw new IllegalArgumentException("Coordinator job with frequency [" + freq +
+ "] minutes is faster than allowed maximum of 5 minutes ("
+ + CONF_CHECK_MAX_FREQUENCY + " is set to true)");
+ }
+ }
} catch (NumberFormatException e) {
Date start = coordJob.getStartTime();
Calendar cal = Calendar.getInstance();
http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 0fd1796..34362aa 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -497,6 +497,16 @@
this factor X the total queue size.</description>
</property>
+ <property>
+ <name>oozie.service.coord.check.maximum.frequency</name>
+ <value>true</value>
+ <description>
+ When true, Oozie will reject any coordinators with a frequency faster than 5 minutes. It is not recommended to disable
+ this check or submit coordinators with frequencies faster than 5 minutes: doing so can cause unintended behavior and
+ additional system stress.
+ </description>
+ </property>
+
<!-- ELService -->
<!-- List of supported groups for ELService -->
<property>
http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
index 5a1cb9b..b4f161a 100644
--- a/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
+++ b/core/src/test/java/org/apache/oozie/TestCoordinatorEngineStreamLog.java
@@ -233,64 +233,69 @@ public class TestCoordinatorEngineStreamLog extends XFsTestCase {
}
private String runJobsImpl(final CoordinatorEngine ce, int count) throws Exception {
- services.setService(DummyXLogStreamingService.class);
- // need to re-define the parameters that are cleared upon the service
- // reset:
- new DagXLogInfoService().init(services);
-
- Configuration conf = new XConfiguration();
-
- final String appPath = getTestCaseFileUri("coordinator.xml");
- final long now = System.currentTimeMillis();
- final String start = DateUtils.formatDateOozieTZ(new Date(now));
- long e = now + 1000 * 60 * count;
- final String end = DateUtils.formatDateOozieTZ(new Date(e));
-
- String wfXml = IOUtils.getResourceAsString("wf-no-op.xml", -1);
- writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
-
- String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(1)}\" start=\"" + start
- + "\" end=\"" + end + "\" timezone=\"UTC\" " + "xmlns=\"uri:oozie:coordinator:0.1\"> " + "<controls> "
- + " <timeout>1</timeout> " + " <concurrency>1</concurrency> " + " <execution>LIFO</execution> "
- + "</controls> " + "<action> " + " <workflow> " + " <app-path>" + getFsTestCaseDir()
- + "/workflow.xml</app-path>"
- + " <configuration> <property> <name>inputA</name> <value>valueA</value> </property> "
- + " <property> <name>inputB</name> <value>valueB</value> " + " </property></configuration> "
- + "</workflow>" + "</action> " + "</coordinator-app>";
- writeToFile(appXml, appPath);
- conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
- conf.set(OozieClient.USER_NAME, getTestUser());
-
- final String jobId = ce.submitJob(conf, true);
- waitFor(1000 * 60 * count, new Predicate() {
- @Override
- public boolean evaluate() throws Exception {
- try {
- List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions();
- if (actions.size() < 1) {
- return false;
- }
- for (CoordinatorAction action : actions) {
- CoordinatorAction.Status actionStatus = action.getStatus();
- if (actionStatus != CoordinatorAction.Status.SUCCEEDED) {
+ try {
+ Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", false);
+ services.setService(DummyXLogStreamingService.class);
+ // need to re-define the parameters that are cleared upon the service
+ // reset:
+ new DagXLogInfoService().init(services);
+
+ Configuration conf = new XConfiguration();
+
+ final String appPath = getTestCaseFileUri("coordinator.xml");
+ final long now = System.currentTimeMillis();
+ final String start = DateUtils.formatDateOozieTZ(new Date(now));
+ long e = now + 1000 * 60 * count;
+ final String end = DateUtils.formatDateOozieTZ(new Date(e));
+
+ String wfXml = IOUtils.getResourceAsString("wf-no-op.xml", -1);
+ writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
+
+ String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(1)}\" start=\"" + start
+ + "\" end=\"" + end + "\" timezone=\"UTC\" " + "xmlns=\"uri:oozie:coordinator:0.1\"> " + "<controls> "
+ + " <timeout>1</timeout> " + " <concurrency>1</concurrency> " + " <execution>LIFO</execution> "
+ + "</controls> " + "<action> " + " <workflow> " + " <app-path>" + getFsTestCaseDir()
+ + "/workflow.xml</app-path>"
+ + " <configuration> <property> <name>inputA</name> <value>valueA</value> </property> "
+ + " <property> <name>inputB</name> <value>valueB</value> " + " </property></configuration> "
+ + "</workflow>" + "</action> " + "</coordinator-app>";
+ writeToFile(appXml, appPath);
+ conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
+ conf.set(OozieClient.USER_NAME, getTestUser());
+
+ final String jobId = ce.submitJob(conf, true);
+ waitFor(1000 * 60 * count, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ try {
+ List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions();
+ if (actions.size() < 1) {
return false;
}
+ for (CoordinatorAction action : actions) {
+ CoordinatorAction.Status actionStatus = action.getStatus();
+ if (actionStatus != CoordinatorAction.Status.SUCCEEDED) {
+ return false;
+ }
+ }
+ return true;
+ }
+ catch (Exception ex) {
+ ex.printStackTrace();
+ return false;
}
- return true;
- }
- catch (Exception ex) {
- ex.printStackTrace();
- return false;
}
+ });
+ // Assert all the actions are succeeded (useful for waitFor() timeout
+ // case):
+ final List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions();
+ for (CoordinatorAction action : actions) {
+ assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
}
- });
- // Assert all the actions are succeeded (useful for waitFor() timeout
- // case):
- final List<CoordinatorAction> actions = ce.getCoordJob(jobId).getActions();
- for (CoordinatorAction action : actions) {
- assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
+ return jobId;
+ } finally {
+ Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", true);
}
- return jobId;
}
private void writeToFile(String content, Path appPath, String fileName) throws IOException {
http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
index 88756f6..8dc0c44 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
@@ -1066,6 +1066,58 @@ public class TestCoordSubmitXCommand extends XDataTestCase {
, job.getJobXml().contains(URI_TEMPLATE_INCLUDE_XML));
}
+ /**
+ * Frequency faster/slower than than maximum
+ *
+ * @throws Exception
+ */
+ public void testCheckMaximumFrequency() throws Exception {
+ assertTrue(Services.get().getConf().getBoolean("oozie.service.coord.check.maximum.frequency", false));
+ _testCheckMaximumFrequencyHelper("5");
+ _testCheckMaximumFrequencyHelper("10");
+ _testCheckMaximumFrequencyHelper("${coord:hours(2)}");
+ _testCheckMaximumFrequencyHelper("${coord:days(3)}");
+ _testCheckMaximumFrequencyHelper("${coord:months(4)}");
+ try {
+ _testCheckMaximumFrequencyHelper("3");
+ fail();
+ } catch (CommandException ce) {
+ assertEquals(ErrorCode.E1003, ce.getErrorCode());
+ assertTrue(ce.getMessage().contains("Coordinator job with frequency [3] minutes is faster than allowed maximum of 5 "
+ + "minutes"));
+ }
+ try {
+ Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", false);
+ _testCheckMaximumFrequencyHelper("5");
+ _testCheckMaximumFrequencyHelper("10");
+ _testCheckMaximumFrequencyHelper("${coord:hours(2)}");
+ _testCheckMaximumFrequencyHelper("${coord:days(3)}");
+ _testCheckMaximumFrequencyHelper("${coord:months(4)}");
+ _testCheckMaximumFrequencyHelper("3");
+ } finally {
+ Services.get().getConf().setBoolean("oozie.service.coord.check.maximum.frequency", true);
+ }
+ }
+
+ private void _testCheckMaximumFrequencyHelper(String freq) throws Exception {
+ Configuration conf = new XConfiguration();
+ File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
+ String appXml = "<coordinator-app name=\"NAME\" frequency=\"" + freq + "\" start=\"2009-02-01T01:00Z\" "
+ + "end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
+ + "xmlns=\"uri:oozie:coordinator:0.2\"> "
+ + "<action> <workflow> <app-path>hdfs:///tmp/workflows/</app-path> "
+ + "<configuration> <property> <name>inputA</name> <value>blah</value> </property> "
+ + "</configuration> </workflow> </action> </coordinator-app>";
+ writeToFile(appXml, appPathFile);
+ conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ CoordSubmitXCommand sc = new CoordSubmitXCommand(conf);
+ String jobId = sc.call();
+
+ assertEquals(jobId.substring(jobId.length() - 2), "-C");
+ checkCoordJobs(jobId);
+ }
+
private void _testConfigDefaults(boolean withDefaults) throws Exception {
Configuration conf = new XConfiguration();
File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
http://git-wip-us.apache.org/repos/asf/oozie/blob/b3b75189/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index fa0540f..be20154 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1680 Add a check for a maximum frequency of 5 min on Coord jobs (rkanter)
OOZIE-1699 Some of the commands submitted to Oozie internal queue are never executed (sriksun via virag)
OOZIE-1671 add an option to limit # of coordinator actions for log retrieval (ryota)
OOZIE-1629 EL function in <timeout> is not evaluated properly (ryota)