Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:14 UTC

[15/27] adding falcon-regression

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
new file mode 100644
index 0000000..515b18e
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Process instance kill tests.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceKillsTest extends BaseTestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private String testDir = "/ProcessInstanceKillsTest";
+    private String baseTestHDFSDir = baseHDFSDir + testDir;
+    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+    private String feedInputPath = baseTestHDFSDir +
+        "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedOutputPath =
+        baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private static final Logger LOGGER = Logger.getLogger(ProcessInstanceKillsTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        LOGGER.info("in @BeforeClass");
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+        Bundle b = BundleUtil.readELBundle();
+        b.generateUniqueBundle();
+        b = new Bundle(b, cluster);
+
+        String startDate = "2010-01-01T23:20Z";
+        String endDate = "2010-01-02T01:21Z";
+
+        b.setInputFeedDataPath(feedInputPath);
+        String prefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[0].setInputFeedDataPath(feedInputPath);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown(Method method) {
+        LOGGER.info("tearDown " + method.getName());
+        removeBundles();
+    }
+
+    /**
+     * Schedule process. Perform -kill action using only the -start parameter. Check that the
+     * action succeeded and only one instance was killed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceKillSingle() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED);
+    }
+
+    /**
+     * Schedule process. Check that when the -start and -end parameters are equal, the -kill
+     * action behaves the same as when only the -start parameter is used. Only one
+     * instance should be killed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceKillStartAndEndSame() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T00:00Z", "2010-01-02T04:00Z");
+        bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
+        bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(10);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T00:03Z&end=2010-01-02T00:03Z");
+        InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
+    }
+
+    /**
+     * Schedule process. Perform -kill action on instances between -start and -end dates that
+     * cover the last 3 instances which have been materialized already as well as those which
+     * are yet to be. Check that only the existing instances are killed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceKillKillNonMaterialized() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T00:00Z", "2010-01-02T04:00Z");
+        bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
+        bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T00:03Z&end=2010-01-02T00:30Z");
+        InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
+        LOGGER.info(r.toString());
+    }
+
+    /**
+     * Generate data. Schedule process. Try to perform -kill
+     * operation using -start and -end which are both in the future with respect to process start.
+     *
+     * @throws Exception TODO amend test with validations
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceKillBothStartAndEndInFuture01() throws Exception {
+        /*
+        both start and end are in the future with respect to the process start and end
+         */
+        String startTime = TimeUtil.getTimeWrtSystemTime(-20);
+        String endTime = TimeUtil.getTimeWrtSystemTime(400);
+        String startTimeData = TimeUtil.getTimeWrtSystemTime(-50);
+        String endTimeData = TimeUtil.getTimeWrtSystemTime(50);
+
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startTimeData, endTimeData, 1);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
+            baseTestHDFSDir + "/input01", dataDates);
+        bundles[0].setInputFeedDataPath(feedInputPath.replace("input/", "input01/"));
+        bundles[0].setProcessValidity(startTime, endTime);
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        String startTimeRequest = TimeUtil.getTimeWrtSystemTime(-17);
+        String endTimeRequest = TimeUtil.getTimeWrtSystemTime(23);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=" + startTimeRequest + "&end=" + endTimeRequest);
+        LOGGER.info(r.toString());
+    }
+
+    /**
+     * Schedule process. Check that the -kill action is not performed when the time range between
+     * the -start and -end parameters is in the future and doesn't include any existing instances.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceKillBothStartAndEndInFuture() throws Exception {
+        /*
+         both start and end are in the future with respect to the current time
+         */
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:21Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        String startTime = TimeUtil.getTimeWrtSystemTime(1);
+        String endTime = TimeUtil.getTimeWrtSystemTime(40);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=" + startTime + "&end=" + endTime);
+        LOGGER.info(r.getMessage());
+        Assert.assertNull(r.getInstances());
+    }
+
+    /**
+     * Schedule process. Perform -kill action within a time range which includes 3 running
+     * instances. Get the status of instances within a wider range. Check that only those 3
+     * instances are killed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceKillMultipleInstance() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z&end=2010-01-02T01:15Z");
+        TimeUtil.sleepSeconds(15);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        InstanceUtil.validateResponse(result, 5, 2, 0, 0, 3);
+    }
+
+    /**
+     * Schedule process. Perform -kill action on the last expected instance. Get the status of
+     * instances within a wider range. Check that only the last one is killed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceKillLastInstance() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:20Z");
+        TimeUtil.sleepSeconds(15);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        InstanceUtil.validateResponse(result, 5, 4, 0, 0, 1);
+    }
+
+    /**
+     * Schedule process. Suspend one running instance. Perform -kill action on it. Check that
+     * this instance is indeed killed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceKillSuspended() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED);
+    }
+
+    /**
+     * Schedule a single-instance process. Wait till it finishes. Try to kill the instance. Check
+     * that the instance remains succeeded.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceKillSucceeded() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(bundles[0]
+            .getProcessData()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.SUCCEEDED);
+    }
+
+
+    @AfterClass(alwaysRun = true)
+    public void deleteData() throws Exception {
+        LOGGER.info("in @AfterClass");
+        Bundle b = BundleUtil.readELBundle();
+        b = new Bundle(b, cluster);
+        b.setInputFeedDataPath(feedInputPath);
+        String prefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+    }
+
+}
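
All of the kill tests above drive the instance API through a raw "?start=...[&end=...]"
query string in UTC ISO-8601. A minimal stand-alone sketch of that convention follows;
the class and helper names are illustrative only and not part of this patch:

    /** Sketch only: builds the ?start[&end] query string the tests pass verbatim. */
    public final class InstanceRangeSketch {
        private InstanceRangeSketch() {
        }

        static String instanceRange(String start, String end) {
            // Times must be ISO-8601 UTC, e.g. 2010-01-02T01:00Z.
            StringBuilder params = new StringBuilder("?start=").append(start);
            if (end != null) {
                // end is optional; omitting it targets the single instance at start
                params.append("&end=").append(end);
            }
            return params.toString();
        }

        public static void main(String[] args) {
            // Mirrors the literals used in testProcessInstanceKillSingle and
            // testProcessInstanceKillStartAndEndSame above.
            System.out.println(instanceRange("2010-01-02T01:00Z", null));
            System.out.println(instanceRange("2010-01-02T00:03Z", "2010-01-02T00:03Z"));
        }
    }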

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
new file mode 100644
index 0000000..8b40cf1
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob.Status;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Test Suite for instance rerun.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceRerunTest extends BaseTestClass {
+
+    private String baseTestDir = baseHDFSDir + "/ProcessInstanceRerunTest";
+    private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+    private String feedInputPath = baseTestDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedOutputPath = baseTestDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedInputTimedOutPath =
+        baseTestDir + "/timedout/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private OozieClient clusterOC = serverOC.get(0);
+    private static final Logger LOGGER = Logger.getLogger(ProcessInstanceRerunTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        LOGGER.info("in @BeforeClass");
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+        Bundle b = BundleUtil.readELBundle();
+
+        b = new Bundle(b, cluster);
+        String startDate = "2010-01-02T00:40Z";
+        String endDate = "2010-01-02T01:20Z";
+        b.setInputFeedDataPath(feedInputPath);
+        String prefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown(Method method) {
+        LOGGER.info("tearDown " + method.getName());
+        removeBundles();
+    }
+
+    /**
+     * Schedule process. Kill some instances. Rerun some of the killed ones. Check that those
+     * instances were rerun correctly and the others are still killed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceRerunSomeKilled02() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(10);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:16Z");
+        InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4);
+        List<String> wfIDs =
+            InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
+        prism.getProcessHelper()
+            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0);
+    }
+
+    /**
+     * Schedule process. Kill all instances. Rerun them. Check that they were rerun.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceRerunMultipleKilled() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(5);
+        LOGGER.info("process: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
+        List<String> wfIDs =
+            InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
+        prism.getProcessHelper()
+            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
+    }
+
+    /**
+     * Schedule process. Kill some instances. Rerun them. Check that there are no killed
+     * instances left.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceRerunSomeKilled01() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(5);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
+        List<String> wfIDs =
+            InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
+        prism.getProcessHelper()
+            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        TimeUtil.sleepSeconds(5);
+        InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 6, 0, 0);
+    }
+
+    /**
+     * Schedule process. Kill a single instance. Rerun it. Check that it was rerun.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceRerunSingleKilled() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        prism.getProcessHelper()
+            .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        String wfID = InstanceUtil.getWorkflows(cluster,
+            Util.getProcessName(bundles[0].getProcessData()), Status.KILLED).get(0);
+        prism.getProcessHelper()
+            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
+    }
+
+    /**
+     * Schedule process. Wait till it succeeds. Rerun the first succeeded instance. Check that it
+     * is running.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceRerunSingleSucceeded() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        String wfID = InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0]
+            .getProcessData()), Status.RUNNING, Status.SUCCEEDED).get(0);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
+            .getProcessData()), 0, CoordinatorAction
+            .Status.SUCCEEDED, EntityType.PROCESS);
+        prism.getProcessHelper()
+            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
+    }
+
+    /**
+     * Schedule process. Suspend its instances. Try to rerun them. Check that the instances
+     * weren't rerun and are still suspended.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceRerunSingleSuspended() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:06Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(2);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
+        prism.getProcessHelper()
+            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
+        Assert.assertEquals(InstanceUtil
+            .getInstanceStatus(cluster, Util.getProcessName(bundles[0].getProcessData()), 0, 1),
+            CoordinatorAction.Status.SUSPENDED);
+    }
+
+    /**
+     * Schedule process. Wait till its instances succeed. Rerun them all. Check that they are
+     * running.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceRerunMultipleSucceeded() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(3);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
+            .getProcessData()), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+        List<String> wfIDs =
+            InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
+        prism.getProcessHelper()
+            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
+    }
+
+    /**
+     * Schedule process with an invalid input feed data path. Wait till the process times out.
+     * Rerun its instances. Check that they were rerun and are now waiting for input data.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceRerunTimedOut() throws Exception {
+        bundles[0].setInputFeedDataPath(feedInputTimedOutPath);
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessTimeOut(2, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(3);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, Util.getProcessName(bundles[0]
+            .getProcessData()), 1, CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS);
+        prism.getProcessHelper()
+            .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        CoordinatorAction.Status s = InstanceUtil
+            .getInstanceStatus(cluster, Util.readEntityName(bundles[0].getProcessData()), 0, 0);
+        Assert.assertEquals(s, CoordinatorAction.Status.WAITING,
+            "instance should have been in WAITING state");
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void deleteData() throws Exception {
+        LOGGER.info("in @AfterClass");
+        Bundle b = BundleUtil.readELBundle();
+        b = new Bundle(b, cluster);
+        b.setInputFeedDataPath(feedInputPath);
+        String prefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+    }
+}
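
Several of these rerun tests gate on InstanceUtil.waitTillInstanceReachState before
asserting. A simplified stand-alone sketch of such a wait, using only the public Oozie
client API; resolving the coordinator job id from the process name, which the real
helper does internally, is assumed to have happened already:

    import org.apache.oozie.client.CoordinatorAction;
    import org.apache.oozie.client.CoordinatorJob;
    import org.apache.oozie.client.OozieClient;
    import org.apache.oozie.client.OozieClientException;

    /** Sketch only: poll until at least 'count' coordinator actions reach 'target'. */
    public final class WaitForActionsSketch {
        private WaitForActionsSketch() {
        }

        static void waitTillReach(OozieClient oozie, String coordJobId, int count,
                                  CoordinatorAction.Status target)
            throws OozieClientException, InterruptedException {
            // Poll every 10 seconds for up to roughly 10 minutes.
            for (int attempt = 0; attempt < 60; attempt++) {
                CoordinatorJob job = oozie.getCoordJobInfo(coordJobId);
                int matched = 0;
                for (CoordinatorAction action : job.getActions()) {
                    if (action.getStatus() == target) {
                        matched++;
                    }
                }
                if (matched >= count) {
                    return;
                }
                Thread.sleep(10 * 1000);
            }
            throw new IllegalStateException(count + " instances did not reach " + target);
        }
    }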

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
new file mode 100644
index 0000000..72457de
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.response.ResponseKeys;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Process instance resume tests.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceResumeTest extends BaseTestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private String baseTestHDFSDir = baseHDFSDir + "/ProcessInstanceResumeTest";
+    private String feedInputPath = baseTestHDFSDir +
+        "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedOutputPath =
+        baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+    private static final Logger LOGGER = Logger.getLogger(ProcessInstanceResumeTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        LOGGER.info("in @BeforeClass");
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+        Bundle b = BundleUtil.readELBundle();
+        b = new Bundle(b, cluster);
+        String startDate = "2010-01-01T23:20Z";
+        String endDate = "2010-01-02T01:40Z";
+        b.setInputFeedDataPath(feedInputPath);
+        String prefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        LOGGER.info("setup " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown(Method method) {
+        LOGGER.info("tearDown " + method.getName());
+        removeBundles();
+    }
+
+    /**
+     * Schedule process. Suspend some instances. Check that an attempt to -resume instances using
+     * only the -end parameter fails.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceResumeOnlyEnd() throws Exception {
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
+        TimeUtil.sleepSeconds(10);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+        InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
+
+        result = prism.getProcessHelper()
+            .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+                "?end=2010-01-02T01:15Z");
+        InstanceUtil.validateSuccessWithStatusCode(result, ResponseKeys.UNPARSEABLE_DATE);
+    }
+
+    /**
+     * Schedule process. Suspend some instances. Try to perform -resume using a time range which
+     * affects only one instance. Check that this instance was resumed as expected.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceResumeResumeSome() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
+        TimeUtil.sleepSeconds(10);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+        InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
+
+        prism.getProcessHelper()
+            .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z&end=2010-01-02T01:16Z");
+        result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+        InstanceUtil.validateResponse(result, 6, 5, 1, 0, 0);
+    }
+
+    /**
+     * Schedule process. Suspend some instances. Try to perform -resume using a time range which
+     * affects all instances. Check that there are no suspended instances.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceResumeResumeMany() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z&end=2010-01-02T01:20Z");
+        TimeUtil.sleepSeconds(15);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+        InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
+
+        prism.getProcessHelper()
+            .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z&end=2010-01-02T01:20Z");
+        result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+        InstanceUtil.validateResponse(result, 6, 6, 0, 0, 0);
+    }
+
+    /**
+     * Schedule process. Suspend the first instance. Resume it using only the -start parameter.
+     * Check that the instance was resumed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceResumeSingle() throws Exception {
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(5);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        TimeUtil.sleepSeconds(5);
+        prism.getProcessHelper()
+            .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        TimeUtil.sleepSeconds(5);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.RUNNING);
+    }
+
+    /**
+     * An attempt to resume instances of a non-existent process should fail with an appropriate
+     * status code.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceResumeNonExistent() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r =
+            prism.getProcessHelper()
+                .getProcessInstanceResume("invalidName",
+                    "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z");
+        InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.PROCESS_NOT_FOUND);
+    }
+
+    /**
+     * An attempt to perform the -resume action without time range parameters should fail with an
+     * appropriate status code or message.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceResumeNoParams() throws Exception {
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r =
+                prism.getProcessHelper().getProcessInstanceResume(
+                        Util.readEntityName(bundles[0].getProcessData()), null);
+        InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
+    }
+
+    /**
+     * Schedule process, then delete it. Try to -resume its instances. The attempt should fail
+     * with an appropriate status code.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceResumeDeleted() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z");
+        InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.PROCESS_NOT_FOUND);
+    }
+
+    /**
+     * Schedule process. Try to resume an instance which wasn't suspended.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceResumeNonSuspended() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        prism.getProcessHelper()
+            .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z");
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z");
+        InstanceUtil.validateResponse(result, 1, 1, 0, 0, 0);
+        result = prism.getProcessHelper()
+            .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z");
+        InstanceUtil.validateResponse(result, 1, 1, 0, 0, 0);
+    }
+
+    /**
+     * Schedule process. Suspend the last instance. Resume it using a -start parameter which
+     * points to the expected materialization time of the last instance. Check that no
+     * instances of the process remain suspended.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceResumeLastInstance() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:25Z");
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:25Z");
+        InstanceUtil.validateResponse(result, 6, 5, 1, 0, 0);
+        prism.getProcessHelper()
+            .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:25Z");
+        result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:25Z");
+        InstanceUtil.validateResponse(result, 6, 6, 0, 0, 0);
+    }
+
+    /**
+     * Schedule process. Suspend all instances except the first and the last using appropriate
+     * -start/-end parameters. Resume those instances. Check that there are no suspended ones.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceResumeWithinRange() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(6);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        TimeUtil.sleepSeconds(15);
+        prism.getProcessHelper()
+            .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z&end=2010-01-02T01:20Z");
+        TimeUtil.sleepSeconds(15);
+        InstancesResult result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+        InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
+
+        prism.getProcessHelper()
+            .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:05Z&end=2010-01-02T01:20Z");
+        result = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+        InstanceUtil.validateResponse(result, 6, 6, 0, 0, 0);
+    }
+
+
+    @AfterClass(alwaysRun = true)
+    public void deleteData() throws Exception {
+        LOGGER.info("in @AfterClass");
+
+        Bundle b = BundleUtil.readELBundle();
+        b = new Bundle(b, cluster);
+        b.setInputFeedDataPath(feedInputPath);
+        String prefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+    }
+}
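
The suites above lean heavily on InstanceUtil.validateResponse with five counters. From
the call sites (for example, 6 total with 2 running and 4 suspended right after four of
six instances were suspended), the parameter order appears to be (total, running,
suspended, waiting, killed). A stand-alone sketch of an equivalent assertion, assuming
each Instance in the regression copy of InstancesResult exposes its WorkflowStatus via
getStatus(), as in Falcon's resource API:

    import org.apache.falcon.regression.core.response.InstancesResult;
    import org.testng.Assert;

    /** Sketch only: counts-based assertion; parameter order inferred from call sites. */
    public final class ValidateCountsSketch {
        private ValidateCountsSketch() {
        }

        static void validateCounts(InstancesResult r, int total, int running,
                                   int suspended, int waiting, int killed) {
            InstancesResult.Instance[] instances = r.getInstances();
            Assert.assertEquals(instances.length, total, "unexpected total instance count");
            int run = 0, sus = 0, wait = 0, kill = 0;
            for (InstancesResult.Instance instance : instances) {
                switch (instance.getStatus()) {
                case RUNNING:
                    run++;
                    break;
                case SUSPENDED:
                    sus++;
                    break;
                case WAITING:
                    wait++;
                    break;
                case KILLED:
                    kill++;
                    break;
                default:
                    break;
                }
            }
            Assert.assertEquals(run, running, "running count");
            Assert.assertEquals(sus, suspended, "suspended count");
            Assert.assertEquals(wait, waiting, "waiting count");
            Assert.assertEquals(kill, killed, "killed count");
        }
    }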

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
new file mode 100644
index 0000000..559df48
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.response.ResponseKeys;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Regression for instance running api.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceRunningTest extends BaseTestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private String baseTestHDFSDir = baseHDFSDir + "/ProcessInstanceRunningTest";
+    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+    private String feedInputPath = baseTestHDFSDir +
+        "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedOutputPath =
+        baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private static final Logger LOGGER = Logger.getLogger(ProcessInstanceRunningTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        LOGGER.info("in @BeforeClass");
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+        Bundle bundle = BundleUtil.readELBundle();
+        bundle.generateUniqueBundle();
+        bundle = new Bundle(bundle, cluster);
+
+        String startDate = "2010-01-02T00:40Z";
+        String endDate = "2010-01-02T01:11Z";
+
+        bundle.setInputFeedDataPath(feedInputPath);
+        String prefix = bundle.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+    }
+
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Run process. Suspend it and then resume. Get all -running instances. Response should
+     * contain all process instances.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void getResumedProcessInstance() throws Exception {
+        bundles[0].setProcessConcurrency(3);
+        bundles[0].submitFeedsScheduleProcess(prism);
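+        // let a few instances start running before suspending the process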
+        TimeUtil.sleepSeconds(15);
+        AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL,
+            bundles[0].getProcessData()));
+        TimeUtil.sleepSeconds(15);
+        AssertUtil.assertSucceeded(prism.getProcessHelper().resume(URLS.RESUME_URL,
+            bundles[0].getProcessData()));
+        TimeUtil.sleepSeconds(15);
+        InstancesResult r = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+    }
+
+    /**
+     * Run process. Suspend it. Try to get -running instances. Response should be
+     * successful but shouldn't contain any instance.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void getSuspendedProcessInstance() throws Exception {
+        bundles[0].setProcessConcurrency(3);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL,
+            bundles[0].getProcessData()));
+        TimeUtil.sleepSeconds(5);
+        InstancesResult r = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        InstanceUtil.validateSuccessWOInstances(r);
+    }
+
+    /**
+     * Run process. Get -running instances. Check that response contains expected number of
+     * instances.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void getRunningProcessInstance() throws Exception {
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+    }
+
+    /**
+     * Attempt to get -running instances of a nonexistent process should result in an error.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void getNonExistenceProcessInstance() throws Exception {
+        InstancesResult r =
+            prism.getProcessHelper()
+                .getRunningInstance(URLS.INSTANCE_RUNNING, "invalidName");
+        Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
+            "Unexpected status code");
+    }
+
+    /**
+     * Attempt to get -running instances of a deleted process should result in an error.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void getKilledProcessInstance() throws Exception {
+        bundles[0].submitFeedsScheduleProcess(prism);
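+        // delete the scheduled process; instance queries should now fail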
+        AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL,
+            bundles[0].getProcessData()));
+        InstancesResult r = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
+            "Unexpected status code");
+    }
+
+    /**
+     * Launch process and wait till it succeeds. Try to get -running instances. Response
+     * should reflect success but shouldn't contain any instances.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void getSucceededProcessInstance() throws Exception {
+        bundles[0].submitFeedsScheduleProcess(prism);
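+        // wait for the whole bundle to succeed so that no instance is left running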
+        InstanceUtil.waitForBundleToReachState(cluster, Util.getProcessName(bundles[0]
+            .getProcessData()), Job.Status.SUCCEEDED);
+        InstancesResult result = prism.getProcessHelper()
+            .getRunningInstance(URLS.INSTANCE_RUNNING,
+                Util.readEntityName(bundles[0].getProcessData()));
+        InstanceUtil.validateSuccessWOInstances(result);
+    }
+
+
+    @AfterClass(alwaysRun = true)
+    public void deleteData() throws Exception {
+        LOGGER.info("in @AfterClass");
+        Bundle b = BundleUtil.readELBundle();
+        b = new Bundle(b, cluster);
+        b.setInputFeedDataPath(feedInputPath);
+        String prefix = b.getFeedDataPathPrefix();
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
new file mode 100644
index 0000000..86f7a86
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.response.ResponseKeys;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction.Status;
+import org.apache.oozie.client.Job;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.List;
+
+/**
+ * Process instance status tests.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceStatusTest extends BaseTestClass {
+
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private String baseTestHDFSDir = baseHDFSDir + "/ProcessInstanceStatusTest";
+    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+    private String feedInputPath =
+        baseTestHDFSDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedOutputPath =
+        baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedInputTimedOutPath =
+        baseTestHDFSDir + "/timedoutStatus/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private String feedOutputTimedOutPath =
+        baseTestHDFSDir + "/output-data/timedoutStatus/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+    private static final Logger LOGGER = Logger.getLogger(ProcessInstanceStatusTest.class);
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        LOGGER.info("in @BeforeClass");
+
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+        Bundle bundle = BundleUtil.readELBundle();
+        bundle.generateUniqueBundle();
+        bundle = new Bundle(bundle, cluster);
+
+        String startDate = "2010-01-01T23:40Z";
+        String endDate = "2010-01-02T02:40Z";
+
+        bundle.setInputFeedDataPath(feedInputPath);
+        String prefix = bundle.getFeedDataPathPrefix();
+
+        HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+        List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+        HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+    }
+
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setInputFeedDataPath(feedInputPath);
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    /**
+     * Timeout is set to 3 minutes and getStatus is requested for a large range in the past.
+     * Six instances should be materialized: one running and the others waiting.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusStartAndEndCheckNoInstanceAfterEndDate()
+        throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-03T10:22Z");
+        bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
+        bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
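+        // concurrency 1 keeps a single instance running while the rest wait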
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z");
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
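+        // per the scenario above: 6 instances total, 1 running, 5 waiting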
+        InstanceUtil.validateResponse(r, 6, 1, 0, 5, 0);
+    }
+
+    /**
+     * Perform -getStatus using only the -start parameter within the time range of
+     * non-materialized instances. No instances should be returned in the response.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusOnlyStartAfterMat() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-03T10:22Z");
+        bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
+        bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
+        bundles[0].setProcessConcurrency(1);
+        bundles[0].submitFeedsScheduleProcess(prism);
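+        // query a start time beyond any materialized instance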
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T05:00Z");
+        AssertUtil.assertSucceeded(r);
+        Assert.assertNull(r.getInstances(), "No instances were expected in response");
+    }
+
+    /**
+     * Schedule process. Perform -getStatus using an -end parameter which is outside the
+     * process validity range. Attempt should fail.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusEndOutOfRange() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z");
+        InstanceUtil.validateSuccessWithStatusCode(r, 400);
+    }
+
+    /**
+     * Schedule process and try to -getStatus without date parameters. Attempt should fail with
+     * an appropriate message.
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusDateEmpty()
+        throws JAXBException, AuthenticationException, IOException, URISyntaxException {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()), null);
+        InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
+    }
+
+    /**
+     * Schedule process with a number of instances. Perform a -getStatus request with valid
+     * parameters. Attempt should succeed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusStartAndEnd() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+    }
+
+    /**
+     * Schedule process. Perform -getStatus using a -start parameter which is outside the
+     * process validity range. Attempt should fail.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusStartOutOfRange() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T00:00Z&end=2010-01-02T01:20Z");
+        InstanceUtil.validateSuccessWithStatusCode(r, 400);
+    }
+
+    /**
+     * Schedule and then delete process. Try to get the status of its instances. Attempt should
+     * fail with an appropriate code.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusKilled() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL,
+            bundles[0].getProcessData()));
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
+            "Unexpected status code");
+    }
+
+    /**
+     * Schedule process and then suspend it. -getStatus of the first instance using only the
+     * -start parameter. Instance should be suspended.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusOnlyStartSuspended() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL,
+            bundles[0].getProcessData()));
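+        // give Oozie time to propagate the suspended state to instances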
+        TimeUtil.sleepSeconds(15);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.SUSPENDED);
+    }
+
+    /**
+     * Schedule process. Try to -getStatus using -start/-end parameters with reversed values,
+     * i.e. -start is later than -end. Attempt should fail.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusReverseDateRange() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:20Z&end=2010-01-02T01:07Z");
+        InstanceUtil.validateSuccessWithStatusCode(r, 400);
+    }
+
+    /**
+     * Schedule process. Perform -getStatus using -start/-end parameters which are outside the
+     * process validity range. Attempt should fail.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusStartEndOutOfRange() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(2);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
+        InstanceUtil.validateSuccessWithStatusCode(r, 400);
+    }
+
+    /**
+     * Schedule process. Suspend and then resume it. -getStatus of its instances. Check that
+     * response reflects that instances are running.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusResumed() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputPath);
+        bundles[0].setProcessConcurrency(2);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData());
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.SUSPENDED);
+        prism.getProcessHelper().resume(URLS.RESUME_URL, bundles[0].getProcessData());
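+        // allow instances to return to the running state after resume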
+        TimeUtil.sleepSeconds(15);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+    }
+
+    /**
+     * Schedule process. -getStatus of its first instance using only the -start parameter, which
+     * points to the start time of process validity. Check that response reflects the expected
+     * status of the instance.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusOnlyStart() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+            Job.Status.RUNNING);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z");
+        InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.RUNNING);
+    }
+
+    /**
+     * Schedule process. Try to perform -getStatus using a valid -start parameter but an invalid
+     * process name. Attempt should fail with an appropriate status code.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusInvalidName() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus("invalidProcess", "?start=2010-01-01T01:00Z");
+        Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
+            "Unexpected status code");
+    }
+
+    /**
+     * Schedule process. Suspend it. -getStatus of its instances. Check that response reflects
+     * their status as suspended.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusSuspended() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        for (int i = 0; i < bundles[0].getClusters().size(); i++) {
+            LOGGER.info("cluster to be submitted: " + i + "  "
+                    + Util.prettyPrintXml(bundles[0].getClusters().get(i)));
+        }
+        bundles[0].submitFeedsScheduleProcess(prism);
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+                Job.Status.RUNNING);
+        AssertUtil.assertSucceeded(
+                prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+        AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+                Job.Status.SUSPENDED);
+        TimeUtil.sleepSeconds(15);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+        InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.SUSPENDED);
+    }
+
+    /**
+     * Schedule process. Try to -getStatus without time range parameters. Attempt should fail
+     * with an appropriate status code.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusWoParams() throws Exception {
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].submitFeedsScheduleProcess(prism);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()), null);
+        InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
+    }
+
+    /**
+     * Schedule process with timeout set to 2 minutes. Wait till it becomes timed-out. -getStatus
+     * of that process. Check that all materialized instances have failed.
+     *
+     * @throws Exception
+     */
+    @Test(groups = {"singleCluster"})
+    public void testProcessInstanceStatusTimedOut() throws Exception {
+        bundles[0].setInputFeedDataPath(feedInputTimedOutPath);
+        bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+        bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setProcessTimeOut(2, TimeUnit.minutes);
+        bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+        bundles[0].setOutputFeedLocationData(feedOutputTimedOutPath);
+        bundles[0].setProcessConcurrency(3);
+        bundles[0].submitFeedsScheduleProcess(prism);
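+        // no data exists under the timed-out input path, so instances wait and time out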
+        InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.readEntityName(bundles[0]
+            .getProcessData()), 1, Status.TIMEDOUT, EntityType.PROCESS);
+        InstancesResult r = prism.getProcessHelper()
+            .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+                "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+        InstanceUtil.validateFailedInstances(r, 3);
+    }
+}