You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:36 UTC
[31/41] git commit: FALCON-674 General code factored out for
ProcessInstance* tests. Contributed by Paul Isaychuk
FALCON-674 General code factored out for ProcessInstance* tests. Contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/1a3728b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/1a3728b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/1a3728b1
Branch: refs/heads/FALCON-585
Commit: 1a3728b1f2dfc62c4c8be2bdd6c8c4e663c61955
Parents: be08a2e
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Fri Sep 5 15:11:34 2014 +0300
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Fri Sep 5 15:12:32 2014 +0300
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 3 +
.../regression/ProcessInstanceKillsTest.java | 53 ++---
.../regression/ProcessInstanceRerunTest.java | 45 ++---
.../regression/ProcessInstanceResumeTest.java | 199 ++++++-------------
.../regression/ProcessInstanceRunningTest.java | 48 ++---
.../regression/ProcessInstanceStatusTest.java | 136 +++++--------
.../regression/ProcessInstanceSuspendTest.java | 107 ++++------
7 files changed, 196 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1a3728b1/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index aecd520..cc38357 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -9,6 +9,9 @@ Trunk (Unreleased)
via Samarth Gupta)
IMPROVEMENTS
+ FALCON-674 General code factored out for ProcessInstance* tests (Paul Isaychuk via Ruslan
+ Ostafiychuk)
+
FALCON-657 String datePattern moved to BaseTestClass (Ruslan Ostafiychuk)
FALCON-643 Tests with zero-output/input scenario amended to match test case (Paul Isaychuk via
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1a3728b1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index ba7a8fd..7b938ec 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -60,7 +60,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN;
private static final Logger LOGGER = Logger.getLogger(ProcessInstanceKillsTest.class);
private static final double TIMEOUT = 15;
- String processName;
+ private String processName;
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
@@ -85,6 +85,9 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].generateUniqueBundle();
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
bundles[0].setInputFeedDataPath(feedInputPath);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
processName = Util.readEntityName(bundles[0].getProcessData());
}
@@ -103,14 +106,11 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceKillSingle() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName,
+ "?start=2010-01-02T01:00Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED);
}
@@ -127,8 +127,6 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessConcurrency(2);
bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(10);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
@@ -149,8 +147,6 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessValidity("2010-01-02T00:00Z", "2010-01-02T04:00Z");
bundles[0].setProcessTimeOut(3, TimeUnit.minutes);
bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
@@ -181,9 +177,6 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
baseTestHDFSDir + "/input01", dataDates);
bundles[0].setInputFeedDataPath(feedInputPath.replace("input/", "input01/"));
bundles[0].setProcessValidity(startTime, endTime);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
String startTimeRequest = TimeUtil.getTimeWrtSystemTime(-17);
@@ -205,9 +198,6 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
both start and end r in future with respect to current time
*/
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:21Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
String startTime = TimeUtil.getTimeWrtSystemTime(1);
@@ -228,18 +218,15 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceKillMultipleInstance() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
prism.getProcessHelper()
.getProcessInstanceKill(processName, "?start=2010-01-02T01:05Z&end=2010-01-02T01:15Z");
TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
- InstanceUtil.validateResponse(result, 5, 2, 0, 0, 3);
+ InstanceUtil.validateResponse(r, 5, 2, 0, 0, 3);
}
/**
@@ -251,17 +238,14 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceKillLastInstance() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:21Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
prism.getProcessHelper().getProcessInstanceKill(processName, "?start=2010-01-02T01:20Z");
TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
- InstanceUtil.validateResponse(result, 5, 4, 0, 0, 1);
+ InstanceUtil.validateResponse(r, 5, 4, 0, 0, 1);
}
/**
@@ -273,15 +257,11 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceKillSuspended() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(processName, "?start=2010-01-02T01:00Z");
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
+ prism.getProcessHelper().getProcessInstanceSuspend(processName, "?start=2010-01-02T01:00Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName,
+ "?start=2010-01-02T01:00Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED);
}
@@ -294,15 +274,12 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceKillSucceeded() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName,
+ "?start=2010-01-02T01:00Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.SUCCEEDED);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1a3728b1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
index 8df7f1f..4ae6d72 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
@@ -63,6 +63,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(ProcessInstanceRerunTest.class);
private static final double TIMEOUT = 10;
private String processName;
+ private String start = "?start=2010-01-02T01:00Z";
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
@@ -86,6 +87,8 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].generateUniqueBundle();
bundles[0].setInputFeedDataPath(feedInputPath);
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
processName = bundles[0].getProcessName();
}
@@ -104,8 +107,6 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceRerunSomeKilled02() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
@@ -113,11 +114,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
TimeUtil.sleepSeconds(TIMEOUT);
InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:16Z");
+ start + "&end=2010-01-02T01:16Z");
InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4);
List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
prism.getProcessHelper().getProcessInstanceRerun(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ start + "&end=2010-01-02T01:11Z");
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0);
}
@@ -129,8 +130,6 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceRerunMultipleKilled() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
String process = bundles[0].getProcessData();
@@ -138,11 +137,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ .getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z");
InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
prism.getProcessHelper().
- getProcessInstanceRerun(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ getProcessInstanceRerun(processName, start + "&end=2010-01-02T01:11Z");
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
}
@@ -155,19 +154,17 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceRerunSomeKilled01() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
String process = bundles[0].getProcessData();
InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ .getProcessInstanceKill(processName, start + "&end=2010-01-02T01:11Z");
InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
prism.getProcessHelper().getProcessInstanceRerun(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ start + "&end=2010-01-02T01:11Z");
TimeUtil.sleepSeconds(TIMEOUT);
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 6, 0, 0);
}
@@ -180,16 +177,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceRerunSingleKilled() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
String process = bundles[0].getProcessData();
InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
- prism.getProcessHelper().getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
+ prism.getProcessHelper().getProcessInstanceKill(processName, start);
String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.KILLED).get(0);
- prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2010-01-02T01:00Z");
+ prism.getProcessHelper().getProcessInstanceRerun(processName, start);
Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
}
@@ -202,8 +197,6 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceRerunSingleSucceeded() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
@@ -213,7 +206,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
Status.SUCCEEDED).get(0);
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 0, CoordinatorAction
.Status.SUCCEEDED, EntityType.PROCESS);
- prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2010-01-02T01:00Z");
+ prism.getProcessHelper().getProcessInstanceRerun(processName, start);
Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
}
@@ -226,15 +219,13 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceRerunSingleSuspended() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:06Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(2);
bundles[0].submitFeedsScheduleProcess(prism);
prism.getProcessHelper().getProcessInstanceSuspend(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
+ start + "&end=2010-01-02T01:06Z");
prism.getProcessHelper().getProcessInstanceRerun(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
+ start + "&end=2010-01-02T01:06Z");
Assert.assertEquals(InstanceUtil.getInstanceStatus(cluster, processName, 0, 1),
CoordinatorAction.Status.SUSPENDED);
}
@@ -247,8 +238,6 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceRerunMultipleSucceeded() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
@@ -258,7 +247,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
prism.getProcessHelper().getProcessInstanceRerun(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ start + "&end=2010-01-02T01:11Z");
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
}
@@ -272,9 +261,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
public void testProcessInstanceRerunTimedOut() throws Exception {
bundles[0].setInputFeedDataPath(feedInputTimedOutPath);
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setProcessTimeOut(2, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
@@ -282,7 +269,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS);
prism.getProcessHelper().getProcessInstanceRerun(processName,
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ start + "&end=2010-01-02T01:11Z");
s = InstanceUtil.getInstanceStatus(cluster, processName, 0, 0);
Assert.assertEquals(s, CoordinatorAction.Status.WAITING,
"instance should have been in WAITING state");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1a3728b1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
index fdb64af..bfb8d52 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceResumeTest.java
@@ -59,6 +59,8 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(ProcessInstanceResumeTest.class);
private static final double SCHEDULED = 15;
private static final double AFFECTED = 10;
+ private String processName;
+ private String wholeRange = "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z";
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
@@ -69,7 +71,6 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
String startDate = "2010-01-01T23:20Z";
String endDate = "2010-01-02T01:40Z";
b.setInputFeedDataPath(feedInputPath);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
b.getFeedDataPathPrefix(), dataDates);
@@ -84,6 +85,11 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
bundles[0].setInputFeedDataPath(feedInputPath);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setProcessConcurrency(6);
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
+ processName = Util.readEntityName(bundles[0].getProcessData());
}
@AfterMethod(alwaysRun = true)
@@ -100,25 +106,17 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeOnlyEnd() throws Exception {
- bundles[0].setProcessConcurrency(6);
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(SCHEDULED);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
+ prism.getProcessHelper().getProcessInstanceSuspend(processName,
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
TimeUtil.sleepSeconds(AFFECTED);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
- InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
-
- result = prism.getProcessHelper()
- .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
- "?end=2010-01-02T01:15Z");
- InstanceUtil.validateSuccessWithStatusCode(result, ResponseKeys.UNPARSEABLE_DATE);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ wholeRange);
+ InstanceUtil.validateResponse(r, 6, 2, 4, 0, 0);
+ r = prism.getProcessHelper().getProcessInstanceResume(processName, "?end=2010-01-02T01:15Z");
+ InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
}
/**
@@ -129,28 +127,18 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeResumeSome() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(SCHEDULED);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
+ prism.getProcessHelper().getProcessInstanceSuspend(processName,
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
TimeUtil.sleepSeconds(AFFECTED);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
- InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
-
- prism.getProcessHelper()
- .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z&end=2010-01-02T01:16Z");
- result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
- InstanceUtil.validateResponse(result, 6, 5, 1, 0, 0);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ wholeRange);
+ InstanceUtil.validateResponse(r, 6, 2, 4, 0, 0);
+ prism.getProcessHelper().getProcessInstanceResume(processName,
+ "?start=2010-01-02T01:05Z&end=2010-01-02T01:16Z");
+ r = prism.getProcessHelper().getProcessInstanceStatus(processName, wholeRange);
+ InstanceUtil.validateResponse(r, 6, 5, 1, 0, 0);
}
/**
@@ -161,28 +149,17 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeResumeMany() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(SCHEDULED);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
+ String withinRange = "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z";
+ prism.getProcessHelper().getProcessInstanceSuspend(processName, withinRange);
TimeUtil.sleepSeconds(AFFECTED);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
- InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
-
- prism.getProcessHelper()
- .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
- result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
- InstanceUtil.validateResponse(result, 6, 6, 0, 0, 0);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ wholeRange);
+ InstanceUtil.validateResponse(r, 6, 2, 4, 0, 0);
+ prism.getProcessHelper().getProcessInstanceResume(processName, withinRange);
+ r = prism.getProcessHelper().getProcessInstanceStatus(processName, wholeRange);
+ InstanceUtil.validateResponse(r, 6, 6, 0, 0, 0);
}
/**
@@ -193,22 +170,15 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeSingle() throws Exception {
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(SCHEDULED);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
+ String start = "?start=2010-01-02T01:00Z";
+ prism.getProcessHelper().getProcessInstanceSuspend(processName, start);
TimeUtil.sleepSeconds(AFFECTED);
- prism.getProcessHelper()
- .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
+ prism.getProcessHelper().getProcessInstanceResume(processName, start);
TimeUtil.sleepSeconds(AFFECTED);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, start);
InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.RUNNING);
}
@@ -220,15 +190,9 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeNonExistent() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r =
- prism.getProcessHelper()
- .getProcessInstanceResume("invalidName",
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceResume("invalidName",
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z");
InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.PROCESS_NOT_FOUND);
}
@@ -240,13 +204,8 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeNoParams() throws Exception {
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r =
- prism.getProcessHelper().getProcessInstanceResume(
- Util.readEntityName(bundles[0].getProcessData()), null);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceResume(processName, null);
InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
}
@@ -258,15 +217,10 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeDeleted() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceResume(processName,
+ "?start=2010-01-02T01:05Z");
InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.PROCESS_NOT_FOUND);
}
@@ -277,23 +231,14 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeNonSuspended() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(SCHEDULED);
- prism.getProcessHelper()
- .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z");
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z");
- InstanceUtil.validateResponse(result, 1, 1, 0, 0, 0);
- result = prism.getProcessHelper()
- .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z");
- InstanceUtil.validateResponse(result, 1, 1, 0, 0, 0);
+ String start = "?start=2010-01-02T01:05Z";
+ prism.getProcessHelper().getProcessInstanceResume(processName, start);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, start);
+ InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0);
+ r = prism.getProcessHelper().getProcessInstanceResume(processName, start);
+ InstanceUtil.validateResponse(r, 1, 1, 0, 0, 0);
}
/**
@@ -305,28 +250,18 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeLastInstance() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(SCHEDULED);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:25Z");
+ String last = "?start=2010-01-02T01:25Z";
+ prism.getProcessHelper().getProcessInstanceSuspend(processName, last);
TimeUtil.sleepSeconds(AFFECTED);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
- InstanceUtil.validateResponse(result, 6, 5, 1, 0, 0);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ wholeRange);
+ InstanceUtil.validateResponse(r, 6, 5, 1, 0, 0);
TimeUtil.sleepSeconds(10);
- prism.getProcessHelper()
- .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:25Z");
- result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
- InstanceUtil.validateResponse(result, 6, 6, 0, 0, 0);
+ prism.getProcessHelper().getProcessInstanceResume(processName, last);
+ r = prism.getProcessHelper().getProcessInstanceStatus(processName, wholeRange);
+ InstanceUtil.validateResponse(r, 6, 6, 0, 0, 0);
}
/**
@@ -337,27 +272,17 @@ public class ProcessInstanceResumeTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceResumeWithinRange() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:26Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(SCHEDULED);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
+ String withinRange = "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z";
+ prism.getProcessHelper().getProcessInstanceSuspend(processName, withinRange);
TimeUtil.sleepSeconds(AFFECTED);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
- InstanceUtil.validateResponse(result, 6, 2, 4, 0, 0);
- prism.getProcessHelper()
- .getProcessInstanceResume(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z&end=2010-01-02T01:21Z");
- result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:26Z");
- InstanceUtil.validateResponse(result, 6, 6, 0, 0, 0);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ wholeRange);
+ InstanceUtil.validateResponse(r, 6, 2, 4, 0, 0);
+ prism.getProcessHelper().getProcessInstanceResume(processName, withinRange);
+ r = prism.getProcessHelper().getProcessInstanceStatus(processName, wholeRange);
+ InstanceUtil.validateResponse(r, 6, 6, 0, 0, 0);
}
@AfterClass(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1a3728b1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
index 2b36d4d..e92f6b8 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRunningTest.java
@@ -61,26 +61,23 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
private String feedOutputPath = baseTestHDFSDir + "/output-data" + MINUTE_DATE_PATTERN;
private static final Logger LOGGER = Logger.getLogger(ProcessInstanceRunningTest.class);
private static final double TIMEOUT = 15;
+ private String processName;
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
LOGGER.info("in @BeforeClass");
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-
Bundle bundle = BundleUtil.readELBundle();
bundle.generateUniqueBundle();
bundle = new Bundle(bundle, cluster);
-
String startDate = "2010-01-02T00:40Z";
String endDate = "2010-01-02T01:11Z";
-
bundle.setInputFeedDataPath(feedInputPath);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT,
bundle.getFeedDataPathPrefix(), dataDates);
}
-
@BeforeMethod(alwaysRun = true)
public void setup(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
@@ -93,6 +90,7 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
+ processName = Util.readEntityName(bundles[0].getProcessData());
}
@AfterMethod(alwaysRun = true)
@@ -111,15 +109,13 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL,
- bundles[0].getProcessData()));
+ String process = bundles[0].getProcessData();
+ AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL, process));
TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.assertSucceeded(prism.getProcessHelper().resume(URLS.RESUME_URL,
- bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().resume(URLS.RESUME_URL, process));
TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult r = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ InstancesResult r = prism.getProcessHelper().getRunningInstance(URLS.INSTANCE_RUNNING,
+ processName);
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
}
@@ -136,9 +132,8 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL,
bundles[0].getProcessData()));
TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult r = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ InstancesResult r = prism.getProcessHelper().getRunningInstance(URLS.INSTANCE_RUNNING,
+ processName);
InstanceUtil.validateSuccessWOInstances(r);
}
@@ -151,9 +146,8 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void getRunningProcessInstance() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ InstancesResult r = prism.getProcessHelper().getRunningInstance(URLS.INSTANCE_RUNNING,
+ processName);
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
}
@@ -164,9 +158,8 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void getNonExistenceProcessInstance() throws Exception {
- InstancesResult r =
- prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING, "invalidName");
+ InstancesResult r = prism.getProcessHelper().getRunningInstance(URLS.INSTANCE_RUNNING,
+ "invalidName");
Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
"Unexpected status code");
}
@@ -180,9 +173,8 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
public void getKilledProcessInstance() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
- InstancesResult r = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ InstancesResult r = prism.getProcessHelper().getRunningInstance(URLS.INSTANCE_RUNNING,
+ processName);
Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
"Unexpected status code");
}
@@ -196,12 +188,10 @@ public class ProcessInstanceRunningTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void getSucceededProcessInstance() throws Exception {
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitForBundleToReachState(cluster, Util.getProcessName(bundles[0]
- .getProcessData()), Job.Status.SUCCEEDED);
- InstancesResult result = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
- InstanceUtil.validateSuccessWOInstances(result);
+ InstanceUtil.waitForBundleToReachState(cluster, processName, Job.Status.SUCCEEDED);
+ InstancesResult r = prism.getProcessHelper().getRunningInstance(URLS.INSTANCE_RUNNING,
+ processName);
+ InstanceUtil.validateSuccessWOInstances(r);
}
@AfterClass(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1a3728b1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
index cac8904..8c82da9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceStatusTest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
import org.apache.oozie.client.CoordinatorAction.Status;
import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -70,30 +71,28 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
baseTestHDFSDir + "/output-data/timedoutStatus" + MINUTE_DATE_PATTERN;
private static final Logger LOGGER = Logger.getLogger(ProcessInstanceStatusTest.class);
private static final double TIMEOUT = 15;
+ private String processName;
+ private OozieClient clusterOC = serverOC.get(0);
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
LOGGER.info("in @BeforeClass");
-
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-
Bundle bundle = BundleUtil.readELBundle();
bundle.generateUniqueBundle();
bundle = new Bundle(bundle, cluster);
-
String startDate = "2010-01-01T23:40Z";
String endDate = "2010-01-02T02:40Z";
-
bundle.setInputFeedDataPath(feedInputPath);
String prefix = bundle.getFeedDataPathPrefix();
-
HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
}
-
+ /**
+ * Configures general process definition which particular properties can be overwritten
+ */
@BeforeMethod(alwaysRun = true)
public void setup(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
@@ -102,6 +101,9 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].generateUniqueBundle();
bundles[0].setInputFeedDataPath(feedInputPath);
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ processName = Util.readEntityName(bundles[0].getProcessData());
}
@AfterMethod(alwaysRun = true)
@@ -123,9 +125,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T10:20Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
InstanceUtil.validateResponse(r, 6, 1, 0, 5, 0);
}
@@ -143,9 +144,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
bundles[0].setProcessPeriodicity(1, TimeUnit.minutes);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T05:00Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T05:00Z");
AssertUtil.assertSucceeded(r);
Assert.assertEquals(r.getInstances(), null);
}
@@ -158,13 +158,10 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusEndOutOfRange() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:30Z");
InstanceUtil.validateSuccessWithStatusCode(r, 400);
}
@@ -176,10 +173,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
public void testProcessInstanceStatusDateEmpty()
throws JAXBException, AuthenticationException, IOException, URISyntaxException {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()), null);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null);
InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
}
@@ -191,11 +186,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusStartAndEnd() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
}
@@ -208,12 +200,9 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusStartOutOfRange() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T00:00Z&end=2010-01-02T01:20Z");
InstanceUtil.validateSuccessWithStatusCode(r, 400);
}
@@ -226,13 +215,10 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusKilled() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL,
bundles[0].getProcessData()));
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
if ((r.getStatusCode() != ResponseKeys.PROCESS_NOT_FOUND)) {
Assert.assertTrue(false);
@@ -247,14 +233,11 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusOnlyStartSuspended() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL,
bundles[0].getProcessData()));
TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z");
InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.SUSPENDED);
}
@@ -267,11 +250,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusReverseDateRange() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:20Z&end=2010-01-02T01:07Z");
InstanceUtil.validateSuccessWithStatusCode(r, 400);
}
@@ -284,18 +264,14 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusStartEndOutOfRange() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(
- feedOutputPath);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(2);
bundles[0].submitFeedsScheduleProcess(prism);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
Job.Status.RUNNING);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
InstanceUtil.validateSuccessWithStatusCode(r, 400);
}
@@ -307,24 +283,19 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusResumed() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(2);
bundles[0].submitFeedsScheduleProcess(prism);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
- Job.Status.RUNNING);
- prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData());
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
- Job.Status.SUSPENDED);
- prism.getProcessHelper().resume(URLS.RESUME_URL, bundles[0].getProcessData());
+ String process = bundles[0].getProcessData();
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+ prism.getProcessHelper().suspend(URLS.SUSPEND_URL, process);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.SUSPENDED);
+ prism.getProcessHelper().resume(URLS.RESUME_URL, process);
TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
- Job.Status.RUNNING);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
}
@@ -337,14 +308,11 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusOnlyStart() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
Job.Status.RUNNING);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T01:00Z");
InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.RUNNING);
}
@@ -357,7 +325,6 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusInvalidName() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
InstancesResult r = prism.getProcessHelper()
.getProcessInstanceStatus("invalidProcess", "?start=2010-01-01T01:00Z");
@@ -374,23 +341,18 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
*/
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusSuspended() throws Exception {
- bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
for (int i = 0; i < bundles[0].getClusters().size(); i++) {
LOGGER.info("cluster to be submitted: " + i + " "
+ Util.prettyPrintXml(bundles[0].getClusters().get(i)));
}
+ String process = bundles[0].getProcessData();
bundles[0].submitFeedsScheduleProcess(prism);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
- Job.Status.RUNNING);
- AssertUtil.assertSucceeded(
- prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
- Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+ AssertUtil.assertSucceeded(prism.getProcessHelper().suspend(URLS.SUSPEND_URL, process));
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.SUSPENDED);
TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.SUSPENDED);
}
@@ -403,10 +365,8 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceStatusWoParams() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T02:30Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].submitFeedsScheduleProcess(prism);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()), null);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName, null);
InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
}
@@ -420,17 +380,15 @@ public class ProcessInstanceStatusTest extends BaseTestClass {
public void testProcessInstanceStatusTimedOut() throws Exception {
bundles[0].setInputFeedDataPath(feedInputTimedOutPath);
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setProcessTimeOut(2, TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputTimedOutPath);
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.readEntityName(bundles[0]
- .getProcessData()), 1, Status.TIMEDOUT, EntityType.PROCESS);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1, Status.TIMEDOUT,
+ EntityType.PROCESS);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
InstanceUtil.validateFailedInstances(r, 3);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/1a3728b1/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
index 80da1ad..2a4f7f3 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceSuspendTest.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -63,6 +64,8 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
private FileSystem clusterFS = serverFS.get(0);
private static final Logger LOGGER = Logger.getLogger(ProcessInstanceSuspendTest.class);
private static final double TIMEOUT = 15;
+ private String processName;
+ private OozieClient clusterOC = serverOC.get(0);
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
@@ -88,6 +91,9 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
bundles[0].generateUniqueBundle();
bundles[0].setInputFeedDataPath(feedInputPath);
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ bundles[0].setOutputFeedLocationData(feedOutputPath);
+ processName = Util.readEntityName(bundles[0].getProcessData());
}
@AfterMethod(alwaysRun = true)
@@ -104,23 +110,17 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceSuspendLargeRange() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
Job.Status.RUNNING);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:21Z");
InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ prism.getProcessHelper().getProcessInstanceSuspend(processName,
"?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
- result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ result = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T00:00Z&end=2010-01-02T01:30Z");
InstanceUtil.validateSuccessWithStatusCode(result, 400);
}
@@ -134,15 +134,11 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceSuspendSucceeded() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(bundles[0]
+ InstanceUtil.waitTillInstanceReachState(clusterOC, Util.getProcessName(bundles[0]
.getProcessData()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName,
"?start=2010-01-02T01:00Z");
InstanceUtil.validateSuccessWithStatusCode(r, 0);
}
@@ -156,23 +152,17 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceSuspendAll() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
Job.Status.RUNNING);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ prism.getProcessHelper().getProcessInstanceSuspend(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
- result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ result = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
InstanceUtil.validateResponse(result, 5, 0, 5, 0, 0);
}
@@ -186,15 +176,11 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceSuspendWoParams() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:22Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(2);
bundles[0].submitFeedsScheduleProcess(prism);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
Job.Status.RUNNING);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()), null);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName, null);
InstanceUtil.validateSuccessWithStatusCode(r, ResponseKeys.UNPARSEABLE_DATE);
}
@@ -207,23 +193,17 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceSuspendStartAndEnd() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
Job.Status.RUNNING);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
InstanceUtil.validateResponse(result, 5, 3, 0, 2, 0);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ prism.getProcessHelper().getProcessInstanceSuspend(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:15Z");
- result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
+ result = prism.getProcessHelper().getProcessInstanceStatus(processName,
"?start=2010-01-02T01:00Z&end=2010-01-02T01:22Z");
InstanceUtil.validateResponse(result, 5, 0, 3, 2, 0);
}
@@ -236,15 +216,11 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceSuspendNonExistent() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
Job.Status.RUNNING);
- InstancesResult r =
- prism.getProcessHelper()
+ InstancesResult r = prism.getProcessHelper()
.getProcessInstanceSuspend("invalidName", "?start=2010-01-02T01:20Z");
if ((r.getStatusCode() != ResponseKeys.PROCESS_NOT_FOUND)) {
Assert.assertTrue(false);
@@ -260,24 +236,16 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceSuspendOnlyStart() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:11Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
Job.Status.RUNNING);
- prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
+ prism.getProcessHelper().getRunningInstance(URLS.INSTANCE_RUNNING, processName);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceSuspend(processName,
"?start=2010-01-02T01:00Z");
InstanceUtil.validateSuccessOnlyStart(r, WorkflowStatus.SUSPENDED);
- prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ prism.getProcessHelper().getRunningInstance(URLS.INSTANCE_RUNNING, processName);
}
/**
@@ -290,24 +258,17 @@ public class ProcessInstanceSuspendTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void testProcessInstanceSuspendSuspendLast() throws Exception {
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:23Z");
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.checkStatus(serverOC.get(0), EntityType.PROCESS, bundles[0].getProcessData(),
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
Job.Status.RUNNING);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
InstanceUtil.validateResponse(result, 5, 5, 0, 0, 0);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:20Z");
- result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ prism.getProcessHelper().getProcessInstanceSuspend(processName, "?start=2010-01-02T01:20Z");
+ result = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
InstanceUtil.validateResponse(result, 5, 4, 1, 0, 0);
}