You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/01/26 21:46:34 UTC
[2/5] oozie git commit: OOZIE-1976 Specifying coordinator input
datasets in more logical ways
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordInputLogicPush.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordInputLogicPush.java b/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordInputLogicPush.java
new file mode 100644
index 0000000..c58b18b
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordInputLogicPush.java
@@ -0,0 +1,645 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.io.Writer;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
+import org.apache.oozie.command.coord.CoordActionStartXCommand;
+import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
+import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
+import org.apache.oozie.command.coord.CoordSubmitXCommand;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XHCatTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+
+public class TestCoordInputLogicPush extends XHCatTestCase {
+
+ // Oozie services instance; assigned in setUp() and destroyed in tearDown().
+ private Services services;
+ // Metastore authority captured in setUp(); addPartition() uses it to build hcat:// URIs.
+ private String server;
+ // Name of the table created in every test database.
+ private static final String table = "table1";
+
+ // Number of milliseconds in one day (60 s * 60 min * 1000 ms * 24 h).
+ final long TIME_DAYS = 60 * 60 * 1000 * 24;
+
+ // Selects which instance / start-instance / end-instance snippet getEnumText() returns.
+ enum TEST_TYPE {
+ CURRENT_SINGLE, CURRENT_RANGE, LATEST_SINGLE, LATEST_RANGE;
+ };
+
+ /**
+  * Prepares the fixture: runs the parent setup, initializes the services returned by
+  * setupServicesForHCatalog(), (re)creates the test databases and tables, and records the
+  * metastore authority for later use.
+  */
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ services = super.setupServicesForHCatalog();
+ services.init();
+ createTestTable();
+ // Remembered so addPartition() can build the hcat:// URI of each partition it adds.
+ server = getMetastoreAuthority();
+
+ }
+
+    /**
+     * Releases the fixture: destroys the services, runs the parent tear-down and drops the test
+     * databases, in that order.
+     *
+     * Each step runs in a finally block so that a failure in an earlier step no longer prevents
+     * the remaining clean-up from running.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            services.destroy();
+        }
+        finally {
+            try {
+                super.tearDown();
+            }
+            finally {
+                dropTestTable();
+            }
+        }
+    }
+
+    /**
+     * Recreates one test database from scratch: drops the table and the database, then creates
+     * the database again with a table partitioned on "dt,country".
+     *
+     * @param db name of the database to recreate
+     */
+    private void createSingleTestTable(String db) throws Exception {
+        // Clean slate first. NOTE(review): the 'true' flag presumably means "ignore if missing" - confirm.
+        dropTable(db, table, true);
+        dropDatabase(db, true);
+
+        // Then build the database and its table again.
+        createDatabase(db);
+        createTable(db, table, "dt,country");
+    }
+
+ private void createTestTable() throws Exception {
+
+ createSingleTestTable("db_a");
+ createSingleTestTable("db_b");
+ createSingleTestTable("db_c");
+ createSingleTestTable("db_d");
+ createSingleTestTable("db_e");
+ createSingleTestTable("db_f");
+
+ }
+
+    /**
+     * Drops the test table of the given database and then the database itself.
+     *
+     * @param db name of the database to remove
+     */
+    private void dropSingleTestTable(String db) throws Exception {
+        // NOTE(review): the 'false' flag presumably disables "ignore if missing" - confirm.
+        dropTable(db, table, false);
+        dropDatabase(db, false);
+    }
+
+ private void dropTestTable() throws Exception {
+
+ dropSingleTestTable("db_a");
+ dropSingleTestTable("db_b");
+ dropSingleTestTable("db_c");
+ dropSingleTestTable("db_d");
+ dropSingleTestTable("db_e");
+ dropSingleTestTable("db_f");
+
+ }
+
+    /**
+     * Submits an "or" condition over datasets B and D, adds a single partition to db_b and checks
+     * that the started action reports exactly that one partition in "inputLogicData".
+     */
+    public void testExists() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<or name=\"test\">"+
+            "<data-in dataset=\"B\" />"+
+            "<data-in dataset=\"D\" />"+
+        "</or>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        String jobId = _testCoordSubmit("coord-inputlogic-hcat.xml", conf, inputLogic, TEST_TYPE.CURRENT_SINGLE);
+
+        String input = addPartition("db_b", "table1", "dt=20141008;country=usa");
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        Configuration runConf = getActionConf(actionBean);
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the action configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(1, dataSets.split(",").length);
+        checkDataSets(dataSets, input);
+    }
+
+    /**
+     * Submits an "and" of three nested "and" pairs (A/B, C/D, E/F), adds one partition to each of
+     * db_a .. db_f and checks that all six partitions are reported in "inputLogicData".
+     */
+    public void testNestedCondition3() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<and name=\"test\">"+
+            "<and>" +
+                "<data-in dataset=\"A\" />"+
+                "<data-in dataset=\"B\" />"+
+            "</and>" +
+            "<and>"+
+                "<data-in dataset=\"C\" />"+
+                "<data-in dataset=\"D\" />"+
+            "</and>"+
+            "<and>"+
+                "<data-in dataset=\"E\" />"+
+                "<data-in dataset=\"F\" />"+
+            "</and>"+
+        "</and>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        final String jobId = _testCoordSubmit("coord-inputlogic-hcat.xml", conf, inputLogic, TEST_TYPE.CURRENT_SINGLE);
+
+        String input1 = addPartition("db_a", "table1", "dt=20141008;country=usa");
+        String input2 = addPartition("db_b", "table1", "dt=20141008;country=usa");
+        String input3 = addPartition("db_c", "table1", "dt=20141008;country=usa");
+        String input4 = addPartition("db_d", "table1", "dt=20141008;country=usa");
+        String input5 = addPartition("db_e", "table1", "dt=20141008;country=usa");
+        String input6 = addPartition("db_f", "table1", "dt=20141008;country=usa");
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        Configuration runConf = getActionConf(actionBean);
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the action configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(6, dataSets.split(",").length);
+        checkDataSets(dataSets, input1, input2, input3, input4, input5, input6);
+    }
+
+    /**
+     * Submits nested and/or conditions that each carry min="2", using time settings relative to
+     * "now". Adds partitions for today and the two previous days to db_a and db_c, then checks
+     * that "inputLogicData" holds 12 entries including every added partition.
+     */
+    public void testNestedConditionWithRange() throws Exception {
+
+        Configuration conf = getConfForCombine();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 10 * TIME_DAYS)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+
+        //@formatter:off
+        String inputLogic =
+        "<and name=\"test\" min=\"2\" >"+
+            "<or min=\"2\">" +
+                "<data-in dataset=\"A\" />"+
+                "<data-in dataset=\"B\" />"+
+            "</or>" +
+            "<or min=\"2\">"+
+                "<data-in dataset=\"C\" />"+
+                "<data-in dataset=\"D\" />"+
+            "</or>"+
+            "<and min=\"2\">"+
+                "<data-in dataset=\"A\" />"+
+                "<data-in dataset=\"C\" />"+
+            "</and>"+
+        "</and>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        final String jobId = _testCoordSubmit("coord-inputlogic-hcat.xml", conf, inputLogic, TEST_TYPE.CURRENT_RANGE,
+                TEST_TYPE.LATEST_RANGE);
+        List<String> inputPartition = createPartitionWithTime("db_a", now, 0, 1, 2);
+        inputPartition.addAll(createPartitionWithTime("db_c", now, 0, 1, 2));
+        startCoordAction(jobId);
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        Configuration runConf = getActionConf(actionBean);
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the action configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(12, dataSets.split(",").length);
+        checkDataSets(dataSets, inputPartition.toArray(new String[inputPartition.size()]));
+    }
+
+
+    /**
+     * Submits an "and" over datasets A and B with a latest(-5)..latest(0) range. Creates six daily
+     * directories under input-data/b/ and six daily partitions in db_a, then checks that the
+     * action left WAITING and that "inputLogicData" holds all 12 inputs.
+     */
+    public void testLatestRange() throws Exception {
+
+        Configuration conf = getConfForCombine();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 10 * TIME_DAYS)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+
+        String inputLogic =
+        //@formatter:off
+        "<and name=\"test\">"+
+            "<data-in dataset=\"A\" />" +
+            "<data-in dataset=\"B\" />" +
+        "</and>";
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-combine.xml", conf, inputLogic, TEST_TYPE.LATEST_RANGE);
+
+        List<String> inputDir = createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4, 5);
+        inputDir.addAll(createPartitionWithTime("db_a", now, 0, 1, 2, 3, 4, 5));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the run configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(12, dataSets.split(",").length);
+        checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+    }
+
+    /**
+     * Same scenario as the latest-range test, but the first data-in parameter uses a latest range
+     * and the remaining ones use a current range. Expects the action to leave WAITING and
+     * "inputLogicData" to hold all 12 created inputs.
+     */
+    public void testCurrentLatest() throws Exception {
+
+        Configuration conf = getConfForCombine();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 10 * TIME_DAYS)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+
+        String inputLogic =
+        //@formatter:off
+        "<and name=\"test\">"+
+            "<data-in dataset=\"A\"/>" +
+            "<data-in dataset=\"B\"/>" +
+        "</and>";
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-combine.xml", conf, inputLogic, TEST_TYPE.LATEST_RANGE,
+                TEST_TYPE.CURRENT_RANGE);
+
+        List<String> inputDir = createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4, 5);
+        inputDir.addAll(createPartitionWithTime("db_a", now, 0, 1, 2, 3, 4, 5));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the run configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(12, dataSets.split(",").length);
+        checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+    }
+
+    /**
+     * Submits an "or" of two "and" groups (A and B, C and D) with a latest range. Only inputs for
+     * the first group are created (directories under input-data/b/ and partitions in db_a); the
+     * test expects the action to leave WAITING and "inputLogicData" to hold those 12 inputs.
+     */
+    public void testLatestRangeComplex() throws Exception {
+
+        Configuration conf = getConfForCombine();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 10 * TIME_DAYS)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+
+        String inputLogic =
+        //@formatter:off
+        "<or name=\"test\">" +
+            "<and>"+
+                "<data-in name=\"testA\" dataset=\"A\" />" +
+                "<data-in name=\"testB\" dataset=\"B\" />" +
+            "</and>" +
+            "<and name=\"test\">"+
+                "<data-in name=\"testC\" dataset=\"C\" />" +
+                "<data-in name=\"testD\" dataset=\"D\" />" +
+            "</and>" +
+        "</or>";
+
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-combine.xml", conf, inputLogic, TEST_TYPE.LATEST_RANGE);
+        List<String> inputDir = createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4, 5);
+        inputDir.addAll(createPartitionWithTime("db_a", now, 0, 1, 2, 3, 4, 5));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the run configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(12, dataSets.split(",").length);
+        checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+    }
+
+    /**
+     * Combines a file-based input (a directory under input-data/b/) with an HCat partition in
+     * db_a in one "and" condition, and checks that both show up in "inputLogicData" once the
+     * action has left WAITING.
+     */
+    public void testHcatHdfs() throws Exception {
+        Configuration conf = getConfForCombine();
+        conf.set("initial_instance_a", "2014-10-07T00:00Z");
+        conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+        String inputLogic =
+        //@formatter:off
+        "<and name=\"test\">" +
+            "<data-in name=\"testA\" dataset=\"A\" />" +
+            "<data-in name=\"testB\" dataset=\"B\" />" +
+        "</and>";
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-combine.xml", conf, inputLogic, TEST_TYPE.CURRENT_SINGLE);
+
+        String input1 = createTestCaseSubDir("input-data/b/2014/10/08/_SUCCESS".split("/"));
+        String input2 = addPartition("db_a", "table1", "dt=20141008;country=usa");
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the run configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(2, dataSets.split(",").length);
+        checkDataSets(dataSets, input1, input2);
+    }
+
+    /**
+     * Combines a file-based input (today's directory under input-data/d/) with today's HCat
+     * partition in db_a, using a latest range and min = "1", and checks that both show up in
+     * "inputLogicData" once the action has left WAITING.
+     *
+     * Both date formatters are pinned to UTC. The partition formatter used to run in the JVM
+     * default time zone, so near midnight the partition date could differ from the UTC-based
+     * directory date and from the other helpers in this class, which format in UTC.
+     */
+    public void testHcatHdfsLatest() throws Exception {
+        Configuration conf = getConfForCombine();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 10 * TIME_DAYS)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+
+        TimeZone tzUTC = TimeZone.getTimeZone("UTC");
+        SimpleDateFormat dirFormat = new SimpleDateFormat("yyyy/MM/dd");
+        dirFormat.setTimeZone(tzUTC);
+        SimpleDateFormat partitionFormat = new SimpleDateFormat("yyyyMMdd");
+        partitionFormat.setTimeZone(tzUTC);
+
+        String inputLogic =
+        // @formatter:off
+        "<and name=\"test\" min = \"1\" >" +
+            "<data-in dataset=\"A\" />" +
+            "<data-in dataset=\"D\" />" +
+        "</and>";
+
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-combine.xml", conf, inputLogic, TEST_TYPE.LATEST_RANGE);
+
+        String input1 = createTestCaseSubDir(("input-data/d/" + dirFormat.format(now) + "/_SUCCESS").split("/"));
+        String input2 = addPartition("db_a", "table1", "dt=" + partitionFormat.format(now) + ";country=usa");
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the run configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(2, dataSets.split(",").length);
+        checkDataSets(dataSets, input1, input2);
+    }
+
+ private Configuration getConf() throws Exception {
+ Configuration conf = new XConfiguration();
+ conf.set("start_time", "2014-10-08T00:00Z");
+ conf.set("end_time", "2015-10-08T00:00Z");
+ conf.set("initial_instance", "2014-10-08T00:00Z");
+
+ String dataset1 = "hcat://" + getMetastoreAuthority();
+
+ conf.set("data_set", dataset1.toString());
+ conf.set("db_a", "db_a");
+ conf.set("db_b", "db_b");
+ conf.set("db_c", "db_c");
+ conf.set("db_d", "db_d");
+ conf.set("db_e", "db_e");
+ conf.set("db_f", "db_f");
+ conf.set("table", table);
+ conf.set("wfPath", getWFPath());
+ conf.set("partitionName", "test");
+
+ return conf;
+
+ }
+
+    /**
+     * Builds the job configuration used by the tests that mix file-based and HCat data sets:
+     * fixed schedule values, file:// data sets b, d and f under the test case directory, the
+     * hcat:// data set URI, the six database names, the table name, the workflow path and the
+     * partition name.
+     *
+     * The schedule values are set once; the earlier version assigned "start_time" and "end_time"
+     * twice with identical values.
+     *
+     * @return a freshly populated configuration
+     */
+    private Configuration getConfForCombine() throws Exception {
+        Configuration conf = new XConfiguration();
+        conf.set("start_time", "2014-10-08T00:00Z");
+        conf.set("end_time", "2015-10-08T00:00Z");
+        conf.set("initial_instance", "2014-10-08T00:00Z");
+        conf.set("initial_instance_a", "2014-10-08T00:00Z");
+        conf.set("initial_instance_b", "2014-10-08T00:00Z");
+
+        // File-based data sets live below the per-test working directory.
+        String inputRoot = "file://" + getTestCaseDir() + "/input-data/";
+        conf.set("data_set_b", inputRoot + "b");
+        conf.set("data_set_d", inputRoot + "d");
+        conf.set("data_set_f", inputRoot + "f");
+
+        conf.set("data_set", "hcat://" + getMetastoreAuthority());
+        conf.set("db_a", "db_a");
+        conf.set("db_b", "db_b");
+        conf.set("db_c", "db_c");
+        conf.set("db_d", "db_d");
+        conf.set("db_e", "db_e");
+        conf.set("db_f", "db_f");
+        conf.set("table", table);
+        conf.set("wfPath", getWFPath());
+        conf.set("partitionName", "test");
+
+        return conf;
+    }
+
+    /**
+     * Renders a coordinator definition from a template resource, writes it to coordinator.xml in
+     * the test case directory and submits it.
+     *
+     * The placeholder "=input-logic=" is replaced with {@code inputLogic}. The placeholders
+     * "=data-in-param-1=" .. "=data-in-param-6=" are replaced with the snippet for the matching
+     * entry of {@code testType}; when fewer than six types are given, the last one is reused.
+     *
+     * @param coordinatorXml name of the template resource
+     * @param conf job configuration; submission properties are added to it
+     * @param inputLogic XML fragment to insert; it is used as a regex replacement string, so
+     *        '$' and '\' in it are interpreted by replaceAll
+     * @param testType one or more instance types for the data-in placeholders
+     * @return the id returned by the submit command
+     * @throws IllegalArgumentException if no test type is given
+     */
+    private String _testCoordSubmit(String coordinatorXml, Configuration conf, String inputLogic, TEST_TYPE... testType)
+            throws Exception {
+        // Guard against an opaque ArrayIndexOutOfBoundsException in the loop below.
+        if (testType == null || testType.length == 0) {
+            throw new IllegalArgumentException("At least one TEST_TYPE is required");
+        }
+        String appPath = "file://" + getTestCaseDir() + File.separator + "coordinator.xml";
+
+        String content = IOUtils.getResourceAsString(coordinatorXml, -1);
+        content = content.replaceAll("=input-logic=", inputLogic);
+        for (int i = 1; i <= 6; i++) {
+            // Placeholders beyond the supplied types fall back to the last supplied type.
+            TEST_TYPE type = testType[Math.min(i - 1, testType.length - 1)];
+            content = content.replaceAll("=data-in-param-" + i + "=", getEnumText(type));
+        }
+
+        Writer writer = new FileWriter(new URI(appPath).getPath());
+        try {
+            IOUtils.copyCharStream(new StringReader(content), writer);
+        }
+        finally {
+            // Release the file handle on the failure path too; closing an already closed
+            // FileWriter is a no-op, so this is safe even if copyCharStream closed it.
+            writer.close();
+        }
+        conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        conf.set("nameNode", "hdfs://localhost:9000");
+        conf.set("queueName", "default");
+        conf.set("jobTracker", "localhost:9001");
+        conf.set("examplesRoot", "examples");
+
+        String coordId = null;
+
+        try {
+            coordId = new CoordSubmitXCommand(conf).call();
+        }
+        catch (CommandException e) {
+            e.printStackTrace();
+            fail("should not throw exception " + e.getMessage());
+        }
+        return coordId;
+    }
+
+    /**
+     * Writes a minimal workflow definition (start goes straight to end) to workflow.xml in the
+     * test case directory.
+     *
+     * @return the URI of the written workflow file
+     */
+    public String getWFPath() throws Exception {
+        final String wfUri = getTestCaseFileUri("workflow.xml");
+
+        final StringBuilder wfXml = new StringBuilder();
+        wfXml.append("<workflow-app xmlns='uri:oozie:workflow:0.1' name='map-reduce-wf'> ");
+        wfXml.append("<start to='end' /> ");
+        wfXml.append("<end name='end' /> ");
+        wfXml.append("</workflow-app>");
+
+        writeToFile(wfXml.toString(), wfUri);
+        return wfUri;
+    }
+
+    /**
+     * Writes the given XML text, followed by a line separator, to the file named by a URI.
+     *
+     * @param appXml text to write
+     * @param appPath URI of the target file
+     * @throws IOException if the file cannot be opened or the write fails
+     */
+    private void writeToFile(String appXml, String appPath) throws IOException {
+        File wf = new File(URI.create(appPath));
+        PrintWriter out = new PrintWriter(new FileWriter(wf));
+        try {
+            out.println(appXml);
+            // PrintWriter never throws on write errors; it only records them, so ask explicitly.
+            if (out.checkError()) {
+                throw new IOException("Failed to write " + wf);
+            }
+        }
+        finally {
+            out.close();
+        }
+    }
+
+ public void checkDataSets(String dataSets, String... values) {
+
+ Set<String> inputDataSets = new HashSet<String>();
+ for (String dataSet : dataSets.split(",")) {
+ if (dataSet.indexOf(getTestCaseDir()) >= 0) {
+ inputDataSets.add(dataSet.substring(dataSet.indexOf(getTestCaseDir())));
+ }
+ else {
+ inputDataSets.add(dataSet);
+ }
+ }
+
+ for (String value : values) {
+ assertTrue(inputDataSets.contains(value.replace("/_SUCCESS","")));
+ }
+
+ }
+
+ /**
+  * Drives action "@1" of the given coordinator job from materialization to start.
+  *
+  * Runs the materialize command, then input check, push dependency check and input check again
+  * for the action, waits for the action to leave WAITING (asserting that it did), waits for it
+  * to leave READY, and finally issues the start command.
+  *
+  * @param jobId id of the coordinator job whose first action is started
+  */
+ private void startCoordAction(final String jobId) throws CommandException, JPAExecutorException {
+ // NOTE(review): 3600 is presumably the materialization window in seconds - confirm.
+ new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+ // The input check runs both before and after the push dependency check.
+ new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+ new CoordPushDependencyCheckXCommand(jobId + "@1").call();
+ new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+
+ // Poll for up to 50 seconds until the action is no longer WAITING.
+ waitFor(50 * 1000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+ CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+ return !actionBean.getStatus().equals(CoordinatorAction.Status.WAITING);
+ }
+ });
+
+ CoordinatorAction actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION,
+ jobId + "@1");
+ assertFalse("Action status should not be waiting",
+ actionBean.getStatus().equals(CoordinatorAction.Status.WAITING));
+
+ // Poll for up to 50 seconds until the action is no longer READY. The result of waitFor is
+ // not checked here, so the start command below is issued regardless of the outcome.
+ waitFor(50 * 1000, new Predicate() {
+ public boolean evaluate() throws Exception {
+ CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+ CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+ return !actionBean.getStatus().equals(CoordinatorAction.Status.READY);
+ }
+ });
+ CoordinatorJob coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
+ // Uses the action bean fetched before the second wait for the action id and job id.
+ new CoordActionStartXCommand(actionBean.getId(), coordJob.getUser(), coordJob.getAppName(),
+ actionBean.getJobId()).call();
+ }
+
+    /**
+     * Extracts the workflow configuration properties from an action's XML.
+     *
+     * Navigates action / workflow / configuration below the root element and copies every
+     * "property" child into a new configuration, taking the first child element's text as the
+     * key and the second child element's text as the value.
+     *
+     * @param actionBean coordinator action whose action XML is parsed
+     * @return configuration holding the extracted properties
+     * @throws JDOMException if the action XML cannot be parsed
+     */
+    @SuppressWarnings("unchecked")
+    public Configuration getActionConf(CoordinatorActionBean actionBean) throws JDOMException {
+        final Configuration extracted = new XConfiguration();
+
+        final Element root = XmlUtils.parseXml(actionBean.getActionXml());
+        final Element action = root.getChild("action", root.getNamespace());
+        final Element workflow = action.getChild("workflow", root.getNamespace());
+        final Element configuration = workflow.getChild("configuration", root.getNamespace());
+
+        final List<Element> properties = configuration.getChildren("property", root.getNamespace());
+        for (Element property : properties) {
+            // Positional access: child 0 supplies the key, child 1 the value.
+            final Element keyElement = (Element) property.getChildren().get(0);
+            final Element valueElement = (Element) property.getChildren().get(1);
+            extracted.set(keyElement.getText(), valueElement.getText());
+        }
+        return extracted;
+    }
+
+    /**
+     * Returns the instance XML snippet for a test type, written as a regex replacement string
+     * (the '$' is escaped with a backslash because the result is passed to replaceAll).
+     *
+     * @param testType which instance or range expression to produce
+     * @return the snippet, or an empty string if no case matches
+     */
+    private String getEnumText(TEST_TYPE testType) {
+        String snippet = "";
+        switch (testType) {
+            case CURRENT_SINGLE:
+                snippet = "<instance>\\${coord:current(0)}</instance>";
+                break;
+            case CURRENT_RANGE:
+                snippet = "<start-instance>\\${coord:current(-5)}</start-instance>"
+                        + "<end-instance>\\${coord:current(0)}</end-instance>";
+                break;
+            case LATEST_SINGLE:
+                snippet = "<instance>\\${coord:latest(0)}</instance>";
+                break;
+            case LATEST_RANGE:
+                snippet = "<start-instance>\\${coord:latest(-5)}</start-instance>"
+                        + "<end-instance>\\${coord:latest(0)}</end-instance>";
+                break;
+            default:
+                break;
+        }
+        return snippet;
+    }
+
+    /**
+     * Creates one "_SUCCESS" sub-directory per offset below the test case directory, at
+     * dirPrefix + yyyy/MM/dd, with the date formatted in UTC.
+     *
+     * @param dirPrefix path prefix, relative to the test case directory
+     * @param date reference date
+     * @param hours offsets subtracted from the reference date; despite the name, each unit is
+     *        one day (the value is multiplied by TIME_DAYS)
+     * @return the values returned by createTestCaseSubDir, in offset order
+     */
+    public List<String> createDirWithTime(String dirPrefix, Date date, int... hours) {
+        final SimpleDateFormat dayFormat = new SimpleDateFormat("yyyy/MM/dd");
+        dayFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        final List<String> created = new ArrayList<String>();
+        for (int i = 0; i < hours.length; i++) {
+            final Date shifted = new Date(date.getTime() - hours[i] * TIME_DAYS);
+            final String relativePath = dirPrefix + dayFormat.format(shifted) + "/_SUCCESS";
+            created.add(createTestCaseSubDir(relativePath.split("/")));
+        }
+        return created;
+    }
+
+    /**
+     * Adds one partition "dt=yyyyMMdd;country=usa" per offset to table1 of the given database,
+     * with the date formatted in UTC.
+     *
+     * @param database database that receives the partitions
+     * @param date reference date
+     * @param hours offsets subtracted from the reference date; despite the name, each unit is
+     *        one day (the value is multiplied by TIME_DAYS)
+     * @return the values returned by addPartition, in offset order
+     */
+    public List<String> createPartitionWithTime(String database, Date date, int... hours) throws Exception {
+        final SimpleDateFormat dayFormat = new SimpleDateFormat("yyyyMMdd");
+        dayFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+        final List<String> created = new ArrayList<String>();
+        for (int i = 0; i < hours.length; i++) {
+            final Date shifted = new Date(date.getTime() - hours[i] * TIME_DAYS);
+            final String partitionSpec = "dt=" + dayFormat.format(shifted) + ";country=usa";
+            created.add(addPartition(database, "table1", partitionSpec));
+        }
+        return created;
+    }
+
+ protected String addPartition(String db, String table, String partitionSpec) throws Exception {
+ super.addPartition(db, table, partitionSpec);
+ return "hcat://" + server + "/" + db + "/" + table + "/" + partitionSpec;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordinatorInputLogic.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordinatorInputLogic.java b/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordinatorInputLogic.java
new file mode 100644
index 0000000..0679c8c
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordinatorInputLogic.java
@@ -0,0 +1,1054 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.io.StringReader;
+import java.io.Writer;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
+import org.apache.oozie.command.coord.CoordActionStartXCommand;
+import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
+import org.apache.oozie.command.coord.CoordSubmitXCommand;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.jdom.JDOMException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCoordinatorInputLogic extends XDataTestCase {
+ // Oozie services instance; created in setUp() and destroyed in tearDown().
+ private Services services;
+
+ /**
+  * Runs the parent setup, then creates and initializes a fresh Services instance for the test.
+  */
+ @Before
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ services = new Services();
+ services.init();
+ }
+
+    /**
+     * Destroys the services and then runs the parent tear-down.
+     *
+     * The parent tear-down runs in a finally block so that a failure while destroying the
+     * services no longer skips it.
+     */
+    @After
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            services.destroy();
+        }
+        finally {
+            super.tearDown();
+        }
+    }
+
+
+    /**
+     * Submits a "combine" over two data-in events with different ranges (current(-5)..current(0)
+     * and current(-4)..current(0)) and expects the submission to fail with error code E0803.
+     *
+     * The annotation no longer declares an expected exception: the CommandException is caught and
+     * inspected inside the method, so it never propagates, and declaring it as expected would
+     * make the test fail under a JUnit 4 runner.
+     */
+    @Test
+    public void testValidateRange() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<combine name=\"test\">"+
+            "<data-in dataset=\"A\" />"+
+            "<data-in dataset=\"b\" />"+
+        "</combine>";
+        String inputEvent =
+        "<data-in name=\"A\" dataset=\"a\">" +
+            "<start-instance>${coord:current(-5)}</start-instance>" +
+            "<end-instance>${coord:current(0)}</end-instance>" +
+        "</data-in>" +
+        "<data-in name=\"B\" dataset=\"b\">" +
+            "<start-instance>${coord:current(-4)}</start-instance>" +
+            "<end-instance>${coord:current(0)}</end-instance>" +
+        "</data-in>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        try {
+            _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic, inputEvent, true);
+            fail("Submission should have been rejected with " + ErrorCode.E0803);
+        }
+        catch (CommandException e) {
+            // Expected value goes first so a failure reports the two codes the right way round.
+            assertEquals(ErrorCode.E0803, e.getErrorCode());
+        }
+    }
+
+    /**
+     * Submits a nested or/and input-logic definition with an empty input-event fragment and the
+     * boolean flag set to true. The test only checks that the submission call completes.
+     * NOTE(review): the flag presumably selects dry-run mode, going by the test name - confirm.
+     */
+    public void testDryRun() throws Exception {
+        final Configuration jobConf = getConf();
+
+        //@formatter:off
+        final String nestedLogic =
+        "<or name=\"test\">"+
+            "<and>"+
+                "<or>"+
+                    "<data-in dataset=\"A\" />"+
+                    "<data-in dataset=\"B\" />"+
+                "</or>"+
+                "<or>"+
+                    "<data-in dataset=\"C\" />"+
+                    "<data-in dataset=\"D\" />"+
+                "</or>"+
+            "</and>"+
+            "<and>"+
+                "<data-in dataset=\"A\" />"+
+                "<data-in dataset=\"B\" />"+
+            "</and>"+
+        "</or>";
+        //@formatter:on
+
+        jobConf.set("partitionName", "test");
+        _testCoordSubmit("coord-inputlogic.xml", jobConf, nestedLogic, "", true);
+    }
+
+    /**
+     * Submits "(A or B) and (C or D)" or-ed with "A and B", creates inputs only for data sets a
+     * and b, and checks that exactly those two inputs are reported in "inputLogicData".
+     */
+    public void testNestedCondition() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<or name=\"test\">"+
+            "<and>"+
+                "<or>"+
+                    "<data-in dataset=\"A\" />"+
+                    "<data-in dataset=\"B\" />"+
+                "</or>"+
+                "<or>"+
+                    "<data-in dataset=\"C\" />"+
+                    "<data-in dataset=\"D\" />"+
+                "</or>"+
+            "</and>"+
+            "<and>"+
+                "<data-in dataset=\"A\" />"+
+                "<data-in dataset=\"B\" />"+
+            "</and>"+
+        "</or>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+
+        final String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+        // First input check runs before any input exists.
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the run configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(2, dataSets.split(",").length);
+        checkDataSets(dataSets, input1, input2);
+    }
+
+    /**
+     * Submits "((A and B) or (C and D)) and (E and F)", creates inputs for data sets a, b, e and
+     * f, and checks that exactly those four inputs are reported in "inputLogicData".
+     */
+    public void testNestedCondition1() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<and name=\"test\">"+
+            "<or>"+
+                "<and>" +
+                    "<data-in dataset=\"A\"/>"+
+                    "<data-in dataset=\"B\"/>"+
+                "</and>" +
+                "<and>"+
+                    "<data-in dataset=\"C\"/>"+
+                    "<data-in dataset=\"D\"/>"+
+                "</and>"+
+            "</or>"+
+            "<and>"+
+                "<data-in dataset=\"E\"/>"+
+                "<data-in dataset=\"F\"/>"+
+            "</and>"+
+        "</and>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        final String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+        String input3 = createTestCaseSubDir("input-data/e/2014/10/08/00/_SUCCESS".split("/"));
+        String input4 = createTestCaseSubDir("input-data/f/2014/10/08/00/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the run configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(4, dataSets.split(",").length);
+        checkDataSets(dataSets, input1, input2, input3, input4);
+    }
+
+    /**
+     * Submits "(A and B and C and D) or (E and F)" with the condition name taken from the
+     * "partitionName" property. Inputs are created for a, b, c, e and f but not d, so the test
+     * expects only e and f in "inputLogicData" and none of a, b or c.
+     */
+    public void testNestedCondition2() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<or name=\"${partitionName}\">"+
+            "<and>" +
+                "<data-in dataset=\"A\" />"+
+                "<data-in dataset=\"B\" />"+
+                "<data-in dataset=\"C\" />"+
+                "<data-in dataset=\"D\" />"+
+            "</and>" +
+            "<and>"+
+                "<data-in dataset=\"E\" />"+
+                "<data-in dataset=\"F\" />"+
+            "</and>"+
+        "</or>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        final String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+        String input3 = createTestCaseSubDir("input-data/c/2014/10/08/00/_SUCCESS".split("/"));
+        String input4 = createTestCaseSubDir("input-data/e/2014/10/08/00/_SUCCESS".split("/"));
+        String input5 = createTestCaseSubDir("input-data/f/2014/10/08/00/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        // Fail with a readable message instead of a NullPointerException on split().
+        assertNotNull("inputLogicData is missing from the run configuration", dataSets);
+        // Expected value goes first so a failure reports the two numbers the right way round.
+        assertEquals(2, dataSets.split(",").length);
+        checkDataSets(dataSets, input4, input5);
+        checkDataSetsForFalse(dataSets, input1, input2, input3);
+    }
+
+ /**
+  * Submits a coordinator whose input logic is
+  * {@code (A and B) and (C and D) and (E and F)}, creates the markers for all
+  * six datasets, and expects all six directories in {@code inputLogicData}.
+  */
+ public void testNestedCondition3() throws Exception {
+     Configuration conf = getConf();
+
+     //@formatter:off
+     String inputLogic =
+         "<and name=\"test\">"+
+             "<and>" +
+                 "<data-in dataset=\"A\" />"+
+                 "<data-in dataset=\"B\" />"+
+             "</and>" +
+             "<and>"+
+                 "<data-in dataset=\"C\" />"+
+                 "<data-in dataset=\"D\" />"+
+             "</and>"+
+             "<and>"+
+                 "<data-in dataset=\"E\" />"+
+                 "<data-in dataset=\"F\" />"+
+             "</and>"+
+         "</and>";
+     //@formatter:on
+     conf.set("partitionName", "test");
+     final String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+     String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+     String input2 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+     String input3 = createTestCaseSubDir("input-data/c/2014/10/08/00/_SUCCESS".split("/"));
+     String input4 = createTestCaseSubDir("input-data/d/2014/10/08/00/_SUCCESS".split("/"));
+     String input5 = createTestCaseSubDir("input-data/e/2014/10/08/00/_SUCCESS".split("/"));
+     String input6 = createTestCaseSubDir("input-data/f/2014/10/08/00/_SUCCESS".split("/"));
+
+     startCoordAction(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(6, dataSets.split(",").length);
+     checkDataSets(dataSets, input1, input2, input3, input4, input5, input6);
+ }
+
+ /**
+  * Submits a coordinator with the input logic {@code A or B}, creates only the
+  * marker for A, and expects the started action to report that single
+  * directory in {@code inputLogicData}.
+  */
+ public void testSimpleOr() throws Exception {
+     Configuration conf = getConf();
+     //@formatter:off
+     String inputLogic =
+         "<or name=\"test\">"+
+             "<data-in dataset=\"A\" />"+
+             "<data-in dataset=\"B\" />"+
+         "</or>";
+     //@formatter:on
+     conf.set("partitionName", "test");
+     String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+     String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+
+     startCoordAction(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(1, dataSets.split(",").length);
+     checkDataSets(dataSets, input1);
+ }
+
+ /**
+  * Submits a coordinator with the input logic {@code (C and D) or (A or B)}.
+  * The action is materialized and must be WAITING while no data exists; an
+  * input check is run against the empty directories, then the marker for B is
+  * created and the action is started. Exactly the B directory is expected in
+  * {@code inputLogicData}.
+  */
+ public void testSimpleOr1() throws Exception {
+     Configuration conf = getConf();
+     //@formatter:off
+     String inputLogic =
+         "<or name=\"test\">"+
+             "<and>" +
+                 "<data-in dataset=\"C\" />"+
+                 "<data-in dataset=\"D\" />"+
+             "</and>" +
+             "<or>"+
+                 "<data-in dataset=\"A\" />"+
+                 "<data-in dataset=\"B\" />"+
+             "</or>"+
+         "</or>";
+     //@formatter:on
+
+     String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+     new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(CoordinatorAction.Status.WAITING, actionBean.getStatus());
+
+     // First input check runs before any marker directory exists.
+     new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+     String input1 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+     startCoordAction(jobId);
+     actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     assertEquals(1, dataSets.split(",").length);
+     checkDataSets(dataSets, input1);
+ }
+
+ /**
+  * Submits a coordinator with the input logic {@code A(min=3) or B(min=3)}
+  * over the range events from {@link #getInputEventForRange()}. Only two
+  * instances of A are created but three of B, and the test expects the action
+  * to leave WAITING and report exactly the three B directories.
+  */
+ public void testOrWithMin() throws Exception {
+     Configuration conf = getConf();
+     //@formatter:off
+     String inputLogic =
+         "<or name=\"test\">"+
+             "<data-in dataset=\"A\" min=\"3\"/>"+
+             "<data-in dataset=\"B\" min=\"3\"/>"+
+         "</or>";
+     //@formatter:on
+     conf.set("initial_instance_a", "2014-10-07T00:00Z");
+     conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+     String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+     // Two instances of A: fewer than its min of 3, so they are not asserted on.
+     createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+     createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+     // Three instances of B: enough to meet its min of 3.
+     String input3 = createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+     String input4 = createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+     String input5 = createTestCaseSubDir("input-data/b/2014/10/07/19/_SUCCESS".split("/"));
+
+     startCoordAction(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(3, dataSets.split(",").length);
+     checkDataSets(dataSets, input3, input4, input5);
+ }
+
+ /**
+  * Submits a coordinator with the input logic
+  * {@code A(min=2) and B(min=3) and C(min=0)} over the range events from
+  * {@link #getInputEventForRange()}. Two instances of A and three of B are
+  * created, none of C, and the test expects the action to leave WAITING and
+  * report those five directories.
+  */
+ public void testAndWithMin() throws Exception {
+     Configuration conf = getConf();
+     //@formatter:off
+     String inputLogic =
+         "<and name=\"test\">"+
+             "<data-in dataset=\"A\" min=\"2\"/>"+
+             "<data-in dataset=\"B\" min=\"3\"/>"+
+             "<data-in dataset=\"C\" min=\"0\"/>"+
+         "</and>";
+     //@formatter:on
+     conf.set("initial_instance_a", "2014-10-07T00:00Z");
+     conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+     String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+     String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+     String input2 = createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+     String input3 = createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+     String input4 = createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+     String input5 = createTestCaseSubDir("input-data/b/2014/10/07/19/_SUCCESS".split("/"));
+
+     startCoordAction(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(5, dataSets.split(",").length);
+     // Each created directory is checked once (input5 used to be listed twice).
+     checkDataSets(dataSets, input1, input2, input3, input4, input5);
+ }
+
+ /**
+  * Submits a coordinator with the input logic {@code A(min=2) and B} where
+  * each data-in lists several {@code <instance>} expressions (current, latest,
+  * futureRange, latestRange). Times are relative to "now". With A's
+  * directories and only two B directories present the action must stay
+  * WAITING; after two more B directories are added and the input check is
+  * re-run it must leave WAITING and report all ten created directories.
+  */
+ public void testMultipleInstance() throws Exception {
+     Configuration conf = getConf();
+     Date now = new Date();
+     //@formatter:off
+     String inputLogic =
+         "<and name=\"test\">"+
+             "<data-in dataset=\"A\" min=\"2\"/>"+
+             "<data-in dataset=\"B\"/>"+
+         "</and>";
+     String event =
+         "<data-in name=\"A\" dataset=\"a\">" +
+             "<instance>${coord:current(-5)}</instance>" +
+             "<instance>${coord:latest(-1)}</instance>" +
+             "<instance>${coord:futureRange(0,2,10)}</instance>" +
+         "</data-in>" +
+         "<data-in name=\"B\" dataset=\"b\">" +
+             "<instance>${coord:latest(0)}</instance>" +
+             "<instance>${coord:latestRange(-3,0)}</instance>" +
+         "</data-in>" ;
+
+     //@formatter:on
+     conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+     conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 3 * 60 * 60 * 1000)));
+     // 5 hour before
+     conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+     // 5 hour before
+     conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+
+     String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, event);
+
+     // Offsets are hours subtracted from now; negative offsets lie in the future.
+     List<String> inputDir = createDirWithTime("input-data/a/", now, 3, 5, 0, -1, -2);
+     inputDir.addAll(createDirWithTime("input-data/b/", now, 0, 1));
+
+     startCoordActionForWaiting(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertTrue(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+
+     inputDir.addAll(createDirWithTime("input-data/b/", now, 2, 3));
+
+     new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+     actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(10, dataSets.split(",").length);
+     checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+ }
+
+ /**
+  * Submits a coordinator with the input logic {@code A and B}, creates both
+  * markers, and expects the action to leave WAITING and report both
+  * directories in {@code inputLogicData}.
+  */
+ public void testAnd() throws Exception {
+     Configuration conf = getConf();
+     //@formatter:off
+     String inputLogic =
+         "<and name=\"test\">"+
+             "<data-in dataset=\"A\"/>"+
+             "<data-in dataset=\"B\"/>"+
+         "</and>";
+     //@formatter:on
+     String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+     String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+     String input2 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+
+     startCoordAction(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(2, dataSets.split(",").length);
+     checkDataSets(dataSets, input1, input2);
+ }
+
+ /**
+  * Submits a coordinator with the input logic {@code combine(A, B)} over the
+  * range events from {@link #getInputEventForRange()}. Three hourly instances
+  * of A (00, 23, 22) and three of B (21, 20, 19) are created, and the test
+  * expects the action to leave WAITING and report all six directories.
+  */
+ public void testCombine() throws Exception {
+     Configuration conf = getConf();
+     //@formatter:off
+     String inputLogic =
+         "<combine name=\"test\">"+
+             "<data-in dataset=\"A\" />"+
+             "<data-in dataset=\"B\" />"+
+         "</combine>";
+     //@formatter:on
+     conf.set("initial_instance_a", "2014-10-07T00:00Z");
+     conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+     String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+     String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+     String input2 = createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+     String input3 = createTestCaseSubDir("input-data/a/2014/10/07/22/_SUCCESS".split("/"));
+     String input4 = createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+     String input5 = createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+     String input6 = createTestCaseSubDir("input-data/b/2014/10/07/19/_SUCCESS".split("/"));
+
+     startCoordAction(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(6, dataSets.split(",").length);
+     checkDataSets(dataSets, input1, input2, input3, input4, input5, input6);
+ }
+
+ /**
+  * Negative case for {@code combine(A, B)}: only four instance directories
+  * are created (two of A, two of B). After materialization and an input check
+  * the test polls for a status change and then expects the action to still be
+  * WAITING.
+  */
+ public void testCombineNegative() throws Exception {
+     Configuration conf = getConf();
+     //@formatter:off
+     String inputLogic =
+         "<combine name=\"test\">"+
+             "<data-in dataset=\"A\" />"+
+             "<data-in dataset=\"B\" />"+
+         "</combine>";
+     //@formatter:on
+     conf.set("initial_instance_a", "2014-10-07T00:00Z");
+     conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+     final String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+     createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+     createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+     createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+     createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+
+     new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+     new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+     // Give the action a chance to leave WAITING; the assertion below expects it not to.
+     waitFor(5 * 1000, new Predicate() {
+         public boolean evaluate() throws Exception {
+             CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                     CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+             return !actionBean.getStatus().equals(CoordinatorAction.Status.WAITING);
+         }
+     });
+
+     CoordinatorAction actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION,
+             jobId + "@1");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(CoordinatorAction.Status.WAITING, actionBean.getStatus());
+ }
+
+ /**
+  * Submits a coordinator whose input logic is a single {@code A(min=3)}
+  * wrapped in an {@code <or>}. Three instances of A are created with a gap
+  * between them (00, 23 and 19), and the test expects the action to leave
+  * WAITING and report all three directories.
+  */
+ public void testSingeSetWithMin() throws Exception {
+     Configuration conf = getConf();
+     //@formatter:off
+     String inputLogic =
+         "<or name=\"test\">"+
+             "<data-in dataset=\"A\" min=\"3\" />"+
+         "</or>";
+     //@formatter:on
+
+     conf.set("initial_instance_a", "2014-10-07T00:00Z");
+     conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+     String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+     String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+     String input2 = createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+     // dataset with gap
+     String input3 = createTestCaseSubDir("input-data/a/2014/10/07/19/_SUCCESS".split("/"));
+
+     startCoordAction(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(3, dataSets.split(",").length);
+     checkDataSets(dataSets, input1, input2, input3);
+ }
+
+ /**
+  * Submits a coordinator with the input logic {@code combine(A, B)} and
+  * {@code min=4}. Five instance directories are created (three of A, two of
+  * B), and the test expects the action to leave WAITING and report all five.
+  */
+ public void testCombineWithMin() throws Exception {
+     Configuration conf = getConf();
+     String inputLogic =
+         //@formatter:off
+         "<combine name=\"test\" min=\"4\">"+
+             "<data-in dataset=\"A\" />"+
+             "<data-in dataset=\"B\" />"+
+         "</combine>";
+     //@formatter:on
+     conf.set("initial_instance_a", "2014-10-07T00:00Z");
+     conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+     final String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+     new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+     String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+     String input2 = createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+     String input3 = createTestCaseSubDir("input-data/a/2014/10/07/22/_SUCCESS".split("/"));
+     String input4 = createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+     String input5 = createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+
+     startCoordAction(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(5, dataSets.split(",").length);
+     checkDataSets(dataSets, input1, input2, input3, input4, input5);
+ }
+
+ /**
+  * Submits a coordinator with the input logic {@code combine(A, B)} using
+  * {@code min=4} and {@code wait=1}, with times relative to "now". Five B
+  * directories are created and the action must initially be WAITING; after
+  * sleeping one minute and re-running the input check it must leave WAITING
+  * and report the five created directories.
+  */
+ public void testMinWait() throws Exception {
+     Configuration conf = getConf();
+     Date now = new Date();
+     String inputLogic =
+         //@formatter:off
+         "<combine name=\"test\" min= \"4\" wait=\"1\">"+
+             "<data-in dataset=\"A\" />"+
+             "<data-in dataset=\"B\" />"+
+         "</combine>";
+     //@formatter:on
+     conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+     conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 3 * 60 * 60 * 1000)));
+     // 5 hour before
+     conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+     // 5 hour before
+     conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+
+     String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+     new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+     List<String> inputDir = createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4);
+
+     startCoordActionForWaiting(jobId);
+     // wait for 1 min
+     sleep(60 * 1000);
+     new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(5, dataSets.split(",").length);
+     checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+ }
+
+ /**
+  * Submits a coordinator with the input logic {@code combine(A, B)} using
+  * {@code wait=1} and no {@code min}, with times relative to "now". Five B
+  * directories are created and the action must initially be WAITING; after
+  * sleeping one minute a sixth B directory is added, the input check is
+  * re-run, and the action must leave WAITING and report all six directories.
+  */
+ public void testWait() throws Exception {
+     Configuration conf = getConf();
+     Date now = new Date();
+     String inputLogic =
+         //@formatter:off
+         "<combine name=\"test\" wait=\"1\">"+
+             "<data-in dataset=\"A\" />"+
+             "<data-in dataset=\"B\" />"+
+         "</combine>";
+     //@formatter:on
+     conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+     conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 3 * 60 * 60 * 1000)));
+     conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+     conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+
+     String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+     new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+     List<String> inputDir = createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4);
+
+     startCoordActionForWaiting(jobId);
+     // wait for 1 min
+     sleep(60 * 1000);
+
+     inputDir.addAll(createDirWithTime("input-data/b/", now, 5));
+     new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(6, dataSets.split(",").length);
+     checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+ }
+
+ /**
+  * Uses an {@code <or>} whose {@code min}, {@code wait} and dataset names are
+  * all supplied through configuration properties (min=4, wait=180, A and B).
+  * Five B directories are created relative to "now"; the test expects the
+  * action to still be WAITING after a further input check.
+  */
+ public void testWaitFail() throws Exception {
+     Configuration conf = getConf();
+     Date now = new Date();
+     final int hourMillis = 60 * 60 * 1000;
+     String inputLogic =
+         //@formatter:off
+         "<or name=\"test\" min=\"${min}\" wait=\"${wait}\">"+
+             "<data-in dataset=\"${dataA}\" />"+
+             "<data-in dataset=\"${dataB}\" />"+
+         "</or>";
+     //@formatter:on
+
+     // Values substituted into the input-logic template above.
+     conf.set("min", "4");
+     conf.set("wait", "180");
+     conf.set("dataA", "A");
+     conf.set("dataB", "B");
+
+     // Job window starts now and ends three hours later; datasets start five hours back.
+     Date windowEnd = new Date(now.getTime() + 3 * hourMillis);
+     Date firstInstance = new Date(now.getTime() - 5 * hourMillis);
+     conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+     conf.set("end_time", DateUtils.formatDateOozieTZ(windowEnd));
+     conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(firstInstance));
+     conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(firstInstance));
+
+     String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+     createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4);
+
+     startCoordActionForWaiting(jobId);
+     String firstActionId = jobId + "@1";
+     new CoordActionInputCheckXCommand(firstActionId, jobId).call();
+
+     CoordinatorActionBean action = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, firstActionId);
+
+     assertTrue(CoordinatorAction.Status.WAITING.equals(action.getStatus()));
+ }
+
+ /**
+  * Submits {@code coord-inputlogic-latest.xml} with a single
+  * {@code <data-in>} on dataset A as the input logic, creates one instance
+  * directory of A, and expects the action to leave WAITING and report that
+  * one directory in {@code inputLogicData}.
+  */
+ public void testLatest() throws Exception {
+     Configuration conf = getConf();
+     conf.set("initial_instance_a", "2014-10-07T00:00Z");
+     conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+     String inputLogic = "<data-in name=\"test\" dataset=\"A\"/>";
+     String jobId = _testCoordSubmit("coord-inputlogic-latest.xml", conf, inputLogic);
+
+     String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+
+     startCoordAction(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(1, dataSets.split(",").length);
+     checkDataSets(dataSets, input1);
+ }
+
+ /**
+  * Submits {@code coord-inputlogic-range-latest.xml} with a single
+  * {@code <data-in>} on dataset A and {@code min=2}, with times relative to
+  * "now". Two instance directories of A are created, and the test expects the
+  * action to leave WAITING and report two entries in {@code inputLogicData}.
+  */
+ public void testLatestRange() throws Exception {
+     Configuration conf = getConf();
+     Date now = new Date();
+     conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+     conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 3 * 60 * 60 * 1000)));
+     conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+     conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+
+     String inputLogic =
+         //@formatter:off
+         "<data-in name=\"test\" dataset=\"A\" min =\"2\" />";
+     //@formatter:on
+     String jobId = _testCoordSubmit("coord-inputlogic-range-latest.xml", conf, inputLogic);
+
+     createDirWithTime("input-data/a/", now, 0, 1);
+
+     startCoordAction(jobId);
+
+     CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+     assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+     String dataSets = runConf.get("inputLogicData");
+     // JUnit convention: expected value first, actual second.
+     assertEquals(2, dataSets.split(",").length);
+ }
+
+ // TODO: add combine support for unresolved instances (e.g. latest), then re-enable testLatestWithCombine below.
+ // public void testLatestWithCombine() throws Exception {
+ // Configuration conf = getConf();
+ // conf.set("input_check", "combine(\"A\", \"B\")");
+ // conf.set("initial_instance_a", "2014-10-07T00:00Z");
+ // conf.set("initial_instance_b", "2014-10-07T00:00Z");
+ //
+ // String jobId = _testCoordSubmit("coord-inputlogic-range-latest.xml", conf);
+ //
+ // new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+ // CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+ // CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+ // sleep(2000);
+ //
+ // new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+ // assertEquals(actionBean.getStatus(), CoordinatorAction.Status.WAITING);
+ // createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+ // createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+ // createTestCaseSubDir("input-data/a/2014/10/07/22/_SUCCESS".split("/"));
+ // createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+ //
+ // new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+ //
+ // actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+ // assertEquals(actionBean.getStatus(), CoordinatorAction.Status.WAITING);
+ //
+ // createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+ // new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+ //
+ // actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+ // assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+ //
+ // }
+ /**
+  * Submits {@code coord-multiple-input-instance3.xml} with an otherwise empty
+  * configuration, materializes the first action, runs one input check, and
+  * expects the action's missing dependencies to be the unresolved
+  * {@code latest(0)} / {@code latest(-1)} expressions.
+  */
+ public void testCoordWithoutInputCheck() throws Exception {
+     Configuration conf = new XConfiguration();
+     String jobId = setupCoord(conf, "coord-multiple-input-instance3.xml");
+     sleep(1000);
+     new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+     new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+
+     CoordinatorAction actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION,
+             jobId + "@1");
+
+     // JUnit convention: expected value first, actual second.
+     assertEquals("!!${coord:latest(0)}#${coord:latest(-1)}", actionBean.getMissingDependencies());
+ }
+
+ private String _testCoordSubmit(String coordinatorXml, Configuration conf, String inputLogic) throws Exception {
+ return _testCoordSubmit(coordinatorXml, conf, inputLogic, "", false);
+ }
+
+ private String _testCoordSubmit(String coordinatorXml, Configuration conf, String inputLogic, String inputEvent)
+ throws Exception {
+ return _testCoordSubmit(coordinatorXml, conf, inputLogic, inputEvent, false);
+ }
+
+ /**
+  * Loads the coordinator template resource, substitutes the
+  * {@code =input-logic=} and {@code =input-events=} placeholders, writes the
+  * result to {@code coordinator.xml} in the test case directory, fills in the
+  * submission properties on {@code conf}, and runs {@code CoordSubmitXCommand}.
+  *
+  * @param coordinatorXml resource name of the coordinator template
+  * @param conf configuration to submit with; submission properties are set on it
+  * @param inputLogic XML fragment replacing {@code =input-logic=}
+  * @param inputEvent XML fragment replacing {@code =input-events=}
+  * @param dryRun passed through to {@code CoordSubmitXCommand}
+  * @return the value returned by {@code CoordSubmitXCommand.call()}, used by callers as the job id
+  */
+ private String _testCoordSubmit(String coordinatorXml, Configuration conf, String inputLogic, String inputEvent,
+         boolean dryRun) throws Exception {
+     String appPath = "file://" + getTestCaseDir() + File.separator + "coordinator.xml";
+
+     String content = IOUtils.getResourceAsString(coordinatorXml, -1);
+     content = content.replace("=input-logic=", inputLogic);
+     content = content.replace("=input-events=", inputEvent);
+
+     Writer writer = new FileWriter(new URI(appPath).getPath());
+     try {
+         IOUtils.copyCharStream(new StringReader(content), writer);
+     }
+     finally {
+         // Guarantees the file is flushed and released even if the copy fails.
+         // Closing an already-closed Writer has no effect, so this is safe
+         // whether or not copyCharStream closes it itself.
+         writer.close();
+     }
+     conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
+     conf.set(OozieClient.USER_NAME, getTestUser());
+     conf.set("nameNode", "hdfs://localhost:9000");
+     conf.set("queueName", "default");
+     conf.set("jobTracker", "localhost:9001");
+     conf.set("examplesRoot", "examples");
+
+     return new CoordSubmitXCommand(dryRun, conf).call();
+ }
+
+ private Configuration getConf() throws Exception {
+ Configuration conf = new XConfiguration();
+ conf.set("data_set_a", "file://" + getTestCaseDir() + "/input-data/a");
+ conf.set("data_set_b", "file://" + getTestCaseDir() + "/input-data/b");
+ conf.set("data_set_c", "file://" + getTestCaseDir() + "/input-data/c");
+ conf.set("data_set_d", "file://" + getTestCaseDir() + "/input-data/d");
+ conf.set("data_set_e", "file://" + getTestCaseDir() + "/input-data/e");
+ conf.set("data_set_f", "file://" + getTestCaseDir() + "/input-data/f");
+ conf.set("partitionName", "test");
+
+ conf.set("start_time", "2014-10-08T00:00Z");
+ conf.set("end_time", "2015-10-08T00:00Z");
+ conf.set("initial_instance_a", "2014-10-08T00:00Z");
+ conf.set("initial_instance_b", "2014-10-08T00:00Z");
+ conf.set("wfPath", getWFPath());
+ return conf;
+
+ }
+
+ /**
+  * Writes a minimal workflow definition (start goes straight to end) to
+  * {@code workflow.xml} and returns the URI it was written to.
+  */
+ public String getWFPath() throws Exception {
+     String targetUri = getTestCaseFileUri("workflow.xml");
+
+     StringBuilder workflow = new StringBuilder();
+     workflow.append("<workflow-app xmlns='uri:oozie:workflow:0.1' name='map-reduce-wf'> ");
+     workflow.append("<start to='end' /> ");
+     workflow.append("<end name='end' /> ");
+     workflow.append("</workflow-app>");
+
+     writeToFile(workflow.toString(), targetUri);
+     return targetUri;
+ }
+
+ /**
+  * Writes {@code appXml} followed by a line separator to the file identified
+  * by the URI string {@code appPath}, replacing any existing content.
+  *
+  * @throws IOException if the file cannot be opened for writing
+  */
+ private void writeToFile(String appXml, String appPath) throws IOException {
+     File target = new File(URI.create(appPath));
+     PrintWriter printer = new PrintWriter(new FileWriter(target));
+     try {
+         printer.println(appXml);
+     }
+     finally {
+         printer.close();
+     }
+ }
+
+ public void checkDataSets(String dataSets, String... values) {
+
+ Set<String> inputDataSets = new HashSet<String>();
+ for (String dataSet : dataSets.split(",")) {
+ inputDataSets.add(dataSet.substring(dataSet.indexOf(getTestCaseDir())));
+ }
+
+ for (String value : values) {
+ assertTrue(inputDataSets.contains(value.replace("/_SUCCESS","")));
+ }
+ }
+
+ public void checkDataSetsForFalse(String dataSets, String... values) {
+
+ Set<String> inputDataSets = new HashSet<String>();
+ for (String dataSet : dataSets.split(",")) {
+ inputDataSets.add(dataSet.substring(dataSet.indexOf(getTestCaseDir())));
+ }
+
+ for (String value : values) {
+ assertFalse(inputDataSets.contains(value));
+ }
+
+ }
+
+ private void startCoordAction(final String jobId) throws CommandException, JPAExecutorException {
+ startCoordAction(jobId, CoordinatorAction.Status.WAITING);
+
+ }
+
+ /**
+  * Materializes the job's actions, runs the input check on action
+  * {@code <jobId>@1}, waits for that action to leave WAITING, asserts that its
+  * status differs from {@code coordActionStatus}, and then starts it.
+  *
+  * @param jobId coordinator job id
+  * @param coordActionStatus status the first action must NOT have after the wait
+  */
+ private void startCoordAction(final String jobId, final CoordinatorAction.Status coordActionStatus)
+         throws CommandException, JPAExecutorException {
+     final String firstActionId = jobId + "@1";
+
+     new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+     new CoordActionInputCheckXCommand(firstActionId, jobId).call();
+
+     // The predicate always polls against WAITING, independent of coordActionStatus.
+     waitFor(50 * 1000, new Predicate() {
+         public boolean evaluate() throws Exception {
+             CoordinatorActionBean polled = CoordActionQueryExecutor.getInstance().get(
+                     CoordActionQuery.GET_COORD_ACTION, firstActionId);
+             return !polled.getStatus().equals(CoordinatorAction.Status.WAITING);
+         }
+     });
+
+     CoordinatorAction action = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION,
+             firstActionId);
+     assertFalse(action.getStatus().equals(coordActionStatus));
+
+     CoordinatorJob job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
+
+     new CoordActionStartXCommand(action.getId(), job.getUser(), job.getAppName(),
+             action.getJobId()).call();
+ }
+
+ /**
+  * Materializes the job's actions, runs the input check on action
+  * {@code <jobId>@1}, gives the action a chance to leave WAITING, and then
+  * asserts that it is still WAITING. The action is not started.
+  */
+ private void startCoordActionForWaiting(final String jobId) throws CommandException, JPAExecutorException,
+         JDOMException {
+     final String firstActionId = jobId + "@1";
+
+     new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+     new CoordActionInputCheckXCommand(firstActionId, jobId).call();
+
+     waitFor(5 * 1000, new Predicate() {
+         public boolean evaluate() throws Exception {
+             CoordinatorActionBean polled = CoordActionQueryExecutor.getInstance().get(
+                     CoordActionQuery.GET_COORD_ACTION, firstActionId);
+             return !polled.getStatus().equals(CoordinatorAction.Status.WAITING);
+         }
+     });
+
+     CoordinatorActionBean action = CoordActionQueryExecutor.getInstance().get(
+             CoordActionQuery.GET_COORD_ACTION, firstActionId);
+     boolean stillWaiting = action.getStatus().equals(CoordinatorAction.Status.WAITING);
+     assertTrue("should be waiting", stillWaiting);
+ }
+
+ /**
+ * Copies the coordinator resource {@code coordFile} to {@code coordinator.xml}
+ * in the test case directory, points {@code conf} at it, and submits the job.
+ *
+ * @param conf configuration to submit with; the app path and user name are set on it
+ * @param coordFile resource name of the coordinator definition to copy
+ * @return the value returned by {@code CoordSubmitXCommand.call()}, used by callers as the job id
+ */
+ private String setupCoord(Configuration conf, String coordFile) throws CommandException, IOException {
+ File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
+ Reader reader = IOUtils.getResourceAsReader(coordFile, -1);
+ Writer writer = new FileWriter(appPathFile);
+ conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
+ conf.set(OozieClient.USER_NAME, getTestUser());
+ // NOTE(review): this command is constructed before the file is copied and is
+ // replaced below without being called; it looks redundant -- confirm the
+ // constructor has no side effects before removing it.
+ CoordSubmitXCommand sc = new CoordSubmitXCommand(conf);
+ // NOTE(review): reader/writer are not closed here; presumably copyCharStream
+ // closes them -- verify.
+ IOUtils.copyCharStream(reader, writer);
+ sc = new CoordSubmitXCommand(conf);
+ return sc.call();
+
+ }
+
+ private String getInputEventForRange() {
+ //@formatter:off
+ return
+ "<data-in name=\"A\" dataset=\"a\">" +
+ "<start-instance>${coord:current(-5)}</start-instance>" +
+ "<end-instance>${coord:current(0)}</end-instance>" +
+ "</data-in>" +
+ "<data-in name=\"B\" dataset=\"b\">" +
+ "<start-instance>${coord:current(-5)}</start-instance>" +
+ "<end-instance>${coord:current(0)}</end-instance>" +
+ "</data-in>" +
+ "<data-in name=\"C\" dataset=\"c\">" +
+ "<start-instance>${coord:current(-5)}</start-instance> " +
+ "<end-instance>${coord:current(0)}</end-instance>" +
+ "</data-in>" +
+ "<data-in name=\"D\" dataset=\"d\">" +
+ "<start-instance>${coord:current(-5)}</start-instance>" +
+ "<end-instance>${coord:current(0)}</end-instance>" +
+ "</data-in>" +
+ "<data-in name=\"E\" dataset=\"e\">" +
+ "<start-instance>${coord:current(-5)}</start-instance>" +
+ "<end-instance>${coord:current(0)}</end-instance>" +
+ "</data-in>" +
+ "<data-in name=\"F\" dataset=\"f\">" +
+ "<start-instance>${coord:current(-5)}</start-instance> " +
+ "<end-instance>${coord:current(0)}</end-instance>" +
+ "</data-in>";
+ //@formatter:on
+ }
+
+ /**
+  * Creates one {@code _SUCCESS} marker directory per hour offset under
+  * {@code dirPrefix}, using the UTC {@code yyyy/MM/dd/HH} layout.
+  *
+  * @param dirPrefix path prefix relative to the test case directory, ending in {@code /}
+  * @param date reference instant
+  * @param hours offsets in hours that are SUBTRACTED from {@code date};
+  *        negative values therefore yield instants after {@code date}
+  * @return the values returned by {@code createTestCaseSubDir}, in the order of {@code hours}
+  */
+ public List<String> createDirWithTime(String dirPrefix, Date date, int... hours) {
+     SimpleDateFormat sd = new SimpleDateFormat("yyyy/MM/dd/HH");
+     sd.setTimeZone(TimeZone.getTimeZone("UTC"));
+
+     List<String> createdDirPath = new ArrayList<String>();
+     for (int hour : hours) {
+         // 60L forces long arithmetic so a large offset cannot overflow int.
+         Date instance = new Date(date.getTime() - hour * 60L * 60 * 1000);
+         String markerPath = dirPrefix + sd.format(instance) + "/_SUCCESS";
+         createdDirPath.add(createTestCaseSubDir(markerPath.split("/")));
+     }
+     return createdDirPath;
+ }
+
+}