You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:11 UTC
[06/41] git commit: FALCON-632 Refactoring,
documentation stuff contributed by Paul Isaychuk
FALCON-632 Refactoring, documentation stuff contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/5dfe5cde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/5dfe5cde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/5dfe5cde
Branch: refs/heads/FALCON-585
Commit: 5dfe5cdef7521d2e41de32d8abd35f9a389ae82a
Parents: 78b9c1a
Author: Samarth Gupta <sa...@inmobi.com>
Authored: Thu Aug 28 11:33:05 2014 +0530
Committer: Samarth Gupta <sa...@inmobi.com>
Committed: Thu Aug 28 11:33:05 2014 +0530
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 2 +
.../regression/EmbeddedPigScriptTest.java | 60 +++----
.../regression/FeedInstanceStatusTest.java | 170 ++++++++-----------
.../regression/ProcessInstanceKillsTest.java | 62 +++----
.../regression/ProcessInstanceRerunTest.java | 127 ++++++--------
.../falcon/regression/ProcessLibPathTest.java | 40 ++---
.../falcon/regression/prism/RetentionTest.java | 67 ++++++--
7 files changed, 239 insertions(+), 289 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 5ed6a89..faf7e02 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -8,6 +8,8 @@ Trunk (Unreleased)
FALCON-589 Add test cases for various feed operations on Hcat feeds (Karishma G
via Samarth Gupta)
IMPROVEMENTS
+ FALCON-632 Refactoring, documentation stuff (Paul Isaychuk via Samarth Gupta)
+
FALCON-609 UpdateAtSpecificTimeTest, InstanceSummaryTest tagged, fixed, refactored
(Paul Isaychuk via Samarth Gupta)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
index 1973bf8..0d89fac 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/EmbeddedPigScriptTest.java
@@ -69,28 +69,24 @@ public class EmbeddedPigScriptTest extends BaseTestClass {
String inputPath = pigTestDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
private static final Logger logger = Logger.getLogger(EmbeddedPigScriptTest.class);
private static final double TIMEOUT = 15;
+ String processName;
+ String process;
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
-
logger.info("in @BeforeClass");
+
//copy pig script
HadoopUtil.uploadDir(clusterFS, pigScriptDir, OSUtil.RESOURCES + "pig");
-
Bundle bundle = BundleUtil.readELBundle();
bundle.generateUniqueBundle();
bundle = new Bundle(bundle, cluster);
-
String startDate = "2010-01-02T00:40Z";
String endDate = "2010-01-02T01:10Z";
-
bundle.setInputFeedDataPath(inputPath);
prefix = bundle.getFeedDataPathPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
- List<String> dataDates =
- TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
-
+ List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
}
@@ -120,6 +116,8 @@ public class EmbeddedPigScriptTest extends BaseTestClass {
processElement.getWorkflow().setEngine(EngineType.PIG);
bundles[0].setProcessData(processElement.toString());
bundles[0].submitFeedsScheduleProcess(prism);
+ process = bundles[0].getProcessData();
+ processName = Util.readEntityName(process);
}
@AfterMethod(alwaysRun = true)
@@ -129,70 +127,56 @@ public class EmbeddedPigScriptTest extends BaseTestClass {
@Test(groups = {"singleCluster"})
public void getResumedProcessInstance() throws Exception {
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
- Job.Status.RUNNING);
- prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData());
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
+ prism.getProcessHelper().suspend(URLS.SUSPEND_URL, process);
TimeUtil.sleepSeconds(TIMEOUT);
- ServiceResponse status =
- prism.getProcessHelper().getStatus(URLS.STATUS_URL, bundles[0].getProcessData());
+ ServiceResponse status = prism.getProcessHelper().getStatus(URLS.STATUS_URL, process);
Assert.assertTrue(status.getMessage().contains("SUSPENDED"), "Process not suspended.");
- prism.getProcessHelper().resume(URLS.RESUME_URL, bundles[0].getProcessData());
+ prism.getProcessHelper().resume(URLS.RESUME_URL, process);
TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
- Job.Status.RUNNING);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
InstancesResult r = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ .getRunningInstance(URLS.INSTANCE_RUNNING, processName);
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
}
@Test(groups = {"singleCluster"})
public void getSuspendedProcessInstance() throws Exception {
- prism.getProcessHelper().suspend(URLS.SUSPEND_URL, bundles[0].getProcessData());
+ prism.getProcessHelper().suspend(URLS.SUSPEND_URL, process);
TimeUtil.sleepSeconds(TIMEOUT);
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
- Job.Status.SUSPENDED);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.SUSPENDED);
InstancesResult r = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ .getRunningInstance(URLS.INSTANCE_RUNNING, processName);
InstanceUtil.validateSuccessWOInstances(r);
}
@Test(groups = {"singleCluster"})
public void getRunningProcessInstance() throws Exception {
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
- Job.Status.RUNNING);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
InstancesResult r = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ .getRunningInstance(URLS.INSTANCE_RUNNING, processName);
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
}
@Test(groups = {"singleCluster"})
public void getKilledProcessInstance() throws Exception {
- prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData());
+ prism.getProcessHelper().delete(URLS.DELETE_URL, process);
InstancesResult r = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ .getRunningInstance(URLS.INSTANCE_RUNNING, processName);
Assert.assertEquals(r.getStatusCode(), ResponseKeys.PROCESS_NOT_FOUND,
"Unexpected status code");
}
@Test(groups = {"singleCluster"})
public void getSucceededProcessInstance() throws Exception {
- AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(),
- Job.Status.RUNNING);
+ AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING);
InstancesResult r = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ .getRunningInstance(URLS.INSTANCE_RUNNING, processName);
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.RUNNING);
-
int counter = OSUtil.IS_WINDOWS ? 100 : 50;
InstanceUtil.waitForBundleToReachState(cluster, Util.getProcessName(bundles[0]
.getProcessData()), Job.Status.SUCCEEDED, counter);
- r = prism.getProcessHelper()
- .getRunningInstance(URLS.INSTANCE_RUNNING,
- Util.readEntityName(bundles[0].getProcessData()));
+ r = prism.getProcessHelper().getRunningInstance(URLS.INSTANCE_RUNNING, processName);
InstanceUtil.validateSuccessWOInstances(r);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
index ff227d6..acf3bb3 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceStatusTest.java
@@ -82,27 +82,29 @@ public class FeedInstanceStatusTest extends BaseTestClass {
removeBundles();
}
+ /**
+ * Goes through the whole feed replication workflow, checking the status of its instances
+ * while submitting the feed, scheduling it, and performing different combinations of
+ * instance actions such as -suspend, -resume, -kill and -rerun.
+ */
@Test(groups = {"multiCluster"})
public void feedInstanceStatus_running() throws Exception {
bundles[0].setInputFeedDataPath(feedInputPath);
logger.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
-
- ServiceResponse r = prism.getClusterHelper()
- .submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
- Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ AssertUtil.assertSucceeded(prism.getClusterHelper()
+ .submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0)));
logger.info("cluster bundle2: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
- r = prism.getClusterHelper()
- .submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
- Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ AssertUtil.assertSucceeded(prism.getClusterHelper()
+ .submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0)));
logger.info("cluster bundle3: " + Util.prettyPrintXml(bundles[2].getClusters().get(0)));
- r = prism.getClusterHelper()
- .submitEntity(URLS.SUBMIT_URL, bundles[2].getClusters().get(0));
- Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+ AssertUtil.assertSucceeded(prism.getClusterHelper()
+ .submitEntity(URLS.SUBMIT_URL, bundles[2].getClusters().get(0)));
String feed = bundles[0].getDataSets().get(0);
+ String feedName = Util.readEntityName(feed);
feed = InstanceUtil.setFeedCluster(feed,
XmlUtil.createValidity("2009-02-01T00:00Z", "2012-01-01T00:00Z"),
XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
@@ -110,7 +112,7 @@ public class FeedInstanceStatusTest extends BaseTestClass {
String startTime = TimeUtil.getTimeWrtSystemTime(-50);
feed = InstanceUtil.setFeedCluster(feed, XmlUtil.createValidity(startTime,
- TimeUtil.addMinsToTime(startTime, 65)),
+ TimeUtil.addMinsToTime(startTime, 65)),
XmlUtil.createRtention("hours(10)", ActionType.DELETE),
Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.SOURCE,
"US/${cluster.colo}");
@@ -126,48 +128,38 @@ public class FeedInstanceStatusTest extends BaseTestClass {
Util.readEntityName(bundles[2].getClusters().get(0)), ClusterType.SOURCE,
"UK/${cluster.colo}");
-
logger.info("feed: " + Util.prettyPrintXml(feed));
//status before submit
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 100) + "&end=" +
- TimeUtil.addMinsToTime(startTime, 120));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 100)
+ + "&end=" + TimeUtil.addMinsToTime(startTime, 120));
AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed));
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed),
- "?start=" + startTime + "&end=" + TimeUtil
- .addMinsToTime(startTime, 100));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 100));
AssertUtil.assertSucceeded(prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed));
// both replication instances
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed),
- "?start=" + startTime + "&end=" + TimeUtil
- .addMinsToTime(startTime, 100));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 100));
// single instance at -30
- prism.getFeedHelper().getProcessInstanceStatus(Util.readEntityName(feed),
- "?start=" + TimeUtil
- .addMinsToTime(startTime, 20));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 20));
//single at -10
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 40));
//single at 10
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 40));
//single at 30
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 40));
String postFix = "/US/" + cluster2.getClusterHelper().getColo();
String prefix = bundles[0].getFeedDataPathPrefix();
@@ -180,102 +172,74 @@ public class FeedInstanceStatusTest extends BaseTestClass {
HadoopUtil.lateDataReplenish(cluster3FS, 80, 20, prefix, postFix);
// both replication instances
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed),
- "?start=" + startTime + "&end=" + TimeUtil
- .addMinsToTime(startTime, 100));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 100));
// single instance at -30
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 20));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 20));
//single at -10
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 40));
//single at 10
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 40));
//single at 30
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 40));
logger.info("Wait till feed goes into running ");
//suspend instances -10
- prism.getFeedHelper()
- .getProcessInstanceSuspend(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 40));
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 20) + "&end=" +
- TimeUtil.addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceSuspend(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 20)
+ + "&end=" + TimeUtil.addMinsToTime(startTime, 40));
//resuspend -10 and suspend -30 source specific
- prism.getFeedHelper()
- .getProcessInstanceSuspend(Util.readEntityName(feed),
- "?start=" + TimeUtil
- .addMinsToTime(startTime, 20) + "&end=" +
- TimeUtil.addMinsToTime(startTime, 40));
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 20) + "&end=" +
- TimeUtil.addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceSuspend(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 20)
+ + "&end=" + TimeUtil.addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 20)
+ + "&end=" + TimeUtil.addMinsToTime(startTime, 40));
//resume -10 and -30
- prism.getFeedHelper()
- .getProcessInstanceResume(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 20) + "&end=" +
- TimeUtil.addMinsToTime(startTime, 40));
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 20) + "&end=" +
- TimeUtil.addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceResume(feedName, "?start=" + TimeUtil
+ .addMinsToTime(startTime, 20) + "&end=" + TimeUtil.addMinsToTime(startTime, 40));
+ prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + TimeUtil
+ .addMinsToTime(startTime, 20) + "&end=" + TimeUtil.addMinsToTime(startTime, 40));
//get running instances
- prism.getFeedHelper().getRunningInstance(URLS.INSTANCE_RUNNING, Util.readEntityName(feed));
+ prism.getFeedHelper().getRunningInstance(URLS.INSTANCE_RUNNING, feedName);
//rerun succeeded instance
- prism.getFeedHelper()
- .getProcessInstanceRerun(Util.readEntityName(feed), "?start=" + startTime);
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed),
- "?start=" + startTime + "&end=" + TimeUtil
- .addMinsToTime(startTime, 20));
+ prism.getFeedHelper().getProcessInstanceRerun(feedName, "?start=" + startTime);
+ prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startTime
+ + "&end=" + TimeUtil.addMinsToTime(startTime, 20));
//kill instance
- prism.getFeedHelper()
- .getProcessInstanceKill(Util.readEntityName(feed), "?start=" + TimeUtil
- .addMinsToTime(startTime, 44));
- prism.getFeedHelper()
- .getProcessInstanceKill(Util.readEntityName(feed), "?start=" + startTime);
+ prism.getFeedHelper().getProcessInstanceKill(feedName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 44));
+ prism.getFeedHelper().getProcessInstanceKill(feedName, "?start=" + startTime);
//end time should be less than end of validity i.e startTime + 110
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed),
- "?start=" + startTime + "&end=" + TimeUtil
- .addMinsToTime(startTime, 110));
-
+ prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
//rerun killed instance
- prism.getFeedHelper()
- .getProcessInstanceRerun(Util.readEntityName(feed), "?start=" + startTime);
- prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed),
- "?start=" + startTime + "&end=" + TimeUtil
- .addMinsToTime(startTime, 110));
+ prism.getFeedHelper().getProcessInstanceRerun(feedName, "?start=" + startTime);
+ prism.getFeedHelper().getProcessInstanceStatus(feedName, "?start=" + startTime
+ + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
//kill feed
prism.getFeedHelper().delete(URLS.DELETE_URL, feed);
- InstancesResult responseInstance = prism.getFeedHelper()
- .getProcessInstanceStatus(Util.readEntityName(feed),
- "?start=" + startTime + "&end=" + TimeUtil
- .addMinsToTime(startTime, 110));
+ InstancesResult responseInstance = prism.getFeedHelper().getProcessInstanceStatus(feedName,
+ "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(startTime, 110));
logger.info(responseInstance.getMessage());
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index e7b2616..e4129e6 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -55,29 +55,25 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
private String testDir = "/ProcessInstanceKillsTest";
private String baseTestHDFSDir = baseHDFSDir + testDir;
private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
- private String feedInputPath = baseTestHDFSDir +
- "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
- private String feedOutputPath =
- baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String datePattern = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedInputPath = baseTestHDFSDir + "/input" + datePattern;
+ private String feedOutputPath = baseTestHDFSDir + "/output-data" + datePattern;
private static final Logger LOGGER = Logger.getLogger(ProcessInstanceKillsTest.class);
private static final double TIMEOUT = 15;
+ String processName;
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
LOGGER.info("in @BeforeClass");
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-
Bundle b = BundleUtil.readELBundle();
b.generateUniqueBundle();
b = new Bundle(b, cluster);
-
String startDate = "2010-01-01T23:20Z";
String endDate = "2010-01-02T01:21Z";
-
b.setInputFeedDataPath(feedInputPath);
String prefix = b.getFeedDataPathPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
}
@@ -85,12 +81,12 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
@BeforeMethod(alwaysRun = true)
public void setup(Method method) throws Exception {
LOGGER.info("test name: " + method.getName());
-
bundles[0] = BundleUtil.readELBundle();
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].generateUniqueBundle();
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
bundles[0].setInputFeedDataPath(feedInputPath);
+ processName = Util.readEntityName(bundles[0].getProcessData());
}
@AfterMethod(alwaysRun = true)
@@ -115,8 +111,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
+ .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED);
}
@@ -139,8 +134,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T00:03Z&end=2010-01-02T00:03Z");
+ .getProcessInstanceKill(processName, "?start=2010-01-02T00:03Z&end=2010-01-02T00:03Z");
InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1);
}
@@ -162,8 +156,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T00:03Z&end=2010-01-02T00:30Z");
+ .getProcessInstanceKill(processName, "?start=2010-01-02T00:03Z&end=2010-01-02T00:30Z");
InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
LOGGER.info(r.toString());
}
@@ -196,9 +189,8 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
String startTimeRequest = TimeUtil.getTimeWrtSystemTime(-17);
String endTimeRequest = TimeUtil.getTimeWrtSystemTime(23);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=" + startTimeRequest + "&end=" + endTimeRequest);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName,
+ "?start=" + startTimeRequest + "&end=" + endTimeRequest);
LOGGER.info(r.toString());
}
@@ -222,8 +214,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
String startTime = TimeUtil.getTimeWrtSystemTime(1);
String endTime = TimeUtil.getTimeWrtSystemTime(40);
InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=" + startTime + "&end=" + endTime);
+ .getProcessInstanceKill(processName, "?start=" + startTime + "&end=" + endTime);
LOGGER.info(r.getMessage());
Assert.assertEquals(r.getInstances(), null);
}
@@ -245,12 +236,10 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:05Z&end=2010-01-02T01:15Z");
+ .getProcessInstanceKill(processName, "?start=2010-01-02T01:05Z&end=2010-01-02T01:15Z");
TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
InstanceUtil.validateResponse(result, 5, 2, 0, 0, 3);
}
@@ -269,13 +258,10 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
TimeUtil.sleepSeconds(TIMEOUT);
- prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:20Z");
+ prism.getProcessHelper().getProcessInstanceKill(processName, "?start=2010-01-02T01:20Z");
TimeUtil.sleepSeconds(TIMEOUT);
- InstancesResult result = prism.getProcessHelper()
- .getProcessInstanceStatus(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
+ InstancesResult result = prism.getProcessHelper().getProcessInstanceStatus(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:20Z");
InstanceUtil.validateResponse(result, 5, 4, 0, 0, 1);
}
@@ -294,11 +280,9 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
+ .getProcessInstanceSuspend(processName, "?start=2010-01-02T01:00Z");
InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
+ .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.KILLED);
}
@@ -316,15 +300,13 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstanceReachState(serverOC.get(0), Util.getProcessName(bundles[0]
- .getProcessData()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(serverOC.get(0), processName, 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
+ .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
InstanceUtil.validateSuccess(r, bundles[0], WorkflowStatus.SUCCEEDED);
}
-
@AfterClass(alwaysRun = true)
public void deleteData() throws Exception {
LOGGER.info("in @AfterClass");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
index 119d871..df65a79 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceRerunTest.java
@@ -53,30 +53,28 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
private String baseTestDir = baseHDFSDir + "/ProcessInstanceRerunTest";
private String aggregateWorkflowDir = baseTestDir + "/aggregator";
- private String feedInputPath = baseTestDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
- private String feedOutputPath = baseTestDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
- private String feedInputTimedOutPath =
- baseTestDir + "/timedout/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
-
+ private String datePattern = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ private String feedInputPath = baseTestDir + "/input" + datePattern;
+ private String feedOutputPath = baseTestDir + "/output-data" + datePattern;
+ private String feedInputTimedOutPath = baseTestDir + "/timedout" + datePattern;
private ColoHelper cluster = servers.get(0);
private FileSystem clusterFS = serverFS.get(0);
private OozieClient clusterOC = serverOC.get(0);
private static final Logger LOGGER = Logger.getLogger(ProcessInstanceRerunTest.class);
private static final double TIMEOUT = 10;
+ private String processName;
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
LOGGER.info("in @BeforeClass");
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
Bundle b = BundleUtil.readELBundle();
-
b = new Bundle(b, cluster);
String startDate = "2010-01-02T00:40Z";
String endDate = "2010-01-02T01:20Z";
b.setInputFeedDataPath(feedInputPath);
String prefix = b.getFeedDataPathPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
}
@@ -89,6 +87,7 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].generateUniqueBundle();
bundles[0].setInputFeedDataPath(feedInputPath);
bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ processName = bundles[0].getProcessName();
}
@AfterMethod(alwaysRun = true)
@@ -111,17 +110,15 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
+ String process = bundles[0].getProcessData();
TimeUtil.sleepSeconds(TIMEOUT);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
- InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:16Z");
+ InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ InstancesResult r = prism.getProcessHelper().getProcessInstanceKill(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:16Z");
InstanceUtil.validateResponse(r, 4, 0, 0, 0, 4);
- List<String> wfIDs =
- InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
- prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+ prism.getProcessHelper().getProcessInstanceRerun(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 5, 1, 0);
}
@@ -137,18 +134,16 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
- LOGGER.info("process: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+ String process = bundles[0].getProcessData();
+ LOGGER.info("process: " + Util.prettyPrintXml(process));
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
- List<String> wfIDs =
- InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
- prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+ prism.getProcessHelper().
+ getProcessInstanceRerun(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
}
@@ -166,16 +161,14 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(5);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ String process = bundles[0].getProcessData();
+ InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
InstancesResult r = prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ .getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3);
- List<String> wfIDs =
- InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
- prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+ prism.getProcessHelper().getProcessInstanceRerun(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
TimeUtil.sleepSeconds(TIMEOUT);
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 6, 6, 0, 0);
}
@@ -193,15 +186,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(1);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
- prism.getProcessHelper()
- .getProcessInstanceKill(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
- String wfID = InstanceUtil.getWorkflows(cluster,
- Util.getProcessName(bundles[0].getProcessData()), Status.KILLED).get(0);
- prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
+ String process = bundles[0].getProcessData();
+ InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ prism.getProcessHelper().getProcessInstanceKill(processName, "?start=2010-01-02T01:00Z");
+ String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.KILLED).get(0);
+ prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2010-01-02T01:00Z");
Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
}
@@ -219,15 +208,13 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(6);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
- String wfID = InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0]
- .getProcessData()), Status.RUNNING, Status.SUCCEEDED).get(0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
- .getProcessData()), 0, CoordinatorAction
+ String process = bundles[0].getProcessData();
+ InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ String wfID = InstanceUtil.getWorkflows(cluster, processName, Status.RUNNING,
+ Status.SUCCEEDED).get(0);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 0, CoordinatorAction
.Status.SUCCEEDED, EntityType.PROCESS);
- prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z");
+ prism.getProcessHelper().getProcessInstanceRerun(processName, "?start=2010-01-02T01:00Z");
Assert.assertTrue(InstanceUtil.isWorkflowRunning(clusterOC, wfID));
}
@@ -245,14 +232,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(2);
bundles[0].submitFeedsScheduleProcess(prism);
- prism.getProcessHelper()
- .getProcessInstanceSuspend(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
- prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
- Assert.assertEquals(InstanceUtil
- .getInstanceStatus(cluster, Util.getProcessName(bundles[0].getProcessData()), 0, 1),
+ prism.getProcessHelper().getProcessInstanceSuspend(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
+ prism.getProcessHelper().getProcessInstanceRerun(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:06Z");
+ Assert.assertEquals(InstanceUtil.getInstanceStatus(cluster, processName, 0, 1),
CoordinatorAction.Status.SUSPENDED);
}
@@ -269,14 +253,13 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setOutputFeedLocationData(feedOutputPath);
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
- .getProcessData()), 2, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
- List<String> wfIDs =
- InstanceUtil.getWorkflows(cluster, Util.getProcessName(bundles[0].getProcessData()));
- prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ String process = bundles[0].getProcessData();
+ InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 2,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ List<String> wfIDs = InstanceUtil.getWorkflows(cluster, processName);
+ prism.getProcessHelper().getProcessInstanceRerun(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
InstanceUtil.areWorkflowsRunning(clusterOC, wfIDs, 3, 3, 0, 0);
}
@@ -297,13 +280,11 @@ public class ProcessInstanceRerunTest extends BaseTestClass {
bundles[0].setProcessConcurrency(3);
bundles[0].submitFeedsScheduleProcess(prism);
CoordinatorAction.Status s;
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.getProcessName(bundles[0]
- .getProcessData()), 1, CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS);
- prism.getProcessHelper()
- .getProcessInstanceRerun(Util.readEntityName(bundles[0].getProcessData()),
- "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
- s = InstanceUtil
- .getInstanceStatus(cluster, Util.readEntityName(bundles[0].getProcessData()), 0, 0);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+ CoordinatorAction.Status.TIMEDOUT, EntityType.PROCESS);
+ prism.getProcessHelper().getProcessInstanceRerun(processName,
+ "?start=2010-01-02T01:00Z&end=2010-01-02T01:11Z");
+ s = InstanceUtil.getInstanceStatus(cluster, processName, 0, 0);
Assert.assertEquals(s, CoordinatorAction.Status.WAITING,
"instance should have been in WAITING state");
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
index fc8e4a8..7647d15 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLibPathTest.java
@@ -52,45 +52,43 @@ public class ProcessLibPathTest extends BaseTestClass {
String testDir = baseHDFSDir + "/ProcessLibPath";
String testLibDir = testDir + "/TestLib";
private static final Logger logger = Logger.getLogger(ProcessLibPathTest.class);
+ String datePattern = "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ String processName;
+ String process;
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
-
logger.info("in @BeforeClass");
+
//common lib for both test cases
HadoopUtil.uploadDir(clusterFS, testLibDir, OSUtil.RESOURCES_OOZIE + "lib");
-
Bundle b = BundleUtil.readELBundle();
b.generateUniqueBundle();
b = new Bundle(b, cluster);
-
String startDate = "2010-01-01T22:00Z";
String endDate = "2010-01-02T03:00Z";
-
- b.setInputFeedDataPath(testDir + "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ b.setInputFeedDataPath(testDir + "/input" + datePattern);
String prefix = b.getFeedDataPathPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
-
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
}
-
@BeforeMethod(alwaysRun = true)
public void testName(Method method) throws Exception {
logger.info("test name: " + method.getName());
bundles[0] = BundleUtil.readELBundle();
bundles[0] = new Bundle(bundles[0], cluster);
bundles[0].generateUniqueBundle();
- bundles[0].setInputFeedDataPath(baseHDFSDir + "/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ bundles[0].setInputFeedDataPath(baseHDFSDir + datePattern);
bundles[0].setProcessValidity("2010-01-02T01:00Z", "2010-01-02T01:04Z");
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
bundles[0].setOutputFeedPeriodicity(5, TimeUnit.minutes);
- bundles[0].setOutputFeedLocationData(
- baseHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
+ bundles[0].setOutputFeedLocationData(baseHDFSDir + "/output-data" + datePattern);
bundles[0].setProcessConcurrency(1);
bundles[0].setProcessLibPath(testLibDir);
+ process = bundles[0].getProcessData();
+ processName = Util.readEntityName(process);
}
@AfterMethod(alwaysRun = true)
@@ -109,13 +107,11 @@ public class ProcessLibPathTest extends BaseTestClass {
HadoopUtil.uploadDir(clusterFS, workflowDir, OSUtil.RESOURCES_OOZIE);
HadoopUtil.deleteDirIfExists(workflowDir + "/lib", clusterFS);
bundles[0].setProcessWorkflow(workflowDir);
- logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+ logger.info("processData: " + Util.prettyPrintXml(process));
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, Util.readEntityName(bundles[0]
- .getProcessData()), 0);
- InstanceUtil
- .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+ InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED);
}
/**
@@ -131,12 +127,10 @@ public class ProcessLibPathTest extends BaseTestClass {
HadoopUtil.copyDataToFolder(clusterFS, workflowDir + "/lib",
OSUtil.RESOURCES + "ivory-oozie-lib-0.1.jar");
bundles[0].setProcessWorkflow(workflowDir);
- logger.info("processData: " + Util.prettyPrintXml(bundles[0].getProcessData()));
+ logger.info("processData: " + Util.prettyPrintXml(process));
bundles[0].submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, Util.readEntityName(bundles[0]
- .getProcessData()), 0);
- InstanceUtil
- .waitForBundleToReachState(cluster, bundles[0].getProcessName(), Status.SUCCEEDED);
+ InstanceUtil.waitTillInstancesAreCreated(cluster, process, 0);
+ OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0);
+ InstanceUtil.waitForBundleToReachState(cluster, processName, Status.SUCCEEDED);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/5dfe5cde/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
index 85bd770..1d900d9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -65,7 +65,6 @@ public class RetentionTest extends BaseTestClass {
String baseTestHDFSDir = baseHDFSDir + "/RetentionTest/";
String testHDFSDir = baseTestHDFSDir + TEST_FOLDERS;
private static final Logger logger = Logger.getLogger(RetentionTest.class);
-
ColoHelper cluster = servers.get(0);
FileSystem clusterFS = serverFS.get(0);
OozieClient clusterOC = serverOC.get(0);
@@ -85,12 +84,26 @@ public class RetentionTest extends BaseTestClass {
removeBundles();
}
+ /**
+ * Particular test case for https://issues.apache.org/jira/browse/FALCON-321
+ * @throws Exception
+ */
@Test
public void testRetentionWithEmptyDirectories() throws Exception {
- // test for https://issues.apache.org/jira/browse/FALCON-321
testRetention(24, RetentionUnit.HOURS, true, FeedType.DAILY, false);
}
+ /**
+ * Tests retention with different parameters. Validates its results based on expected and
+ * actual retained data.
+ *
+ * @param retentionPeriod period for which data should be retained
+ * @param retentionUnit type of retention limit attribute
+ * @param gaps defines gaps within list of data folders
+ * @param feedType feed type
+ * @param withData should folders be filled with data or not
+ * @throws Exception
+ */
@Test(groups = {"0.1", "0.2", "prism"}, dataProvider = "betterDP", priority = -1)
public void testRetention(final int retentionPeriod, final RetentionUnit retentionUnit,
final boolean gaps, final FeedType feedType, final boolean withData) throws Exception {
@@ -125,16 +138,28 @@ public class RetentionTest extends BaseTestClass {
if (gap) {
skip = gaps[new Random().nextInt(gaps.length)];
}
-
final DateTime today = new DateTime(DateTimeZone.UTC);
final List<DateTime> times = TimeUtil.getDatesOnEitherSide(
feedType.addTime(today, -36), feedType.addTime(today, 36), skip, feedType);
final List<String> dataDates = TimeUtil.convertDatesToString(times, feedType.getFormatter());
logger.info("dataDates = " + dataDates);
-
HadoopUtil.replenishData(clusterFS, testHDFSDir, dataDates, withData);
}
+ /**
+ * Schedules feed and waits till retention succeeds. Validates the data which was removed
+ * and the data which was retained.
+ *
+ * @param feed analyzed retention feed
+ * @param feedType feed type
+ * @param retentionUnit type of retention limit attribute
+ * @param retentionPeriod period for which data should be retained
+ * @throws OozieClientException
+ * @throws IOException
+ * @throws URISyntaxException
+ * @throws AuthenticationException
+ * @throws JMSException
+ */
private void commonDataRetentionWorkflow(String feed, FeedType feedType,
RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException,
IOException, URISyntaxException, AuthenticationException, JMSException {
@@ -148,22 +173,20 @@ public class RetentionTest extends BaseTestClass {
JmsMessageConsumer messageConsumer = new JmsMessageConsumer("FALCON." + feedName,
cluster.getClusterHelper().getActiveMQ());
messageConsumer.start();
-
final DateTime currentTime = new DateTime(DateTimeZone.UTC);
String bundleId = OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0);
List<String> workflows = OozieUtil.waitForRetentionWorkflowToSucceed(bundleId, clusterOC);
logger.info("workflows: " + workflows);
-
messageConsumer.interrupt();
Util.printMessageData(messageConsumer);
+
//now look for cluster data
List<String> finalData = Util.getHadoopDataFromDir(clusterFS, feed, testHDFSDir);
//now see if retention value was matched to as expected
List<String> expectedOutput = filterDataOnRetention(initialData, currentTime, retentionUnit,
retentionPeriod, feedType);
-
logger.info("initialData = " + initialData);
logger.info("finalData = " + finalData);
logger.info("expectedOutput = " + expectedOutput);
@@ -171,23 +194,31 @@ public class RetentionTest extends BaseTestClass {
final List<String> missingData = new ArrayList<String>(initialData);
missingData.removeAll(expectedOutput);
validateDataFromFeedQueue(feedName, messageConsumer.getReceivedMessages(), missingData);
-
Assert.assertEquals(finalData.size(), expectedOutput.size(),
- "sizes of outputs are different! please check");
+ "Expected and actual sizes of retained data are different! Please check.");
Assert.assertTrue(Arrays.deepEquals(finalData.toArray(new String[finalData.size()]),
expectedOutput.toArray(new String[expectedOutput.size()])));
}
+ /**
+ * Validates that the data which is expected to be removed matches the data
+ * mentioned in messages from ActiveMQ.
+ *
+ * @param feedName feed name
+ * @param messages messages from ActiveMQ
+ * @param missingData data which is expected to be removed after retention succeeded
+ * @throws OozieClientException
+ * @throws JMSException
+ */
private void validateDataFromFeedQueue(String feedName, List<MapMessage> messages,
List<String> missingData) throws OozieClientException, JMSException {
//just verify that each element in queue is same as deleted data!
List<String> workflowIds = OozieUtil.getWorkflowJobs(cluster,
OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0));
- //create queuedata folderList:
+ //create queue data folderList:
List<String> deletedFolders = new ArrayList<String>();
-
for (MapMessage message : messages) {
if (message != null) {
Assert.assertEquals(message.getString("entityName"), feedName);
@@ -205,7 +236,6 @@ public class RetentionTest extends BaseTestClass {
cluster.getFeedHelper().getActiveMQ());
}
}
-
Assert.assertEquals(deletedFolders.size(), missingData.size(),
"Output size is different than expected!");
Assert.assertTrue(Arrays.deepEquals(missingData.toArray(new String[missingData.size()]),
@@ -213,6 +243,16 @@ public class RetentionTest extends BaseTestClass {
"The missing data and message for delete operation don't correspond");
}
+ /**
+ * Determines which data is expected to be retained.
+ *
+ * @param inputData initial data on cluster
+ * @param currentTime current date
+ * @param retentionUnit type of retention limit attribute
+ * @param retentionPeriod period for which data should be retained
+ * @param feedType feed type
+ * @return list of data folders which are expected to be present on cluster
+ */
private List<String> filterDataOnRetention(List<String> inputData, DateTime currentTime,
RetentionUnit retentionUnit, int retentionPeriod, FeedType feedType) {
final List<String> finalData = new ArrayList<String>();
@@ -232,6 +272,9 @@ public class RetentionTest extends BaseTestClass {
final static int[] gaps = new int[]{2, 4, 5, 1};
+ /**
+ * Provides different sets of parameters for retention workflow.
+ */
@DataProvider(name = "betterDP")
public Object[][] getTestData(Method m) {
// a negative value like -4 should be covered in validation scenarios.