You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pu...@apache.org on 2016/01/26 21:46:34 UTC

[2/5] oozie git commit: OOZIE-1976 Specifying coordinator input datasets in more logical ways

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordInputLogicPush.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordInputLogicPush.java b/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordInputLogicPush.java
new file mode 100644
index 0000000..c58b18b
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordInputLogicPush.java
@@ -0,0 +1,645 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.io.Writer;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
+import org.apache.oozie.command.coord.CoordActionStartXCommand;
+import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
+import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
+import org.apache.oozie.command.coord.CoordSubmitXCommand;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XHCatTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+
+public class TestCoordInputLogicPush extends XHCatTestCase {
+
+    private Services services;
+    private String server;
+    private static final String table = "table1";
+
+    final long TIME_DAYS = 60 * 60 * 1000 * 24;
+
+    enum TEST_TYPE {
+        CURRENT_SINGLE, CURRENT_RANGE, LATEST_SINGLE, LATEST_RANGE;
+    };
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        services = super.setupServicesForHCatalog();
+        services.init();
+        createTestTable();
+        server = getMetastoreAuthority();
+
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+        dropTestTable();
+    }
+
+    private void createSingleTestTable(String db) throws Exception {
+        dropTable(db, table, true);
+        dropDatabase(db, true);
+        createDatabase(db);
+        createTable(db, table, "dt,country");
+    }
+
+    private void createTestTable() throws Exception {
+
+        createSingleTestTable("db_a");
+        createSingleTestTable("db_b");
+        createSingleTestTable("db_c");
+        createSingleTestTable("db_d");
+        createSingleTestTable("db_e");
+        createSingleTestTable("db_f");
+
+    }
+
+    private void dropSingleTestTable(String db) throws Exception {
+        dropTable(db, table, false);
+        dropDatabase(db, false);
+    }
+
+    private void dropTestTable() throws Exception {
+
+        dropSingleTestTable("db_a");
+        dropSingleTestTable("db_b");
+        dropSingleTestTable("db_c");
+        dropSingleTestTable("db_d");
+        dropSingleTestTable("db_e");
+        dropSingleTestTable("db_f");
+
+    }
+
+    public void testExists() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<or name=\"test\">"+
+                  "<data-in dataset=\"B\" />"+
+                  "<data-in dataset=\"D\" />"+
+         "</or>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        String jobId = _testCoordSubmit("coord-inputlogic-hcat.xml", conf, inputLogic, TEST_TYPE.CURRENT_SINGLE);
+
+        String input = addPartition("db_b", "table1", "dt=20141008;country=usa");
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        Configuration runConf = getActionConf(actionBean);
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 1);
+        checkDataSets(dataSets, input);
+
+    }
+
+    public void testNestedCondition3() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<and name=\"test\">"+
+                  "<and>" +
+                          "<data-in dataset=\"A\" />"+
+                          "<data-in dataset=\"B\" />"+
+                   "</and>" +
+                   "<and>"+
+                          "<data-in dataset=\"C\" />"+
+                          "<data-in dataset=\"D\" />"+
+                   "</and>"+
+                   "<and>"+
+                       "<data-in dataset=\"E\" />"+
+                       "<data-in dataset=\"F\" />"+
+                   "</and>"+
+         "</and>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        final String jobId = _testCoordSubmit("coord-inputlogic-hcat.xml", conf, inputLogic, TEST_TYPE.CURRENT_SINGLE);
+
+        String input1 = addPartition("db_a", "table1", "dt=20141008;country=usa");
+        String input2 = addPartition("db_b", "table1", "dt=20141008;country=usa");
+        String input3 = addPartition("db_c", "table1", "dt=20141008;country=usa");
+        String input4 = addPartition("db_d", "table1", "dt=20141008;country=usa");
+        String input5 = addPartition("db_e", "table1", "dt=20141008;country=usa");
+        String input6 = addPartition("db_f", "table1", "dt=20141008;country=usa");
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        Configuration runConf = getActionConf(actionBean);
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 6);
+        checkDataSets(dataSets, input1, input2, input3, input4, input5, input6);
+
+    }
+
+    public void testNestedConditionWithRange() throws Exception {
+
+        Configuration conf = getConfForCombine();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 10 * TIME_DAYS)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+
+        //@formatter:off
+        String inputLogic =
+        "<and name=\"test\" min=\"2\" >"+
+                  "<or min=\"2\">" +
+                          "<data-in dataset=\"A\" />"+
+                          "<data-in dataset=\"B\" />"+
+                   "</or>" +
+                   "<or min=\"2\">"+
+                          "<data-in dataset=\"C\" />"+
+                          "<data-in dataset=\"D\" />"+
+                   "</or>"+
+                   "<and min=\"2\">"+
+                       "<data-in dataset=\"A\" />"+
+                       "<data-in dataset=\"C\" />"+
+                   "</and>"+
+         "</and>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        final String jobId = _testCoordSubmit("coord-inputlogic-hcat.xml", conf, inputLogic, TEST_TYPE.CURRENT_RANGE,
+                TEST_TYPE.LATEST_RANGE);
+        List<String> inputPartition = createPartitionWithTime("db_a", now, 0, 1, 2);
+        inputPartition.addAll(createPartitionWithTime("db_c", now, 0, 1, 2));
+        startCoordAction(jobId);
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        Configuration runConf = getActionConf(actionBean);
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 12);
+        checkDataSets(dataSets, inputPartition.toArray(new String[inputPartition.size()]));
+
+    }
+
+
+    public void testLatestRange() throws Exception {
+
+        Configuration conf = getConfForCombine();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 10 * TIME_DAYS)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+
+        String inputLogic =
+        //@formatter:off
+        "<and name=\"test\">"+
+              "<data-in dataset=\"A\" />" +
+              "<data-in dataset=\"B\" />" +
+         "</and>";
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-combine.xml", conf, inputLogic, TEST_TYPE.LATEST_RANGE);
+
+        List<String> inputDir = createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4, 5);
+        inputDir.addAll(createPartitionWithTime("db_a", now, 0, 1, 2, 3, 4, 5));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 12);
+        checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+
+    }
+
+    public void testCurrentLatest() throws Exception {
+
+        Configuration conf = getConfForCombine();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 10 * TIME_DAYS)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+
+        String inputLogic =
+//@formatter:off
+        "<and name=\"test\">"+
+              "<data-in dataset=\"A\"/>" +
+              "<data-in dataset=\"B\"/>" +
+         "</and>";
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-combine.xml", conf, inputLogic, TEST_TYPE.LATEST_RANGE,
+                TEST_TYPE.CURRENT_RANGE);
+
+        List<String> inputDir = createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4, 5);
+        inputDir.addAll(createPartitionWithTime("db_a", now, 0, 1, 2, 3, 4, 5));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 12);
+        checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+
+    }
+
+    public void testLatestRangeComplex() throws Exception {
+
+        Configuration conf = getConfForCombine();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 10 * TIME_DAYS)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+
+        String inputLogic =
+        //@formatter:off
+        "<or name=\"test\">" +
+            "<and>"+
+                   "<data-in name=\"testA\" dataset=\"A\" />" +
+                   "<data-in name=\"testB\" dataset=\"B\" />" +
+             "</and>" +
+             "<and name=\"test\">"+
+                 "<data-in name=\"testC\" dataset=\"C\" />" +
+                 "<data-in name=\"testD\" dataset=\"D\" />" +
+             "</and>" +
+        "</or>";
+
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-combine.xml", conf, inputLogic, TEST_TYPE.LATEST_RANGE);
+        List<String> inputDir = createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4, 5);
+        inputDir.addAll(createPartitionWithTime("db_a", now, 0, 1, 2, 3, 4, 5));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 12);
+        checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+
+    }
+
+    public void testHcatHdfs() throws Exception {
+        Configuration conf = getConfForCombine();
+        conf.set("initial_instance_a", "2014-10-07T00:00Z");
+        conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+        String inputLogic =
+        //@formatter:off
+            "<and name=\"test\">" +
+                       "<data-in name=\"testA\" dataset=\"A\" />" +
+                       "<data-in name=\"testB\" dataset=\"B\" />" +
+            "</and>";
+            //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-combine.xml", conf, inputLogic, TEST_TYPE.CURRENT_SINGLE);
+
+        String input1 = createTestCaseSubDir("input-data/b/2014/10/08/_SUCCESS".split("/"));
+        String input2 = addPartition("db_a", "table1", "dt=20141008;country=usa");
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 2);
+        checkDataSets(dataSets, input1, input2);
+
+    }
+
+    public void testHcatHdfsLatest() throws Exception {
+        Configuration conf = getConfForCombine();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 10 * TIME_DAYS)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+        conf.set("initial_instance", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * TIME_DAYS)));
+
+        SimpleDateFormat sd = new SimpleDateFormat("yyyy/MM/dd");
+        TimeZone tzUTC = TimeZone.getTimeZone("UTC");
+        sd.setTimeZone(tzUTC);
+
+        String inputLogic =
+        // @formatter:off
+            "<and name=\"test\" min = \"1\" >" +
+                       "<data-in dataset=\"A\" />" +
+                       "<data-in dataset=\"D\" />" +
+            "</and>";
+
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-combine.xml", conf, inputLogic, TEST_TYPE.LATEST_RANGE);
+
+        String input1 = createTestCaseSubDir(("input-data/d/" + sd.format(now) + "/_SUCCESS").split("/"));
+        sd = new SimpleDateFormat("yyyyMMdd");
+        String input2 = addPartition("db_a", "table1", "dt=" + sd.format(now) + ";country=usa");
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 2);
+        checkDataSets(dataSets, input1, input2);
+
+    }
+
+    private Configuration getConf() throws Exception {
+        Configuration conf = new XConfiguration();
+        conf.set("start_time", "2014-10-08T00:00Z");
+        conf.set("end_time", "2015-10-08T00:00Z");
+        conf.set("initial_instance", "2014-10-08T00:00Z");
+
+        String dataset1 = "hcat://" + getMetastoreAuthority();
+
+        conf.set("data_set", dataset1.toString());
+        conf.set("db_a", "db_a");
+        conf.set("db_b", "db_b");
+        conf.set("db_c", "db_c");
+        conf.set("db_d", "db_d");
+        conf.set("db_e", "db_e");
+        conf.set("db_f", "db_f");
+        conf.set("table", table);
+        conf.set("wfPath", getWFPath());
+        conf.set("partitionName", "test");
+
+        return conf;
+
+    }
+
+    private Configuration getConfForCombine() throws Exception {
+        Configuration conf = new XConfiguration();
+        conf.set("start_time", "2014-10-08T00:00Z");
+        conf.set("end_time", "2015-10-08T00:00Z");
+        conf.set("initial_instance", "2014-10-08T00:00Z");
+
+        conf.set("data_set_b", "file://" + getTestCaseDir() + "/input-data/b");
+        conf.set("data_set_d", "file://" + getTestCaseDir() + "/input-data/d");
+        conf.set("data_set_f", "file://" + getTestCaseDir() + "/input-data/f");
+
+        conf.set("start_time", "2014-10-08T00:00Z");
+        conf.set("end_time", "2015-10-08T00:00Z");
+        conf.set("initial_instance_a", "2014-10-08T00:00Z");
+        conf.set("initial_instance_b", "2014-10-08T00:00Z");
+
+        String dataset1 = "hcat://" + getMetastoreAuthority();
+
+        conf.set("data_set", dataset1.toString());
+        conf.set("db_a", "db_a");
+        conf.set("db_b", "db_b");
+        conf.set("db_c", "db_c");
+        conf.set("db_d", "db_d");
+        conf.set("db_e", "db_e");
+        conf.set("db_f", "db_f");
+        conf.set("table", table);
+        conf.set("wfPath", getWFPath());
+        conf.set("partitionName", "test");
+
+        return conf;
+
+    }
+
+    private String _testCoordSubmit(String coordinatorXml, Configuration conf, String inputLogic, TEST_TYPE... testType)
+            throws Exception {
+        String appPath = "file://" + getTestCaseDir() + File.separator + "coordinator.xml";
+
+        String content = IOUtils.getResourceAsString(coordinatorXml, -1);
+        content = content.replaceAll("=input-logic=", inputLogic);
+        for (int i = 1; i <= 6; i++) {
+            if (i - 1 < testType.length) {
+                content = content.replaceAll("=data-in-param-" + i + "=", getEnumText(testType[i - 1]));
+            }
+            else {
+                content = content.replaceAll("=data-in-param-" + i + "=", getEnumText(testType[testType.length - 1]));
+            }
+        }
+
+        Writer writer = new FileWriter(new URI(appPath).getPath());
+        IOUtils.copyCharStream(new StringReader(content), writer);
+        conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        conf.set("nameNode", "hdfs://localhost:9000");
+        conf.set("queueName", "default");
+        conf.set("jobTracker", "localhost:9001");
+        conf.set("examplesRoot", "examples");
+
+        String coordId = null;
+
+        try {
+            coordId = new CoordSubmitXCommand(conf).call();
+        }
+        catch (CommandException e) {
+            e.printStackTrace();
+            fail("should not throw exception " + e.getMessage());
+        }
+        return coordId;
+    }
+
+    public String getWFPath() throws Exception {
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        String appXml = "<workflow-app xmlns='uri:oozie:workflow:0.1' name='map-reduce-wf'> " + "<start to='end' /> "
+                + "<end name='end' /> " + "</workflow-app>";
+
+        writeToFile(appXml, workflowUri);
+        return workflowUri;
+    }
+
+    private void writeToFile(String appXml, String appPath) throws IOException {
+        File wf = new File(URI.create(appPath));
+        PrintWriter out = null;
+        try {
+            out = new PrintWriter(new FileWriter(wf));
+            out.println(appXml);
+        }
+        catch (IOException iOException) {
+            throw iOException;
+        }
+        finally {
+            if (out != null) {
+                out.close();
+            }
+        }
+    }
+
+    public void checkDataSets(String dataSets, String... values) {
+
+        Set<String> inputDataSets = new HashSet<String>();
+        for (String dataSet : dataSets.split(",")) {
+            if (dataSet.indexOf(getTestCaseDir()) >= 0) {
+                inputDataSets.add(dataSet.substring(dataSet.indexOf(getTestCaseDir())));
+            }
+            else {
+                inputDataSets.add(dataSet);
+            }
+        }
+
+        for (String value : values) {
+            assertTrue(inputDataSets.contains(value.replace("/_SUCCESS","")));
+        }
+
+    }
+
+    private void startCoordAction(final String jobId) throws CommandException, JPAExecutorException {
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+        new CoordPushDependencyCheckXCommand(jobId + "@1").call();
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+
+        waitFor(50 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                        CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+                return !actionBean.getStatus().equals(CoordinatorAction.Status.WAITING);
+            }
+        });
+
+        CoordinatorAction actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION,
+                jobId + "@1");
+        assertFalse("Action status should not be waiting",
+                actionBean.getStatus().equals(CoordinatorAction.Status.WAITING));
+
+        waitFor(50 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                        CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+                return !actionBean.getStatus().equals(CoordinatorAction.Status.READY);
+            }
+        });
+        CoordinatorJob coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
+        new CoordActionStartXCommand(actionBean.getId(), coordJob.getUser(), coordJob.getAppName(),
+                actionBean.getJobId()).call();
+    }
+
+    @SuppressWarnings("unchecked")
+    public Configuration getActionConf(CoordinatorActionBean actionBean) throws JDOMException {
+        Configuration conf = new XConfiguration();
+        Element eAction = XmlUtils.parseXml(actionBean.getActionXml());
+        Element configElem = eAction.getChild("action", eAction.getNamespace())
+                .getChild("workflow", eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
+        List<Element> elementList = configElem.getChildren("property", eAction.getNamespace());
+        for (Element element : elementList) {
+            conf.set(((Element) element.getChildren().get(0)).getText(),
+                    ((Element) element.getChildren().get(1)).getText());
+        }
+        return conf;
+    }
+
+    private String getEnumText(TEST_TYPE testType) {
+        switch (testType) {
+            case LATEST_SINGLE:
+                return "<instance>\\${coord:latest(0)}</instance>";
+            case LATEST_RANGE:
+                return "<start-instance>\\${coord:latest(-5)}</start-instance>"
+                        + "<end-instance>\\${coord:latest(0)}</end-instance>";
+            case CURRENT_SINGLE:
+                return "<instance>\\${coord:current(0)}</instance>";
+            case CURRENT_RANGE:
+                return "<start-instance>\\${coord:current(-5)}</start-instance>"
+                        + "<end-instance>\\${coord:current(0)}</end-instance>";
+        }
+        return "";
+
+    }
+
+    public List<String> createDirWithTime(String dirPrefix, Date date, int... hours) {
+
+        SimpleDateFormat sd = new SimpleDateFormat("yyyy/MM/dd");
+
+        TimeZone tzUTC = TimeZone.getTimeZone("UTC");
+        sd.setTimeZone(tzUTC);
+        List<String> createdDirPath = new ArrayList<String>();
+
+        for (int hour : hours) {
+            createdDirPath
+                    .add(createTestCaseSubDir((dirPrefix + sd.format(new Date(date.getTime() - hour * TIME_DAYS)) + "/_SUCCESS")
+                            .split("/")));
+        }
+        return createdDirPath;
+    }
+
+    public List<String> createPartitionWithTime(String database, Date date, int... hours) throws Exception {
+
+        List<String> createdPartition = new ArrayList<String>();
+        SimpleDateFormat sd = new SimpleDateFormat("yyyyMMdd");
+        TimeZone tzUTC = TimeZone.getTimeZone("UTC");
+        sd.setTimeZone(tzUTC);
+        for (int hour : hours) {
+            createdPartition.add(addPartition(database, "table1",
+                    "dt=" + sd.format(new Date(date.getTime() - hour * TIME_DAYS)) + ";country=usa"));
+
+        }
+        return createdPartition;
+    }
+
+    protected String addPartition(String db, String table, String partitionSpec) throws Exception {
+        super.addPartition(db, table, partitionSpec);
+        return "hcat://" + server + "/" + db + "/" + table + "/" + partitionSpec;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/81ce22b6/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordinatorInputLogic.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordinatorInputLogic.java b/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordinatorInputLogic.java
new file mode 100644
index 0000000..0679c8c
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/coord/input/logic/TestCoordinatorInputLogic.java
@@ -0,0 +1,1054 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord.input.logic;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.io.StringReader;
+import java.io.Writer;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.CoordinatorJob;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
+import org.apache.oozie.command.coord.CoordActionStartXCommand;
+import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
+import org.apache.oozie.command.coord.CoordSubmitXCommand;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.jdom.JDOMException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCoordinatorInputLogic extends XDataTestCase {
+    private Services services;
+
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        services = new Services();
+        services.init();
+    }
+
+    @After
+    @Override
+    protected void tearDown() throws Exception {
+        services.destroy();
+        super.tearDown();
+    }
+
+
+    @Test(expected = CommandException.class)
+    public void testValidateRange() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<combine name=\"test\">"+
+                        "<data-in dataset=\"A\" />"+
+                        "<data-in dataset=\"b\" />"+
+         "</combine>";
+        String inputEvent =
+        "<data-in name=\"A\" dataset=\"a\">" +
+                    "<start-instance>${coord:current(-5)}</start-instance>" +
+                    "<end-instance>${coord:current(0)}</end-instance>" +
+        "</data-in>" +
+        "<data-in name=\"B\" dataset=\"b\">" +
+                    "<start-instance>${coord:current(-4)}</start-instance>" +
+                    "<end-instance>${coord:current(0)}</end-instance>" +
+       "</data-in>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        try {
+            _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic, inputEvent, true);
+            fail();
+        }
+        catch (CommandException e) {
+            assertEquals(e.getErrorCode(), ErrorCode.E0803);
+        }
+    }
+
+    public void testDryRun() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<or name=\"test\">"+
+                "<and>"+
+                    "<or>"+
+                        "<data-in dataset=\"A\" />"+
+                        "<data-in dataset=\"B\" />"+
+                    "</or>"+
+                    "<or>"+
+                        "<data-in dataset=\"C\" />"+
+                        "<data-in dataset=\"D\" />"+
+                    "</or>"+
+                "</and>"+
+                "<and>"+
+                    "<data-in dataset=\"A\" />"+
+                    "<data-in dataset=\"B\" />"+
+                "</and>"+
+         "</or>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic, "", true);
+
+    }
+
+    public void testNestedCondition() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+                "<or name=\"test\">"+
+                        "<and>"+
+                            "<or>"+
+                                "<data-in dataset=\"A\" />"+
+                                "<data-in dataset=\"B\" />"+
+                            "</or>"+
+                        "<or>"+
+                            "<data-in dataset=\"C\" />"+
+                            "<data-in dataset=\"D\" />"+
+                        "</or>"+
+                        "</and>"+
+                            "<and>"+
+                                "<data-in dataset=\"A\" />"+
+                                "<data-in dataset=\"B\" />"+
+                             "</and>"+
+                        "</or>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+
+        final String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 2);
+        checkDataSets(dataSets, input1, input2);
+
+    }
+
+    public void testNestedCondition1() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<and name=\"test\">"+
+              "<or>"+
+                  "<and>" +
+                          "<data-in dataset=\"A\"/>"+
+                          "<data-in dataset=\"B\"/>"+
+                   "</and>" +
+                   "<and>"+
+                          "<data-in dataset=\"C\"/>"+
+                          "<data-in dataset=\"D\"/>"+
+                   "</and>"+
+             "</or>"+
+             "<and>"+
+                 "<data-in dataset=\"E\"/>"+
+                 "<data-in dataset=\"F\"/>"+
+             "</and>"+
+         "</and>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        final String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+        String input3 = createTestCaseSubDir("input-data/e/2014/10/08/00/_SUCCESS".split("/"));
+        String input4 = createTestCaseSubDir("input-data/f/2014/10/08/00/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 4);
+        checkDataSets(dataSets, input1, input2, input3, input4);
+
+    }
+
+    public void testNestedCondition2() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<or name=\"${partitionName}\">"+
+                  "<and>" +
+                          "<data-in dataset=\"A\" />"+
+                          "<data-in dataset=\"B\" />"+
+                          "<data-in dataset=\"C\" />"+
+                          "<data-in dataset=\"D\" />"+
+                   "</and>" +
+                   "<and>"+
+                          "<data-in dataset=\"E\" />"+
+                          "<data-in dataset=\"F\" />"+
+                   "</and>"+
+         "</or>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        final String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+        String input3 = createTestCaseSubDir("input-data/c/2014/10/08/00/_SUCCESS".split("/"));
+        String input4 = createTestCaseSubDir("input-data/e/2014/10/08/00/_SUCCESS".split("/"));
+        String input5 = createTestCaseSubDir("input-data/f/2014/10/08/00/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 2);
+        checkDataSets(dataSets, input4, input5);
+        checkDataSetsForFalse(dataSets, input1, input2, input3);
+
+    }
+
+    public void testNestedCondition3() throws Exception {
+        Configuration conf = getConf();
+
+        //@formatter:off
+        String inputLogic =
+        "<and name=\"test\">"+
+                  "<and>" +
+                          "<data-in dataset=\"A\" />"+
+                          "<data-in dataset=\"B\" />"+
+                   "</and>" +
+                   "<and>"+
+                          "<data-in dataset=\"C\" />"+
+                          "<data-in dataset=\"D\" />"+
+                   "</and>"+
+                   "<and>"+
+                       "<data-in dataset=\"E\" />"+
+                       "<data-in dataset=\"F\" />"+
+                   "</and>"+
+         "</and>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        final String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+        String input3 = createTestCaseSubDir("input-data/c/2014/10/08/00/_SUCCESS".split("/"));
+        String input4 = createTestCaseSubDir("input-data/d/2014/10/08/00/_SUCCESS".split("/"));
+        String input5 = createTestCaseSubDir("input-data/e/2014/10/08/00/_SUCCESS".split("/"));
+        String input6 = createTestCaseSubDir("input-data/f/2014/10/08/00/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 6);
+        checkDataSets(dataSets, input1, input2, input3, input4, input5, input6);
+
+    }
+
+    public void testSimpleOr() throws Exception {
+        Configuration conf = getConf();
+        //@formatter:off
+        String inputLogic =
+        "<or name=\"test\">"+
+                  "<data-in dataset=\"A\" />"+
+                  "<data-in dataset=\"B\" />"+
+         "</or>";
+        //@formatter:on
+        conf.set("partitionName", "test");
+        String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 1);
+        checkDataSets(dataSets, input1);
+    }
+
+    public void testSimpleOr1() throws Exception {
+        Configuration conf = getConf();
+        //@formatter:off
+        String inputLogic =
+        "<or name=\"test\">"+
+                  "<and>" +
+                          "<data-in dataset=\"C\" />"+
+                          "<data-in dataset=\"D\" />"+
+                   "</and>" +
+                   "<or>"+
+                          "<data-in dataset=\"A\" />"+
+                          "<data-in dataset=\"B\" />"+
+                   "</or>"+
+          "</or>";
+
+        String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        assertEquals(actionBean.getStatus(), CoordinatorAction.Status.WAITING);
+
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+        String input1=createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+        startCoordAction(jobId);
+        actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 1);
+        checkDataSets(dataSets, input1);
+
+    }
+
+    public void testOrWithMin() throws Exception {
+        Configuration conf = getConf();
+        //@formatter:off
+        String inputLogic =
+        "<or name=\"test\">"+
+                   "<data-in dataset=\"A\" min=\"3\"/>"+
+                   "<data-in dataset=\"B\" min=\"3\"/>"+
+        "</or>";
+        //@formatter:on
+        conf.set("initial_instance_a", "2014-10-07T00:00Z");
+        conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+        String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+        String input3 = createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+        String input4 = createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+        String input5 = createTestCaseSubDir("input-data/b/2014/10/07/19/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 3);
+        checkDataSets(dataSets, input3, input4, input5);
+    }
+
+    public void testAndWithMin() throws Exception {
+        Configuration conf = getConf();
+        //@formatter:off
+        String inputLogic =
+        "<and name=\"test\">"+
+                   "<data-in dataset=\"A\" min=\"2\"/>"+
+                   "<data-in dataset=\"B\" min=\"3\"/>"+
+                   "<data-in dataset=\"C\" min=\"0\"/>"+
+
+        "</and>";
+        //@formatter:on
+        conf.set("initial_instance_a", "2014-10-07T00:00Z");
+        conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+        String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+        String input3 = createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+        String input4 = createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+        String input5 = createTestCaseSubDir("input-data/b/2014/10/07/19/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 5);
+        checkDataSets(dataSets, input1, input2, input3, input4, input5, input5);
+    }
+
+    public void testMultipleInstance() throws Exception {
+        Configuration conf = getConf();
+        Date now = new Date();
+        //@formatter:off
+        String inputLogic =
+        "<and name=\"test\">"+
+                   "<data-in dataset=\"A\" min=\"2\"/>"+
+                   "<data-in dataset=\"B\"/>"+
+
+        "</and>";
+        String event =
+                "<data-in name=\"A\" dataset=\"a\">" +
+                        "<instance>${coord:current(-5)}</instance>" +
+                        "<instance>${coord:latest(-1)}</instance>" +
+                        "<instance>${coord:futureRange(0,2,10)}</instance>" +
+                 "</data-in>" +
+                 "<data-in name=\"B\" dataset=\"b\">" +
+                     "<instance>${coord:latest(0)}</instance>" +
+                     "<instance>${coord:latestRange(-3,0)}</instance>" +
+                 "</data-in>" ;
+
+        //@formatter:on
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 3 * 60 * 60 * 1000)));
+        // 5 hour before
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+        // 5 hour before
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+
+        String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, event);
+
+        List<String> inputDir = createDirWithTime("input-data/a/", now, 3, 5, 0, -1, -2);
+        inputDir.addAll(createDirWithTime("input-data/b/", now, 0, 1));
+
+        startCoordActionForWaiting(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertTrue(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+
+        inputDir.addAll(createDirWithTime("input-data/b/", now, 2, 3));
+
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+        actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 10);
+        checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+    }
+
+    public void testAnd() throws Exception {
+        Configuration conf = getConf();
+        //@formatter:off
+        String inputLogic =
+        "<and name=\"test\">"+
+                  "<data-in dataset=\"A\"/>"+
+                  "<data-in dataset=\"B\"/>"+
+         "</and>";
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic.xml", conf, inputLogic);
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/b/2014/10/08/00/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 2);
+        checkDataSets(dataSets, input1, input2);
+
+    }
+
+    public void testCombine() throws Exception {
+
+        Configuration conf = getConf();
+        //@formatter:off
+        String inputLogic =
+        "<combine name=\"test\">"+
+                   "<data-in dataset=\"A\" />"+
+                   "<data-in dataset=\"B\" />"+
+        "</combine>";
+        //@formatter:on
+        conf.set("initial_instance_a", "2014-10-07T00:00Z");
+        conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+        String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+        String input3 = createTestCaseSubDir("input-data/a/2014/10/07/22/_SUCCESS".split("/"));
+        String input4 = createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+        String input5 = createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+        String input6 = createTestCaseSubDir("input-data/b/2014/10/07/19/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 6);
+        checkDataSets(dataSets, input1, input2, input3, input4, input5, input6);
+    }
+
+    public void testCombineNegative() throws Exception {
+        Configuration conf = getConf();
+        //@formatter:off
+         String inputLogic =
+         "<combine name=\"test\">"+
+                    "<data-in dataset=\"A\" />"+
+                    "<data-in dataset=\"B\" />"+
+         "</combine>";
+         //@formatter:on
+        conf.set("initial_instance_a", "2014-10-07T00:00Z");
+        conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+        final String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+        createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+        createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+        createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+        waitFor(5 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                        CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+                return !actionBean.getStatus().equals(CoordinatorAction.Status.WAITING);
+            }
+        });
+
+        CoordinatorAction actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION,
+                jobId + "@1");
+        assertEquals(actionBean.getStatus(), CoordinatorAction.Status.WAITING);
+
+    }
+
+    public void testSingeSetWithMin() throws Exception {
+        Configuration conf = getConf();
+        //@formatter:off
+        String inputLogic =
+        "<or name=\"test\">"+
+                     "<data-in dataset=\"A\" min=\"3\" />"+
+        "</or>";
+        //@formatter:on
+
+        conf.set("initial_instance_a", "2014-10-07T00:00Z");
+        conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+        String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+        // dataset with gap
+        String input3 = createTestCaseSubDir("input-data/a/2014/10/07/19/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 3);
+        checkDataSets(dataSets, input1, input2, input3);
+    }
+
+    public void testCombineWithMin() throws Exception {
+        Configuration conf = getConf();
+        String inputLogic =
+        //@formatter:off
+        "<combine name=\"test\" min=\"4\">"+
+                   "<data-in dataset=\"A\" />"+
+                   "<data-in dataset=\"B\" />"+
+        "</combine>";
+        //@formatter:on
+        conf.set("initial_instance_a", "2014-10-07T00:00Z");
+        conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+        final String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+        String input2 = createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+        String input3 = createTestCaseSubDir("input-data/a/2014/10/07/22/_SUCCESS".split("/"));
+        String input4 = createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+        String input5 = createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 5);
+        checkDataSets(dataSets, input1, input2, input3, input4, input5);
+
+    }
+
+    public void testMinWait() throws Exception {
+        Configuration conf = getConf();
+        Date now = new Date();
+        String inputLogic =
+        //@formatter:off
+        "<combine name=\"test\" min= \"4\" wait=\"1\">"+
+                   "<data-in dataset=\"A\" />"+
+                   "<data-in dataset=\"B\" />"+
+        "</combine>";
+        //@formatter:on
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 3 * 60 * 60 * 1000)));
+        // 5 hour before
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+        // 5 hour before
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+
+        String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+        List<String> inputDir = createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4);
+
+        startCoordActionForWaiting(jobId);
+        // wait for 1 min
+        sleep(60 * 1000);
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 5);
+        checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+    }
+
+    public void testWait() throws Exception {
+        Configuration conf = getConf();
+        Date now = new Date();
+        String inputLogic =
+        //@formatter:off
+        "<combine name=\"test\" wait=\"1\">"+
+                   "<data-in dataset=\"A\" />"+
+                   "<data-in dataset=\"B\" />"+
+        "</combine>";
+        //@formatter:on
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 3 * 60 * 60 * 1000)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+
+        String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+        List<String> inputDir = createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4);
+
+        startCoordActionForWaiting(jobId);
+        // wait for 1 min
+        sleep(60 * 1000);
+
+        inputDir.addAll(createDirWithTime("input-data/b/", now, 5));
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 6);
+        checkDataSets(dataSets, inputDir.toArray(new String[inputDir.size()]));
+    }
+
+    public void testWaitFail() throws Exception {
+        Configuration conf = getConf();
+        Date now = new Date();
+        String inputLogic =
+        //@formatter:off
+                "<or name=\"test\" min=\"${min}\" wait=\"${wait}\">"+
+                           "<data-in dataset=\"${dataA}\" />"+
+                           "<data-in dataset=\"${dataB}\" />"+
+                "</or>";
+        //@formatter:on
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("min", "4");
+        conf.set("wait", "180");
+        conf.set("dataA", "A");
+        conf.set("dataB", "B");
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 3 * 60 * 60 * 1000)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+
+        String jobId = _testCoordSubmit("coord-inputlogic-range.xml", conf, inputLogic, getInputEventForRange());
+
+        createDirWithTime("input-data/b/", now, 0, 1, 2, 3, 4);
+
+        startCoordActionForWaiting(jobId);
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertTrue(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+    }
+
+    public void testLatest() throws Exception {
+
+        Configuration conf = getConf();
+        conf.set("initial_instance_a", "2014-10-07T00:00Z");
+        conf.set("initial_instance_b", "2014-10-07T00:00Z");
+
+        String inputLogic = "<data-in name=\"test\" dataset=\"A\"/>";
+        String jobId = _testCoordSubmit("coord-inputlogic-latest.xml", conf, inputLogic);
+
+        String input1 = createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 1);
+        checkDataSets(dataSets, input1);
+
+    }
+
+    public void testLatestRange() throws Exception {
+
+        Configuration conf = getConf();
+        Date now = new Date();
+        conf.set("start_time", DateUtils.formatDateOozieTZ(now));
+        conf.set("end_time", DateUtils.formatDateOozieTZ(new Date(now.getTime() + 3 * 60 * 60 * 1000)));
+        conf.set("initial_instance_a", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+        conf.set("initial_instance_b", DateUtils.formatDateOozieTZ(new Date(now.getTime() - 5 * 60 * 60 * 1000)));
+
+        String inputLogic =
+        //@formatter:off
+              "<data-in name=\"test\" dataset=\"A\" min =\"2\" />";
+        //@formatter:on
+        String jobId = _testCoordSubmit("coord-inputlogic-range-latest.xml", conf, inputLogic);
+
+        createDirWithTime("input-data/a/", now, 0, 1);
+
+        startCoordAction(jobId);
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+
+        assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+        XConfiguration runConf = new XConfiguration(new StringReader(actionBean.getRunConf()));
+        String dataSets = runConf.get("inputLogicData");
+        assertEquals(dataSets.split(",").length, 2);
+
+    }
+
+     //TODO combine support for unresolved
+     // public void testLatestWithCombine() throws Exception {
+     // Configuration conf = getConf();
+     // conf.set("input_check", "combine(\"A\", \"B\")");
+     // conf.set("initial_instance_a", "2014-10-07T00:00Z");
+     // conf.set("initial_instance_b", "2014-10-07T00:00Z");
+     //
+     // String jobId = _testCoordSubmit("coord-inputlogic-range-latest.xml", conf);
+     //
+     // new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+     // CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+     // CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+     // sleep(2000);
+     //
+     // new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+     // assertEquals(actionBean.getStatus(), CoordinatorAction.Status.WAITING);
+     // createTestCaseSubDir("input-data/a/2014/10/08/00/_SUCCESS".split("/"));
+     // createTestCaseSubDir("input-data/a/2014/10/07/23/_SUCCESS".split("/"));
+     // createTestCaseSubDir("input-data/a/2014/10/07/22/_SUCCESS".split("/"));
+     // createTestCaseSubDir("input-data/b/2014/10/07/21/_SUCCESS".split("/"));
+     //
+     // new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+     //
+     // actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+     // assertEquals(actionBean.getStatus(), CoordinatorAction.Status.WAITING);
+     //
+     // createTestCaseSubDir("input-data/b/2014/10/07/20/_SUCCESS".split("/"));
+     // new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+     //
+     // actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+     // assertFalse(CoordinatorAction.Status.WAITING.equals(actionBean.getStatus()));
+     //
+     // }
+    public void testCoordWithoutInputCheck() throws Exception {
+        Configuration conf = new XConfiguration();
+        String jobId = setupCoord(conf, "coord-multiple-input-instance3.xml");
+        sleep(1000);
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+
+        CoordinatorAction actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION,
+                jobId + "@1");
+
+        assertEquals(actionBean.getMissingDependencies(), "!!${coord:latest(0)}#${coord:latest(-1)}");
+
+    }
+
+    private String _testCoordSubmit(String coordinatorXml, Configuration conf, String inputLogic) throws Exception {
+        return _testCoordSubmit(coordinatorXml, conf, inputLogic, "", false);
+    }
+
+    private String _testCoordSubmit(String coordinatorXml, Configuration conf, String inputLogic, String inputEvent)
+            throws Exception {
+        return _testCoordSubmit(coordinatorXml, conf, inputLogic, inputEvent, false);
+    }
+
+    private String _testCoordSubmit(String coordinatorXml, Configuration conf, String inputLogic, String inputEvent,
+            boolean dryRun) throws Exception {
+        String appPath = "file://" + getTestCaseDir() + File.separator + "coordinator.xml";
+
+        String content = IOUtils.getResourceAsString(coordinatorXml, -1);
+        content = content.replace("=input-logic=", inputLogic);
+        content = content.replace("=input-events=", inputEvent);
+
+        Writer writer = new FileWriter(new URI(appPath).getPath());
+        IOUtils.copyCharStream(new StringReader(content), writer);
+        conf.set(OozieClient.COORDINATOR_APP_PATH, appPath);
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        conf.set("nameNode", "hdfs://localhost:9000");
+        conf.set("queueName", "default");
+        conf.set("jobTracker", "localhost:9001");
+        conf.set("examplesRoot", "examples");
+
+        return new CoordSubmitXCommand(dryRun, conf).call();
+    }
+
+    private Configuration getConf() throws Exception {
+        Configuration conf = new XConfiguration();
+        conf.set("data_set_a", "file://" + getTestCaseDir() + "/input-data/a");
+        conf.set("data_set_b", "file://" + getTestCaseDir() + "/input-data/b");
+        conf.set("data_set_c", "file://" + getTestCaseDir() + "/input-data/c");
+        conf.set("data_set_d", "file://" + getTestCaseDir() + "/input-data/d");
+        conf.set("data_set_e", "file://" + getTestCaseDir() + "/input-data/e");
+        conf.set("data_set_f", "file://" + getTestCaseDir() + "/input-data/f");
+        conf.set("partitionName", "test");
+
+        conf.set("start_time", "2014-10-08T00:00Z");
+        conf.set("end_time", "2015-10-08T00:00Z");
+        conf.set("initial_instance_a", "2014-10-08T00:00Z");
+        conf.set("initial_instance_b", "2014-10-08T00:00Z");
+        conf.set("wfPath", getWFPath());
+        return conf;
+
+    }
+
+    public String getWFPath() throws Exception {
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        String appXml = "<workflow-app xmlns='uri:oozie:workflow:0.1' name='map-reduce-wf'> " + "<start to='end' /> "
+                + "<end name='end' /> " + "</workflow-app>";
+
+        writeToFile(appXml, workflowUri);
+        return workflowUri;
+
+    }
+
+    private void writeToFile(String appXml, String appPath) throws IOException {
+        File wf = new File(URI.create(appPath));
+        PrintWriter out = null;
+        try {
+            out = new PrintWriter(new FileWriter(wf));
+            out.println(appXml);
+        }
+        catch (IOException iex) {
+            throw iex;
+        }
+        finally {
+            if (out != null) {
+                out.close();
+            }
+        }
+    }
+
+    public void checkDataSets(String dataSets, String... values) {
+
+        Set<String> inputDataSets = new HashSet<String>();
+        for (String dataSet : dataSets.split(",")) {
+            inputDataSets.add(dataSet.substring(dataSet.indexOf(getTestCaseDir())));
+        }
+
+        for (String value : values) {
+            assertTrue(inputDataSets.contains(value.replace("/_SUCCESS","")));
+        }
+    }
+
+    public void checkDataSetsForFalse(String dataSets, String... values) {
+
+        Set<String> inputDataSets = new HashSet<String>();
+        for (String dataSet : dataSets.split(",")) {
+            inputDataSets.add(dataSet.substring(dataSet.indexOf(getTestCaseDir())));
+        }
+
+        for (String value : values) {
+            assertFalse(inputDataSets.contains(value));
+        }
+
+    }
+
+    private void startCoordAction(final String jobId) throws CommandException, JPAExecutorException {
+        startCoordAction(jobId, CoordinatorAction.Status.WAITING);
+
+    }
+
+    private void startCoordAction(final String jobId, final CoordinatorAction.Status coordActionStatus)
+            throws CommandException, JPAExecutorException {
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+        waitFor(50 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                        CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+                return !actionBean.getStatus().equals(CoordinatorAction.Status.WAITING);
+            }
+        });
+
+        CoordinatorAction actionBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION,
+                jobId + "@1");
+        assertFalse(actionBean.getStatus().equals(coordActionStatus));
+
+        CoordinatorJob coordJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
+
+        new CoordActionStartXCommand(actionBean.getId(), coordJob.getUser(), coordJob.getAppName(),
+                actionBean.getJobId()).call();
+    }
+
+    private void startCoordActionForWaiting(final String jobId) throws CommandException, JPAExecutorException,
+            JDOMException {
+        new CoordMaterializeTransitionXCommand(jobId, 3600).call();
+
+        new CoordActionInputCheckXCommand(jobId + "@1", jobId).call();
+        waitFor(5 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                        CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+                return !actionBean.getStatus().equals(CoordinatorAction.Status.WAITING);
+            }
+        });
+
+        CoordinatorActionBean actionBean = CoordActionQueryExecutor.getInstance().get(
+                CoordActionQuery.GET_COORD_ACTION, jobId + "@1");
+        assertTrue("should be waiting", actionBean.getStatus().equals(CoordinatorAction.Status.WAITING));
+    }
+
+    private String setupCoord(Configuration conf, String coordFile) throws CommandException, IOException {
+        File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
+        Reader reader = IOUtils.getResourceAsReader(coordFile, -1);
+        Writer writer = new FileWriter(appPathFile);
+        conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        CoordSubmitXCommand sc = new CoordSubmitXCommand(conf);
+        IOUtils.copyCharStream(reader, writer);
+        sc = new CoordSubmitXCommand(conf);
+        return sc.call();
+
+    }
+
+    private String getInputEventForRange() {
+        //@formatter:off
+        return
+                "<data-in name=\"A\" dataset=\"a\">" +
+                    "<start-instance>${coord:current(-5)}</start-instance>" +
+                    "<end-instance>${coord:current(0)}</end-instance>" +
+                "</data-in>" +
+                "<data-in name=\"B\" dataset=\"b\">" +
+                    "<start-instance>${coord:current(-5)}</start-instance>" +
+                    "<end-instance>${coord:current(0)}</end-instance>" +
+                "</data-in>" +
+                "<data-in name=\"C\" dataset=\"c\">" +
+                    "<start-instance>${coord:current(-5)}</start-instance> " +
+                    "<end-instance>${coord:current(0)}</end-instance>" +
+                "</data-in>" +
+                "<data-in name=\"D\" dataset=\"d\">" +
+                    "<start-instance>${coord:current(-5)}</start-instance>" +
+                    "<end-instance>${coord:current(0)}</end-instance>" +
+                "</data-in>" +
+                "<data-in name=\"E\" dataset=\"e\">" +
+                    "<start-instance>${coord:current(-5)}</start-instance>" +
+                    "<end-instance>${coord:current(0)}</end-instance>" +
+                "</data-in>" +
+                "<data-in name=\"F\" dataset=\"f\">" +
+                    "<start-instance>${coord:current(-5)}</start-instance> " +
+                    "<end-instance>${coord:current(0)}</end-instance>" +
+                "</data-in>";
+        //@formatter:on
+    }
+
+    public List<String> createDirWithTime(String dirPrefix, Date date, int... hours) {
+
+        SimpleDateFormat sd = new SimpleDateFormat("yyyy/MM/dd/HH");
+
+        TimeZone tzUTC = TimeZone.getTimeZone("UTC");
+        sd.setTimeZone(tzUTC);
+        List<String> createdDirPath = new ArrayList<String>();
+
+        for (int hour : hours) {
+            createdDirPath.add(createTestCaseSubDir((dirPrefix
+                    + sd.format(new Date(date.getTime() - hour * 60 * 60 * 1000)) + "/_SUCCESS").split("/")));
+        }
+        return createdDirPath;
+    }
+
+}