You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:14 UTC
[15/27] adding falcon-regression
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
new file mode 100644
index 0000000..515b18e
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Process instance kill tests.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceKillsTest extends BaseTestClass {
+
+ private ColoHelper cluster = servers.get(0);
+ private FileSystem clusterFS = serverFS.get(0);
+ private String testDir = "/ProcessInstanceKillsTest";
+ private String baseTestHDFSDir = baseHDFSDir + testDir;
+ private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+ private String feedInputPath = baseTestHDFSDir +
+ "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedOutputPath =
+ baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private static final Logger LOGGER = Logger.getLogger(ProcessInstanceKillsTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+ LOGGER.info("in @BeforeClass");
+ HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+ Bundle b = BundleUtil.readELBundle();
+ b.generateUniqueBundle();
+ b = new Bundle(b, cluster);
+
+ String startDate = "2010-01-01T23:20Z";
+ String endDate = "2010-01-02T01:21Z";
+
+ b.setInputFeedDataPath(feedInputPath);
+ String prefix = b.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setup(Method method) throws Exception {
+ LOGGER.info("test name: " + method.getName());
+
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ bundles[0].setInputFeedDataPath(feedInputPath);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown(Method method) {
+ LOGGER.info("tearDown " + method.getName());
+ removeBundles();
+ }
+
+ /**
+ * Schedule process. Perform -kill action using only -start parameter. Check that action
+ * succeeded and only one instance was killed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceKillSingle() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(1);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED);
+ }
+
+ /**
+ * Schedule process. Check that in case when -start and -end parameters are equal -kill
+ * action results in the same way as in case with only -start parameter is used. Only one
+ * instance should be killed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceKillStartAndEndSame() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T00:00Z", "2010-01-02T04:00Z");
+ bundles[0].setProcessConcurrency(2);
+ bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(10);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T00:03Z&end=2010-01-02T00:03Z");
+ InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
+ }
+
+ /**
+ * Schedule process. Perform -kill action on instances between -start and -end dates which
+ * expose range of last 3 instances which have been materialized already and those which
+ * should be. Check that only existent instances are killed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceKillKillNonMatrelized() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T00:00Z", "2010-01-02T04:00Z");
+ bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T00:03Z&end=2010-01-02T00:30Z");
+ InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
+ LOGGER.info(r.toString());
+ }
+
+ /**
+ * Generate data. Schedule process. Try to perform -kill
+ * operation using -start and -end which are both in future with respect to process start.
+ *
+ * @throws Exception TODO amend test with validations
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceKillBothStartAndEndInFuture01() throws Exception {
+ /*
+ both start and end r in future with respect to process start end
+ */
+ String startTime = TimeUtil.getTimeWrtSystemTime(-20);
+ String endTime = TimeUtil.getTimeWrtSystemTime(400);
+ String startTimeData = TimeUtil.getTimeWrtSystemTime(-50);
+ String endTimeData = TimeUtil.getTimeWrtSystemTime(50);
+
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startTimeData, endTimeData, 1);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.SINGLE_FILE,
+ baseTestHDFSDir + "/input01", dataDates);
+ bundles[0].setInputFeedDataPath(feedInputPath.replace("input/","input01/"));
+ bundles[0].setProcessValidity(startTime, endTime);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ String startTimeRequest = TimeUtil.getTimeWrtSystemTime(-17);
+ String endTimeRequest = TimeUtil.getTimeWrtSystemTime(23);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=" + startTimeRequest + "&end=" + endTimeRequest);
+ LOGGER.info(r.toString());
+ }
+
+ /**
+ * Schedule process. Check that -kill action is not performed when time range between -start
+ * and -end parameters is in future and don't include existing instances.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceKillBothStartAndEndInFuture() throws Exception {
+ /*
+ both start and end r in future with respect to current time
+ */
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:21Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ String startTime = TimeUtil.getTimeWrtSystemTime(1);
+ String endTime = TimeUtil.getTimeWrtSystemTime(40);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=" + startTime + "&end=" + endTime);
+ LOGGER.info(r.getMessage());
+ Assert.assertEquals(r.getInstances(), null);
+ }
+
+ /**
+ * Schedule process. Perform -kill action within time range which includes 3 running instances.
+ * Get status of instances within wider range. Check that only mentioned 3 instances are
+ * killed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceKillMultipleInstance() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:15Z");
+ TimeUtil.sleepSeconds(15);
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstanceUtil.validateResponse(result, 5, 2, 0, 0, 3);
+ }
+
+ /**
+ * Schedule process. Perform -kill action on last expected instance. Get status of instances
+ * which are in wider range. Check that only last is killed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceKillLastInstance() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:20Z");
+ TimeUtil.sleepSeconds(15);
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstanceUtil.validateResponse(result, 5, 4, 0, 0, 1);
+ }
+
+ /**
+ * Schedule process. Suspend one running instance. Perform -kill action on it. Check that
+ * mentioned instance is really killed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceKillSuspended() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(1);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED);
+ }
+
+ /**
+ * Schedule single instance process. Wait till it finished. Try to kill the instance. Check
+ * that instance still succeeded.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceKillSucceeded() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(1);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(bundles[0]
+ .getProcessData()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.SUCCEEDED);
+ }
+
+
+ @AfterClass(alwaysRun = true)
+ public void deleteData() throws Exception {
+ LOGGER.info("in @AfterClass");
+ Bundle b = BundleUtil.readELBundle();
+ b = new Bundle(b, cluster);
+ b.setInputFeedDataPath(feedInputPath);
+ String prefix = b.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
new file mode 100644
index 0000000..8b40cf1
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob.Status;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Test Suite for instance rerun.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceRerunTest extends BaseTestClass {
+
+ private String baseTestDir = baseHDFSDir + "/ProcessInstanceRerunTest";
+ private String aggregateWorkflowDir = baseTestDir + "/aggregator";
+ private String feedInputPath = baseTestDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedOutputPath = baseTestDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedInputTimedOutPath =
+ baseTestDir + "/timedout/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+
+ private ColoHelper cluster = servers.get(0);
+ private FileSystem clusterFS = serverFS.get(0);
+ private OozieClient clusterOC = serverOC.get(0);
+ private static final Logger LOGGER = Logger.getLogger(ProcessInstanceRerunTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+ LOGGER.info("in @BeforeClass");
+ HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ Bundle b = BundleUtil.readELBundle();
+
+ b = new Bundle(b, cluster);
+ String startDate = "2010-01-02T00:40Z";
+ String endDate = "2010-01-02T01:20Z";
+ b.setInputFeedDataPath(feedInputPath);
+ String prefix = b.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setup(Method method) throws Exception {
+ LOGGER.info("test name: " + method.getName());
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setInputFeedDataPath(feedInputPath);
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown(Method method) {
+ LOGGER.info("tearDown " + method.getName());
+ removeBundles();
+ }
+
+ /**
+ * Schedule process. Kill some instances. Rerun some of that killed. Check that
+ * instances were rerun correctly and other are still killed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceRerunSomeKilled02() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(5);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(10);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:16Z");
+ InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4);
+ List<String> wfIDs =
+ InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
+ prism.getProcessHelper()
+ .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0);
+ }
+
+ /**
+ * Schedule process. Kill all instances. Rerun them. Check that they were rerun.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceRerunMultipleKilled() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(5);
+ LOGGER.info("process: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
+ List<String> wfIDs =
+ InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
+ prism.getProcessHelper()
+ .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
+ }
+
+ /**
+ * Schedule process. Kill some instances. Rerun them. Check that there are no killed
+ * instances left.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceRerunSomeKilled01() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(5);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
+ List<String> wfIDs =
+ InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
+ prism.getProcessHelper()
+ .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ TimeUtil.sleepSeconds(5);
+ InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 6, 0, 0);
+ }
+
+ /**
+ * Schedule process. Kill single instance. Rerun it. Check it was rerun.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceRerunSingleKilled() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(1);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ prism.getProcessHelper()
+ .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ String wfID = InstanceUtil.getWorkflows(cluster,
+ Util.getProcessName(bundles[0].getProcessData()), Status.KILLED).get(0);
+ prism.getProcessHelper()
+ .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
+ }
+
+ /**
+ * Schedule process. Wait till it got succeeded. Rerun first succeeded instance. Check if it
+ * is running.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceRerunSingleSucceeded() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ String wfID = InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0]
+ .getProcessData()), Status.RUNNING, Status.SUCCEEDED).get(0);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
+ .getProcessData()), 0, CoordinatorAction
+ .Status.SUCCEEDED, EntityType.PROCESS);
+ prism.getProcessHelper()
+ .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
+ }
+
+ /**
+ * Schedule process. Suspend its instances. Try to rerun them. Check that instances weren't
+ * rerun and are still suspended.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceRerunSingleSuspended() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:06Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(2);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
+ prism.getProcessHelper()
+ .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
+ Assert.assertEquals(InstanceUtil
+ .getInstanceStatus(cluster, Util.getProcessName(bundles[0].getProcessData()), 0, 1),
+ CoordinatorAction.Status.SUSPENDED);
+ }
+
+ /**
+ * Schedule process. Wait till its instances succeed. Rerun them all. Check they are running.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceRerunMultipleSucceeded() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(3);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
+ .getProcessData()), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ List<String> wfIDs =
+ InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
+ prism.getProcessHelper()
+ .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
+ }
+
+ /**
+ * Schedule process with invalid input feed data path. Wait till process got timed-out. Rerun
+ * it's instances. Check that they were rerun and are waiting (wait for input data).
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceRerunTimedOut() throws Exception {
+ bundles[0].setInputFeedDataPath(feedInputTimedOutPath);
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessTimeOut(2, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(3);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ CoordinatorAction.Status s;
+ InstanceUtil.waitTillInstanceReachState(clusterOC, Util.getProcessName(bundles[0]
+ .getProcessData()), 1, CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS);
+ prism.getProcessHelper()
+ .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ s = InstanceUtil
+ .getInstanceStatus(cluster, Util.readEntityName(bundles[0].getProcessData()), 0, 0);
+ Assert.assertEquals(s, CoordinatorAction.Status.WAITING,
+ "instance should have been in WAITING state");
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void deleteData() throws Exception {
+ LOGGER.info("in @AfterClass");
+ Bundle b = BundleUtil.readELBundle();
+ b = new Bundle(b, cluster);
+ b.setInputFeedDataPath(feedInputPath);
+ String prefix = b.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
new file mode 100644
index 0000000..72457de
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
@@ -0,0 +1,371 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.response.ResponseKeys;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Process instance resume tests.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceResumeTest extends BaseTestClass {
+
+ private ColoHelper cluster = servers.get(0);
+ private FileSystem clusterFS = serverFS.get(0);
+ private String baseTestHDFSDir = baseHDFSDir + "/ProcessInstanceResumeTest";
+ private String feedInputPath = baseTestHDFSDir +
+ "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedOutputPath =
+ baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+ private static final Logger LOGGER = Logger.getLogger(ProcessInstanceResumeTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+ LOGGER.info("in @BeforeClass");
+ HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ Bundle b = BundleUtil.readELBundle();
+ b = new Bundle(b, cluster);
+ b = new Bundle(b, cluster);
+ String startDate = "2010-01-01T23:20Z";
+ String endDate = "2010-01-02T01:40Z";
+ b.setInputFeedDataPath(feedInputPath);
+ String prefix = b.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setup(Method method) throws Exception {
+ LOGGER.info("setup " + method.getName());
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setInputFeedDataPath(feedInputPath);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown(Method method) {
+ LOGGER.info("tearDown " + method.getName());
+ removeBundles();
+ }
+
+ /**
+ * Schedule process. Suspend some instances. Attempt to -resume instance using single -end
+ * parameter results in failure.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceResumeOnlyEnd() throws Exception {
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
+ TimeUtil.sleepSeconds(10);
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+ InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
+
+ result = prism.getProcessHelper()
+ .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+ "?end=2010-01-02T01:15Z");
+ InstanceUtil.validateSuccessWithStatusCode(result, ResponseKeys.UNPARSEABLE_DATE);
+ }
+
+ /**
+ * Schedule process. Suspend some instances. Try to perform -resume using time range which
+ * effects only on one instance. Check that this instance was resumed es expected.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceResumeResumeSome() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
+ TimeUtil.sleepSeconds(10);
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+ InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
+
+ prism.getProcessHelper()
+ .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:16Z");
+ result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+ InstanceUtil.validateResponse(result, 6, 5, 1, 0, 0);
+ }
+
+ /**
+ * Schedule process. Suspend some instances. Try to perform -resume using time range which
+ * effects on all instances. Check that there are no suspended instances.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceResumeResumeMany() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:20Z");
+ TimeUtil.sleepSeconds(15);
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+ InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
+
+ prism.getProcessHelper()
+ .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:20Z");
+ result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+ InstanceUtil.validateResponse(result, 6, 6, 0, 0, 0);
+ }
+
+ /**
+ * Schedule process. Suspend first instance. Resume that instance using only -start parameter.
+ * Check that mentioned instance was resumed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceResumeSingle() throws Exception {
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(1);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(5);
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ TimeUtil.sleepSeconds(5);
+ prism.getProcessHelper()
+ .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ TimeUtil.sleepSeconds(5);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.RUNNING);
+ }
+
+ /**
+ * Attempt to resume instances of non-existent process should fail with an appropriate
+ * status code.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceResumeNonExistent() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r =
+ prism.getProcessHelper()
+ .getProcessInstanceResume("invalidName",
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z");
+ InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.PROCESS_NOT_FOUND);
+ }
+
+ /**
+ * Attempt to perform -resume action without time range parameters should fail with an
+ + appropriate status code or message.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceResumeNoParams() throws Exception {
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r =
+ prism.getProcessHelper().getProcessInstanceResume(
+ Util.readEntityName(bundles[0].getProcessData()), null);
+ InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
+ }
+
+ /**
+ * Schedule process, remove it. Try to -resume it's instance. Attempt should fail with
+ * an appropriate status code.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceResumeDeleted() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z");
+ InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.PROCESS_NOT_FOUND);
+ }
+
+ /**
+ * Schedule process. Try to resume entity which wasn't suspended.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceResumeNonSuspended() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ prism.getProcessHelper()
+ .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z");
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z");
+ InstanceUtil.validateResponse(result, 1, 1, 0, 0, 0);
+ result = prism.getProcessHelper()
+ .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z");
+ InstanceUtil.validateResponse(result, 1, 1, 0, 0, 0);
+ }
+
+ /**
+ * Schedule process. Suspend last instance. Resume it using parameter which points to
+ * expected materialization time of last instance. Check that there are no suspended
+ * instances among all which belong to current process.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceResumeLastInstance() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:25Z");
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:25Z");
+ InstanceUtil.validateResponse(result, 6, 5, 1, 0, 0);
+ prism.getProcessHelper()
+ .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:25Z");
+ result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:25Z");
+ InstanceUtil.validateResponse(result, 6, 6, 0, 0, 0);
+ }
+
+ /**
+ * Schedule process. Suspend all instances except the first and the last using appropriate
+ * -start/-end parameters. Resume that instances. Check that there are no suspended ones.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceResumeWithinRange() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ prism.getProcessHelper()
+ .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:20Z");
+ TimeUtil.sleepSeconds(15);
+ InstancesResult result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+ InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
+
+ prism.getProcessHelper()
+ .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:20Z");
+ result = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
+ InstanceUtil.validateResponse(result, 6, 6, 0, 0, 0);
+ }
+
+
+ @AfterClass(alwaysRun = true)
+ public void deleteData() throws Exception {
+ LOGGER.info("in @AfterClass");
+
+ Bundle b = BundleUtil.readELBundle();
+ b = new Bundle(b, cluster);
+ b.setInputFeedDataPath(feedInputPath);
+ String prefix = b.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
new file mode 100644
index 0000000..559df48
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.response.ResponseKeys;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+/**
+ * Regression for instance running api.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceRunningTest extends BaseTestClass {
+
+ private ColoHelper cluster = servers.get(0);
+ private FileSystem clusterFS = serverFS.get(0);
+ private String baseTestHDFSDir = baseHDFSDir + "/ProcessInstanceRunningTest";
+ private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+ private String feedInputPath = baseTestHDFSDir +
+ "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedOutputPath =
+ baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private static final Logger LOGGER = Logger.getLogger(ProcessInstanceRunningTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+ LOGGER.info("in @BeforeClass");
+ HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+ Bundle bundle = BundleUtil.readELBundle();
+ bundle.generateUniqueBundle();
+ bundle = new Bundle(bundle, cluster);
+
+ String startDate = "2010-01-02T00:40Z";
+ String endDate = "2010-01-02T01:11Z";
+
+ bundle.setInputFeedDataPath(feedInputPath);
+ String prefix = bundle.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ }
+
+
+ @BeforeMethod(alwaysRun = true)
+ public void setup(Method method) throws Exception {
+ LOGGER.info("test name: " + method.getName());
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setInputFeedDataPath(feedInputPath);
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /**
+ * Run process. Suspend it and then resume. Get all -running instances. Response should
+ * contain all process instances.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void getResumedProcessInstance() throws Exception {
+ bundles[0].setProcessConcurrency(3);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ TimeUtil.sleepSeconds(15);
+ AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL,
+ bundles[0].getProcessData()));
+ TimeUtil.sleepSeconds(15);
+ AssertUtil.assertSucceeded(prism.getProcessHelper().resume(URLS.RESUME_URL,
+ bundles[0].getProcessData()));
+ TimeUtil.sleepSeconds(15);
+ InstancesResult r = prism.getProcessHelper()
+ .getRunningInstance(URLS.INSTANCE_RUNNING,
+ Util.readEntityName(bundles[0].getProcessData()));
+ InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+ }
+
+ /**
+ * Run process. Suspend it. Try to get -running instances. Response should be
+ * successful but shouldn't contain any instance.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void getSuspendedProcessInstance() throws Exception {
+ bundles[0].setProcessConcurrency(3);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL,
+ bundles[0].getProcessData()));
+ TimeUtil.sleepSeconds(5);
+ InstancesResult r = prism.getProcessHelper()
+ .getRunningInstance(URLS.INSTANCE_RUNNING,
+ Util.readEntityName(bundles[0].getProcessData()));
+ InstanceUtil.validateSuccessWOInstances(r);
+ }
+
+ /**
+ * Run process. Get -running instances. Check that response contains expected number of
+ * instances.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void getRunningProcessInstance() throws Exception {
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r = prism.getProcessHelper()
+ .getRunningInstance(URLS.INSTANCE_RUNNING,
+ Util.readEntityName(bundles[0].getProcessData()));
+ InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+ }
+
+ /**
+ * Attempt to get -running instances of nonexistent process should result in error.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void getNonExistenceProcessInstance() throws Exception {
+ InstancesResult r =
+ prism.getProcessHelper()
+ .getRunningInstance(URLS.INSTANCE_RUNNING, "invalidName");
+ Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
+ "Unexpected status code");
+ }
+
+ /**
+ * Attempt to get -running instances of deleted process should result in error.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void getKilledProcessInstance() throws Exception {
+ bundles[0].submitFeedsScheduleProcess(prism);
+ prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+ InstancesResult r = prism.getProcessHelper()
+ .getRunningInstance(URLS.INSTANCE_RUNNING,
+ Util.readEntityName(bundles[0].getProcessData()));
+ Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
+ "Unexpected status code");
+ }
+
+ /**
+ * Launch process and wait till it got succeeded. Try to get -running instances. Response
+ * should reflect success but shouldn't contain any of instances.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void getSucceededProcessInstance() throws Exception {
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitForBundleToReachState(cluster, Util.getProcessName(bundles[0]
+ .getProcessData()), Job.Status.SUCCEEDED);
+ InstancesResult result = prism.getProcessHelper()
+ .getRunningInstance(URLS.INSTANCE_RUNNING,
+ Util.readEntityName(bundles[0].getProcessData()));
+ InstanceUtil.validateSuccessWOInstances(result);
+ }
+
+
+ @AfterClass(alwaysRun = true)
+ public void deleteData() throws Exception {
+ LOGGER.info("in @AfterClass");
+ Bundle b = BundleUtil.readELBundle();
+ b = new Bundle(b, cluster);
+ b.setInputFeedDataPath(feedInputPath);
+ String prefix = b.getFeedDataPathPrefix();
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
new file mode 100644
index 0000000..86f7a86
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@ -0,0 +1,436 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency.TimeUnit;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.InstancesResult.WorkflowStatus;
+import org.apache.falcon.regression.core.response.ResponseKeys;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction.Status;
+import org.apache.oozie.client.Job;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URISyntaxException;
+import java.util.List;
+
+/**
+ * Process instance status tests.
+ */
+@Test(groups = "embedded")
+public class ProcessInstanceStatusTest extends BaseTestClass {
+
+ private ColoHelper cluster = servers.get(0);
+ private FileSystem clusterFS = serverFS.get(0);
+ private String baseTestHDFSDir = baseHDFSDir + "/ProcessInstanceStatusTest";
+ private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+ private String feedInputPath =
+ baseTestHDFSDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedOutputPath =
+ baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedInputTimedOutPath =
+ baseTestHDFSDir + "/timedoutStatus/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedOutputTimedOutPath =
+ baseTestHDFSDir + "/output-data/timedoutStatus/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private static final Logger LOGGER = Logger.getLogger(ProcessInstanceStatusTest.class);
+
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+ LOGGER.info("in @BeforeClass");
+
+ HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+
+ Bundle bundle = BundleUtil.readELBundle();
+ bundle.generateUniqueBundle();
+ bundle = new Bundle(bundle, cluster);
+
+ String startDate = "2010-01-01T23:40Z";
+ String endDate = "2010-01-02T02:40Z";
+
+ bundle.setInputFeedDataPath(feedInputPath);
+ String prefix = bundle.getFeedDataPathPrefix();
+
+ HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
+
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
+ HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
+ }
+
+
+ @BeforeMethod(alwaysRun = true)
+ public void setup(Method method) throws Exception {
+ LOGGER.info("test name: " + method.getName());
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setInputFeedDataPath(feedInputPath);
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ /**
+ * time out is set as 3 minutes .... getStatus is for a large range in past.
+ * 6 instance should be materialized and one in running and other in waiting
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusStartAndEndCheckNoInstanceAfterEndDate()
+ throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-03T10:22Z");
+ bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(1);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z");
+ InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+ InstanceUtil.validateResponse(r, 6, 1, 0, 5, 0);
+ }
+
+ /**
+ * Perform -getStatus using only -start parameter within time-range of non-materialized
+ * instances. There should be no instances returned in response.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusOnlyStartAfterMat() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-03T10:22Z");
+ bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
+ bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(1);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T05:00Z");
+ AssertUtil.assertSucceeded(r);
+ Assert.assertEquals(r.getInstances(), null);
+ }
+
+ /**
+ * Schedule process. Perform -getStatus using -end parameter which is out of process
+ * validity range. Attempt should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusEndOutOfRange() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z");
+ InstanceUtil.validateSuccessWithStatusCode(r, 400);
+ }
+
+ /**
+ * Schedule process and try to -getStatus without date parameters. Attempt should fail with
+ * an appropriate message.
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusDateEmpty()
+ throws JAXBException, AuthenticationException, IOException, URISyntaxException {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()), null);
+ InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
+ }
+
+ /**
+ * Schedule process with number of instances. Perform -getStatus request with valid
+ * parameters. Attempt should succeed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusStartAndEnd() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+ }
+
+ /**
+ * Schedule process. Perform -getStatus using -start parameter which is out of process
+ * validity range. Attempt should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusStartOutOfRange() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T00:00Z&end=2010-01-02T01:20Z");
+ InstanceUtil.validateSuccessWithStatusCode(r, 400);
+ }
+
+ /**
+ * Schedule and then delete process. Try to get the status of its instances. Attempt should
+ * fail with an appropriate code.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusKilled() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL,
+ bundles[0].getProcessData()));
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ if ((r.getStatusCode() != ResponseKeys.PROCESS_NOT_FOUND)) {
+ Assert.assertTrue(false);
+ }
+ }
+
+ /**
+ * Schedule process and then suspend it. -getStatus of first instance only -start parameter.
+ * Instance should be suspended.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusOnlyStartSuspended() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL,
+ bundles[0].getProcessData()));
+ TimeUtil.sleepSeconds(15);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.SUSPENDED);
+ }
+
+ /**
+ * Schedule process. Try to -getStatus using -start/-end parameters with values which were
+ * reversed i.e. -start is further then -end. Attempt should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusReverseDateRange() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:20Z&end=2010-01-02T01:07Z");
+ InstanceUtil.validateSuccessWithStatusCode(r, 400);
+ }
+
+ /**
+ * Schedule process. Perform -getStatus using -start/-end parameters which are out of process
+ * validity range. Attempt should fail.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusStartEndOutOfRange() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(
+ feedOutputPath);
+ bundles[0].setProcessConcurrency(2);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
+ InstanceUtil.validateSuccessWithStatusCode(r, 400);
+ }
+
+ /**
+ * Schedule process. Suspend and then resume it. -getStatus of its instances. Check that
+ * response reflects that instances are running.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusResumed() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessConcurrency(2);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData());
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.SUSPENDED);
+ prism.getProcessHelper().resume(URLS.RESUME_URL, bundles[0].getProcessData());
+ TimeUtil.sleepSeconds(15);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
+ InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
+ }
+
+ /**
+ * Schedule process. -getStatus of it's first instance using only -start parameter which
+ * points to start time of process validity. Check that response reflects expected status of
+ * instance.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusOnlyStart() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z");
+ InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.RUNNING);
+ }
+
+ /**
+ * Schedule process. Try to perform -getStatus using valid -start parameter but invalid
+ * process name. Attempt should fail with an appropriate status code.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusInvalidName() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus("invalidProcess", "?start=2010-01-01T01:00Z");
+ if (!(r.getStatusCode() == ResponseKeys.PROCESS_NOT_FOUND)) {
+ Assert.assertTrue(false);
+ }
+ }
+
+ /**
+ * Schedule process. Suspend it. -getStatus of it's instances. Check if response reflects
+ * their status as suspended.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusSuspended() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ for (int i = 0; i < bundles[0].getClusters().size(); i++) {
+ LOGGER.info("cluster to be submitted: " + i + " "
+ + Util.prettyPrintXml(bundles[0].getClusters().get(i)));
+ }
+ bundles[0].submitFeedsScheduleProcess(prism);
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.RUNNING);
+ AssertUtil.assertSucceeded(
+ prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ Job.Status.SUSPENDED);
+ TimeUtil.sleepSeconds(15);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.SUSPENDED);
+ }
+
+ /**
+ * Schedule process. Try to -getStatus without time range parameters. Attempt should fails
+ * with an appropriate status code.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusWoParams() throws Exception {
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()), null);
+ InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
+ }
+
+ /**
+ * Schedule process with timeout set to 2 minutes. Wait till it become timed-out. -getStatus
+ * of that process. Check that all materialized instances are failed.
+ *
+ * @throws Exception
+ */
+ @Test(groups = {"singleCluster"})
+ public void testProcessInstanceStatusTimedOut() throws Exception {
+ bundles[0].setInputFeedDataPath(feedInputTimedOutPath);
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessTimeOut(2, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputTimedOutPath);
+ bundles[0].setProcessConcurrency(3);
+ bundles[0].submitFeedsScheduleProcess(prism);
+ InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.readEntityName(bundles[0]
+ .getProcessData()), 1, Status.TIMEDOUT, EntityType.PROCESS);
+ InstancesResult r = prism.getProcessHelper()
+ .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ InstanceUtil.validateFailedInstances(r, 3);
+ }
+}