You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ro...@apache.org on 2014/09/03 17:22:07 UTC
git commit: FALCON-660 7 test classes refactored and few of them
documented. Contributed by Paul Isaychuk
Repository: incubator-falcon
Updated Branches:
refs/heads/master 75f06b4fe -> 3678eabab
FALCON-660 7 test classes refactored and few of them documented. Contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/3678eaba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/3678eaba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/3678eaba
Branch: refs/heads/master
Commit: 3678eabab73603a7ca3e45e1a56187cc0aca3619
Parents: 75f06b4
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Wed Sep 3 18:20:38 2014 +0300
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Wed Sep 3 18:20:38 2014 +0300
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 3 +
.../falcon/regression/InstanceParamTest.java | 45 ++--
.../falcon/regression/InstanceSummaryTest.java | 171 +++++--------
.../regression/prism/EntityDryRunTest.java | 26 +-
.../regression/prism/PrismFeedSnSTest.java | 252 ++++++++++---------
.../prism/PrismProcessScheduleTest.java | 111 +++-----
.../regression/prism/PrismProcessSnSTest.java | 151 ++++++-----
.../falcon/regression/ui/ProcessUITest.java | 21 +-
8 files changed, 361 insertions(+), 419 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 94403c8..b60c23c 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -9,6 +9,9 @@ Trunk (Unreleased)
via Samarth Gupta)
IMPROVEMENTS
+ FALCON-660 7 test classes refactored and few of them documented (Paul Isaychuk via
+ Ruslan Ostafiychuk)
+
FALCON-653 Add falcon regression test for zero input process(Karishma via Samarth Gupta)
FALCON-655 Skip workflow upload if process won't be submitted (Ruslan Ostafiychuk)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java
index d733cfc..edf3428 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceParamTest.java
@@ -58,24 +58,21 @@ public class InstanceParamTest extends BaseTestClass {
*/
private String baseTestHDFSDir = baseHDFSDir + "/InstanceParamTest";
- private String feedInputPath = baseTestHDFSDir
- +
+ private String feedInputPath = baseTestHDFSDir +
"/testInputData/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
private String startTime;
private String endTime;
-
private ColoHelper cluster1 = servers.get(0);
- private OozieClient oC1 = serverOC.get(0);
+ private OozieClient cluster1OC = serverOC.get(0);
private Bundle processBundle;
private static final Logger LOGGER = Logger.getLogger(InstanceParamTest.class);
-
+ private String processName;
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- startTime = TimeUtil.get20roundedTime(TimeUtil
- .getTimeWrtSystemTime(-20));
+ startTime = TimeUtil.get20roundedTime(TimeUtil.getTimeWrtSystemTime(-20));
endTime = TimeUtil.getTimeWrtSystemTime(60);
}
@@ -92,7 +89,12 @@ public class InstanceParamTest extends BaseTestClass {
bundles[i].generateUniqueBundle();
bundles[i].setProcessWorkflow(aggregateWorkflowDir);
}
+ processName = processBundle.getProcessName();
}
+
+ /**
+ * Schedule process. Get params of waiting instance.
+ */
@Test(timeOut = 1200000, enabled = false)
public void getParamsValidRequestInstanceWaiting()
throws URISyntaxException, JAXBException, AuthenticationException, IOException,
@@ -104,12 +106,14 @@ public class InstanceParamTest extends BaseTestClass {
ClusterType.SOURCE, null, null);
processBundle.submitFeedsScheduleProcess(prism);
InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0);
- InstancesResult r = prism.getProcessHelper()
- .getInstanceParams(Util.readEntityName(processBundle.getProcessData()),
- "?start="+startTime);
+ InstancesResult r = prism.getProcessHelper().getInstanceParams(processName,
+ "?start=" + startTime);
r.getMessage();
}
+ /**
+ * Schedule process. Wait till instance succeeded. Get its params.
+ */
@Test(timeOut = 1200000, enabled = true)
public void getParamsValidRequestInstanceSucceeded()
throws URISyntaxException, JAXBException, AuthenticationException, IOException,
@@ -121,16 +125,18 @@ public class InstanceParamTest extends BaseTestClass {
ClusterType.SOURCE, null, null);
processBundle.submitFeedsScheduleProcess(prism);
InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
- processBundle.getProcessName(), 0);
- InstanceUtil.waitTillInstanceReachState(oC1, processBundle.getProcessName(), 1,
+ OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, processName, 0);
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, processName, 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 10);
InstancesResult r = prism.getProcessHelper()
- .getInstanceParams(Util.readEntityName(processBundle.getProcessData()),
- "?start="+startTime);
+ .getInstanceParams(processName, "?start=" + startTime);
LOGGER.info(r.getMessage());
}
+ /**
+ * Schedule process. Wait till instance got killed. Get its params.
+ * TODO: change according to test case
+ */
@Test(timeOut = 1200000, enabled = false)
public void getParamsValidRequestInstanceKilled()
throws URISyntaxException, JAXBException, AuthenticationException, IOException,
@@ -142,15 +148,12 @@ public class InstanceParamTest extends BaseTestClass {
ClusterType.SOURCE, null, null);
processBundle.submitFeedsScheduleProcess(prism);
InstanceUtil.waitTillInstancesAreCreated(cluster1, processBundle.getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
- processBundle.getProcessName(), 0);
- InstanceUtil.waitTillInstanceReachState(oC1, processBundle.getProcessName(), 0,
+ OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, processName, 0);
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, processName, 0,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
InstancesResult r = prism.getProcessHelper()
- .getInstanceParams(Util.readEntityName(processBundle.getProcessData()),
- "?start="+startTime);
+ .getInstanceParams(processName, "?start=" + startTime);
r.getMessage();
-
}
@AfterMethod(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
index 154485f..636da2c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
@@ -50,43 +50,34 @@ import java.net.URISyntaxException;
import java.text.ParseException;
import java.util.List;
-/*
-this test currently provide minimum verification. More detailed test should
- be added
+/** This test currently provide minimum verification. More detailed test should be added:
+ 1. process : test summary single cluster few instance some future some past
+ 2. process : test multiple cluster, full past on one cluster, full future on one cluster,
+ half future / past on third one
+ 3. feed : same as test 1 for feed
+ 4. feed : same as test 2 for feed
*/
@Test(groups = "embedded")
public class InstanceSummaryTest extends BaseTestClass {
- //1. process : test summary single cluster few instance some future some past
- //2. process : test multiple cluster, full past on one cluster,
- // full future on one cluster, half future / past on third one
-
- // 3. feed : same as test 1 for feed
- // 4. feed : same as test 2 for feed
-
-
String baseTestHDFSDir = baseHDFSDir + "/InstanceSummaryTest";
String feedInputPath = baseTestHDFSDir +
"/testInputData/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
String startTime;
String endTime;
-
ColoHelper cluster3 = servers.get(2);
-
Bundle processBundle;
private static final Logger logger = Logger.getLogger(InstanceSummaryTest.class);
+ String processName;
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
uploadDirToClusters(aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
- startTime = TimeUtil.get20roundedTime(TimeUtil
- .getTimeWrtSystemTime
- (-20));
+ startTime = TimeUtil.get20roundedTime(TimeUtil.getTimeWrtSystemTime(-20));
endTime = TimeUtil.getTimeWrtSystemTime(60);
String startTimeData = TimeUtil.addMinsToTime(startTime, -100);
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startTimeData, endTime, 20);
-
for (FileSystem fs : serverFS) {
HadoopUtil.deleteDirIfExists(Util.getPathPrefix(feedInputPath), fs);
HadoopUtil.flattenAndPutDataInFolder(fs, OSUtil.NORMAL_INPUT,
@@ -108,100 +99,88 @@ public class InstanceSummaryTest extends BaseTestClass {
bundles[i].generateUniqueBundle();
bundles[i].setProcessWorkflow(aggregateWorkflowDir);
}
+ processName = Util.readEntityName(processBundle.getProcessData());
}
+ /**
+ * Schedule single-cluster process. Get its instances summary.
+ * TODO: should be complete
+ */
@Test(enabled = true, timeOut = 1200000)
public void testSummarySingleClusterProcess()
throws URISyntaxException, JAXBException, IOException, ParseException,
OozieClientException, AuthenticationException {
processBundle.setProcessValidity(startTime, endTime);
processBundle.submitFeedsScheduleProcess(prism);
- InstanceUtil.waitTillInstancesAreCreated(cluster3,
- processBundle.getProcessData(), 0);
+ InstanceUtil.waitTillInstancesAreCreated(cluster3, processBundle.getProcessData(), 0);
// start only at start time
InstancesSummaryResult r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + startTime);
-
- InstanceUtil.waitTillInstanceReachState(serverOC.get(2),
- Util.readEntityName(processBundle.getProcessData()), 2,
+ .getInstanceSummary(processName, "?start=" + startTime);
+ InstanceUtil.waitTillInstanceReachState(serverOC.get(2), processName, 2,
Status.SUCCEEDED, EntityType.PROCESS);
-
//AssertUtil.assertSucceeded(r);
//start only before process start
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
+ r = prism.getProcessHelper().getInstanceSummary(processName,
"?start=" + TimeUtil.addMinsToTime(startTime, -100));
//AssertUtil.assertFailed(r,"response should have failed");
//start only after process end
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
+ r = prism.getProcessHelper().getInstanceSummary(processName,
"?start=" + TimeUtil.addMinsToTime(startTime, 120));
//start only at mid specific instance
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + TimeUtil.addMinsToTime(startTime,
- +10));
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 10));
//start only in between 2 instance
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + TimeUtil.addMinsToTime(startTime,
- 7));
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 7));
//start and end at start and end
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + startTime + "&end=" + endTime);
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + startTime + "&end=" + endTime);
//start in between and end at end
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + TimeUtil.addMinsToTime(startTime,
- 14) + "&end=" + endTime);
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 14) + "&end=" + endTime);
//start at start and end between
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(endTime,
- -20));
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(endTime, -20));
// start and end in between
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + TimeUtil.addMinsToTime(startTime,
- 20) + "&end=" + TimeUtil.addMinsToTime(endTime, -13));
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 20)
+ + "&end=" + TimeUtil.addMinsToTime(endTime, -13));
//start before start with end in between
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + TimeUtil.addMinsToTime(startTime,
- -100) + "&end=" + TimeUtil.addMinsToTime(endTime, -37));
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, -100)
+ + "&end=" + TimeUtil.addMinsToTime(endTime, -37));
//start in between and end after end
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + TimeUtil.addMinsToTime(startTime,
- 60) + "&end=" + TimeUtil.addMinsToTime(endTime, 100));
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + TimeUtil.addMinsToTime(startTime, 60)
+ + "&end=" + TimeUtil.addMinsToTime(endTime, 100));
// both start end out od range
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + TimeUtil.addMinsToTime(startTime,
- -100) + "&end=" + TimeUtil.addMinsToTime(endTime, 100));
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + TimeUtil.addMinsToTime(startTime,-100)
+ + "&end=" + TimeUtil.addMinsToTime(endTime, 100));
// end only
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
+ r = prism.getProcessHelper().getInstanceSummary(processName,
"?end=" + TimeUtil.addMinsToTime(endTime, -30));
}
+ /**
+ * Adjust multi-cluster process. Submit and schedule it. Get its instances summary.
+ * TODO: should be complete
+ */
@Test(enabled = true, timeOut = 1200000)
public void testSummaryMultiClusterProcess() throws JAXBException,
ParseException, IOException, URISyntaxException, AuthenticationException {
@@ -212,39 +191,31 @@ public class InstanceSummaryTest extends BaseTestClass {
ClusterType.SOURCE, null, null);
processBundle.submitFeedsScheduleProcess(prism);
InstancesSummaryResult r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + startTime);
-
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + startTime + "&end=" + endTime);
+ .getInstanceSummary(processName, "?start=" + startTime);
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + startTime + "&end=" + endTime);
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + startTime + "&end=" + endTime);
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + startTime + "&end=" + endTime);
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + startTime + "&end=" + endTime);
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + startTime + "&end=" + endTime);
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + startTime + "&end=" + endTime);
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + startTime + "&end=" + endTime);
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + startTime + "&end=" + endTime);
-
-
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + startTime + "&end=" + endTime);
-
-
- r = prism.getProcessHelper()
- .getInstanceSummary(Util.readEntityName(processBundle.getProcessData()),
- "?start=" + startTime + "&end=" + endTime);
+ r = prism.getProcessHelper().getInstanceSummary(processName,
+ "?start=" + startTime + "&end=" + endTime);
}
+ /**
+ * Adjust multi-cluster feed. Submit and schedule it. Get its instances summary.
+ * TODO: should be complete
+ */
@Test(enabled = true, timeOut = 1200000)
public void testSummaryMultiClusterFeed() throws JAXBException, ParseException, IOException,
URISyntaxException, OozieClientException, AuthenticationException {
@@ -253,7 +224,6 @@ public class InstanceSummaryTest extends BaseTestClass {
String feed = bundles[0].getDataSets().get(0);
//cluster_1 is target, cluster_2 is source and cluster_3 is neutral
-
feed = InstanceUtil.setFeedCluster(feed,
XmlUtil.createValidity("2012-10-01T12:00Z", "2010-01-01T00:00Z"),
XmlUtil.createRtention("days(100000)", ActionType.DELETE), null,
@@ -268,8 +238,7 @@ public class InstanceSummaryTest extends BaseTestClass {
.setFeedCluster(feed, XmlUtil.createValidity(startTime, "2099-10-01T12:25Z"),
XmlUtil.createRtention("days(100000)", ActionType.DELETE),
Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.TARGET,
- null,
- feedInputPath);
+ null, feedInputPath);
feed = InstanceUtil
.setFeedCluster(feed, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
@@ -287,21 +256,15 @@ public class InstanceSummaryTest extends BaseTestClass {
feedInputPath, 1);*/
//submit and schedule feed
- prism.getFeedHelper().submitAndSchedule(Util.URLS
- .SUBMIT_AND_SCHEDULE_URL, feed);
+ prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
InstancesSummaryResult r = prism.getFeedHelper()
- .getInstanceSummary(Util.readEntityName(feed),
- "?start=" + startTime);
-
- r = prism.getFeedHelper()
- .getInstanceSummary(Util.readEntityName(feed),
- "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(endTime,
- -20));
+ .getInstanceSummary(Util.readEntityName(feed), "?start=" + startTime);
+ r = prism.getFeedHelper().getInstanceSummary(Util.readEntityName(feed),
+ "?start=" + startTime + "&end=" + TimeUtil.addMinsToTime(endTime, -20));
}
-
@AfterMethod(alwaysRun = true)
public void tearDown() throws IOException {
processBundle.deleteBundle(prism);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
index 5000746..0b06823 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/EntityDryRunTest.java
@@ -52,9 +52,9 @@ public class EntityDryRunTest extends BaseTestClass {
private FileSystem clusterFS = serverFS.get(0);
private String baseTestHDFSDir = baseHDFSDir + "/EntityDryRunTest";
private String feedInputPath = baseTestHDFSDir +
- "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ "/input/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
private String feedOutputPath =
- baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
+ baseTestHDFSDir + "/output-data/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}";
private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
private static final Logger LOGGER = Logger.getLogger(EntityDryRunTest.class);
@@ -85,7 +85,6 @@ public class EntityDryRunTest extends BaseTestClass {
}
/**
- *
* tries to submit process with invalid el exp
*/
@Test(groups = {"singleCluster"})
@@ -93,12 +92,11 @@ public class EntityDryRunTest extends BaseTestClass {
bundles[0].setProcessProperty("EntityDryRunTestProp", "${coord:someEL(1)");
bundles[0].submitProcess(true);
ServiceResponse response = prism.getProcessHelper()
- .schedule(Util.URLS.SCHEDULE_URL, bundles[0].getProcessData());
+ .schedule(Util.URLS.SCHEDULE_URL, bundles[0].getProcessData());
validate(response);
}
/**
- *
* tries to update process with invalid EL exp
*/
@Test(groups = {"singleCluster"})
@@ -108,17 +106,15 @@ public class EntityDryRunTest extends BaseTestClass {
bundles[0].submitAndScheduleProcess();
bundles[0].setProcessProperty("EntityDryRunTestProp", "${coord:someEL(1)");
ServiceResponse response = prism.getProcessHelper().update(bundles[0].getProcessData(),
- bundles[0].getProcessData(), TimeUtil.getTimeWrtSystemTime(5), null);
+ bundles[0].getProcessData(), TimeUtil.getTimeWrtSystemTime(5), null);
validate(response);
Assert.assertEquals(
OozieUtil.getNumberOfBundle(cluster, EntityType.PROCESS, bundles[0].getProcessName()),
- 1,
- "more than one bundle found after failed update request");
+ 1, "more than one bundle found after failed update request");
}
/**
* tries to submit feed with invalied EL exp
- *
*/
@Test(groups = {"singleCluster"})
public void testDryRunFailureScheduleFeed() throws Exception {
@@ -131,14 +127,14 @@ public class EntityDryRunTest extends BaseTestClass {
}
/**
- *
* tries to update feed with invalid el exp
*/
@Test(groups = {"singleCluster"})
public void testDryRunFailureUpdateFeed() throws Exception {
bundles[0].submitClusters(prism);
String feed = bundles[0].getInputFeedFromBundle();
- ServiceResponse response = prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
+ ServiceResponse response =
+ prism.getFeedHelper().submitAndSchedule(Util.URLS.SUBMIT_AND_SCHEDULE_URL, feed);
AssertUtil.assertSucceeded(response);
feed = Util.setFeedProperty(feed, "EntityDryRunTestProp", "${coord:someEL(1)");
response = prism.getFeedHelper().update(feed, feed);
@@ -150,11 +146,9 @@ public class EntityDryRunTest extends BaseTestClass {
private void validate(ServiceResponse response) throws JAXBException {
AssertUtil.assertFailed(response);
- Assert.assertTrue(response.getMessage()
- .contains("org.apache.falcon.FalconException: AUTHENTICATION : E1004 :" +
- " E1004: Expression language evaluation error, Unable to evaluate :${coord:someEL" +
- "(1)"),
- "Correct response was not present in process / feed schedule");
+ Assert.assertTrue(response.getMessage().contains("org.apache.falcon.FalconException: " +
+ "AUTHENTICATION : E1004 : Expression language evaluation error, Unable to evaluate " +
+ ":${coord:someEL(1)"), "Correct response was not present in process / feed schedule");
}
@AfterClass(alwaysRun = true)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
index b7da224..15a2c3c 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedSnSTest.java
@@ -56,6 +56,7 @@ public class PrismFeedSnSTest extends BaseTestClass {
private boolean restartRequired;
String aggregateWorkflowDir = baseHDFSDir + "/PrismFeedSnSTest/aggregator";
private static final Logger logger = Logger.getLogger(PrismFeedSnSTest.class);
+ String feed1, feed2;
@BeforeClass(alwaysRun = true)
public void uploadWorkflow() throws Exception {
@@ -72,6 +73,8 @@ public class PrismFeedSnSTest extends BaseTestClass {
bundles[i].generateUniqueBundle();
bundles[i].setProcessWorkflow(aggregateWorkflowDir);
}
+ feed1 = bundles[0].getDataSets().get(0);
+ feed2 = bundles[1].getDataSets().get(0);
}
@AfterMethod(alwaysRun = true)
@@ -82,22 +85,27 @@ public class PrismFeedSnSTest extends BaseTestClass {
removeBundles();
}
-
+ /**
+ * Submit and schedule feed1 on cluster1 and check that only this feed is running there.
+ * Perform the same for feed2 on cluster2.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testFeedSnSOnBothColos() throws Exception {
//schedule both bundles
bundles[0].submitAndScheduleFeed();
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
- bundles[1].submitAndScheduleFeed();
- //now check if they have been scheduled correctly or not
+ bundles[1].submitAndScheduleFeed();
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
-
- //check if there is no criss cross
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
}
+ /**
+ * Submit and schedule feed1 on cluster1 and feed2 on cluster2. Check that they are running
+ * on matching clusters only. Submit and schedule them once more. Check that new bundles
+ * were not created and feed still running on matching clusters.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testSnSAlreadyScheduledFeedOnBothColos() throws Exception {
//schedule both bundles
@@ -112,102 +120,105 @@ public class PrismFeedSnSTest extends BaseTestClass {
AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
-
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
//ensure only one bundle is there
Assert.assertEquals(OozieUtil.getBundles(cluster1OC,
- Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED).size(), 1);
+ Util.readEntityName(feed1), EntityType.FEED).size(), 1);
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
+ //ensure only one bundle is there
Assert.assertEquals(OozieUtil.getBundles(cluster2OC,
- Util.readEntityName(bundles[1].getDataSets().get(0)), EntityType.FEED).size(), 1);
+ Util.readEntityName(feed2), EntityType.FEED).size(), 1);
//now check if they have been scheduled correctly or not
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
}
-
+ /**
+ * Submit and schedule feed1 on cluster1, feed2 on cluster2. Suspend feed1 and check their
+ * statuses. Submit and schedule feed1 again. Check that statuses hasn't been changed and new
+ * bundle hasn't been created. Resume feed1. Repeat the same for feed2.
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testSnSSuspendedFeedOnBothColos() throws Exception {
//schedule both bundles
bundles[0].submitAndScheduleFeed();
bundles[1].submitAndScheduleFeed();
- AssertUtil.assertSucceeded(prism.getFeedHelper()
- .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
- //now check if they have been scheduled correctly or not
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
Assert.assertEquals(OozieUtil.getBundles(cluster1OC,
- Util.readEntityName(bundles[0].getDataSets().get(0)), EntityType.FEED).size(), 1);
+ Util.readEntityName(feed1), EntityType.FEED).size(), 1);
- AssertUtil.assertSucceeded(cluster1.getFeedHelper()
- .resume(URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(cluster1.getFeedHelper().resume(URLS.RESUME_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
- AssertUtil.assertSucceeded(prism.getFeedHelper()
- .suspend(URLS.SUSPEND_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(URLS.SUSPEND_URL, feed2));
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
Assert.assertEquals(OozieUtil.getBundles(cluster2OC,
- Util.readEntityName(bundles[1].getDataSets().get(0)), EntityType.FEED).size(), 1);
- AssertUtil.assertSucceeded(cluster2.getFeedHelper()
- .resume(URLS.RESUME_URL, bundles[1].getDataSets().get(0)));
+ Util.readEntityName(feed2), EntityType.FEED).size(), 1);
+ AssertUtil.assertSucceeded(cluster2.getFeedHelper().resume(URLS.RESUME_URL, feed2));
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
-
-
}
+ /**
+ * Submit and schedule both feeds. Delete them and submit and schedule again. Check that
+ * action succeeded.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testSnSDeletedFeedOnBothColos() throws Exception {
//schedule both bundles
bundles[0].submitAndScheduleFeed();
bundles[1].submitAndScheduleFeed();
- AssertUtil.assertSucceeded(
- prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
- AssertUtil.assertSucceeded(
- prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed2));
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
}
+ /**
+ * Attempt to submit and schedule non-registered feed should fail.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testScheduleNonExistentFeedOnBothColos() throws Exception {
AssertUtil.assertFailed(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
AssertUtil.assertFailed(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
}
+ /**
+ * Shut down server on cluster1. Submit and schedule feed on cluster2. Check that only
+ * mentioned feed is running there.
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testFeedSnSOn1ColoWhileOtherColoIsDown() throws Exception {
restartRequired = true;
- for (String cluster : bundles[1].getClusters()) {
- AssertUtil
- .assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster));
- }
+ AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL,
+ bundles[1].getClusters().get(0)));
Util.shutDownService(cluster1.getFeedHelper());
-
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
//now check if they have been scheduled correctly or not
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
@@ -215,24 +226,29 @@ public class PrismFeedSnSTest extends BaseTestClass {
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
}
-
+ /**
+ * Attempt to submit and schedule feed on cluster which is down should fail and this feed
+ * shouldn't run on another cluster.
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testFeedSnSOn1ColoWhileThatColoIsDown() throws Exception {
restartRequired = true;
bundles[0].submitFeed();
-
Util.shutDownService(cluster1.getFeedHelper());
-
AssertUtil.assertFailed(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
}
+ /**
+ * Submit and schedule and then suspend feed1 on cluster1. Submit and schedule feed2 on
+ * cluster2 and check that this actions don't affect each other.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testFeedSnSOn1ColoWhileAnotherColoHasSuspendedFeed() throws Exception {
bundles[0].submitAndScheduleFeed();
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ .suspend(URLS.SUSPEND_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
bundles[1].submitAndScheduleFeed();
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
@@ -241,11 +257,15 @@ public class PrismFeedSnSTest extends BaseTestClass {
AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
}
+ /**
+ * Submit and schedule and then delete feed1 on cluster1. Submit and schedule feed2 on
+ * cluster2 and check that this actions don't affect each other.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testFeedSnSOn1ColoWhileAnotherColoHasKilledFeed() throws Exception {
bundles[0].submitAndScheduleFeed();
AssertUtil.assertSucceeded(
- prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+ prism.getFeedHelper().delete(URLS.DELETE_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
bundles[1].submitAndScheduleFeed();
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
@@ -254,108 +274,110 @@ public class PrismFeedSnSTest extends BaseTestClass {
AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
}
+ /**
+ * Submit and schedule feed1 on cluster1 and check that it failed. Repeat for feed2.
+ * TODO: should be reviewed
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testFeedSnSOnBothColosUsingColoHelper() throws Exception {
//schedule both bundles
bundles[0].submitFeed();
APIResult result = Util.parseResponse((cluster1.getFeedHelper()
- .submitEntity(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0))));
+ .submitEntity(URLS.SUBMIT_AND_SCHEDULE_URL, feed1)));
Assert.assertEquals(result.getStatusCode(), 404);
AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
bundles[1].submitFeed();
result = Util.parseResponse(cluster2.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
Assert.assertEquals(result.getStatusCode(), 404);
-
AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
}
-
+ /**
+ * Submit and schedule both feeds. Suspend feed1 and submit and schedule it once more. Check
+ * that status of feed1 is still suspended. Resume it. Suspend feed2 but submit and schedule
+ * feed1 again. Check that it didn't affect feed2 and it is still suspended.
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testSnSSuspendedFeedOnBothColosUsingColoHelper() throws Exception {
-
//schedule both bundles
bundles[0].submitFeed();
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
bundles[1].submitFeed();
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
- AssertUtil.assertSucceeded(cluster1.getFeedHelper()
- .suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(cluster1.getFeedHelper().suspend(URLS.SUSPEND_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
//now check if they have been scheduled correctly or not
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
- AssertUtil.assertSucceeded(
- cluster1.getFeedHelper().resume(URLS.RESUME_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(cluster1.getFeedHelper().resume(URLS.RESUME_URL, feed1));
- AssertUtil.assertSucceeded(cluster2.getFeedHelper().suspend(URLS.SUSPEND_URL,
- bundles[1].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(cluster2.getFeedHelper().suspend(URLS.SUSPEND_URL, feed2));
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.SUSPENDED);
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
}
-
+ /**
+ * Submit and schedule both feeds and then delete them. Submit and schedule feeds again.
+ * Check that action succeeded and feeds are running.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testScheduleDeletedFeedOnBothColosUsingColoHelper() throws Exception {
-
//schedule both bundles
bundles[0].submitAndScheduleFeed();
bundles[1].submitAndScheduleFeed();
- AssertUtil.assertSucceeded(
- prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
- AssertUtil.assertSucceeded(
- prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[1].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed2));
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.KILLED);
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1));
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
-
- Assert.assertEquals(Util.parseResponse(prism.getFeedHelper()
- .getStatus(URLS.STATUS_URL, bundles[0].getDataSets().get(0))).getMessage(),
- cluster1.getClusterHelper().getColoName() + "/RUNNING");
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
- Assert.assertEquals(Util.parseResponse(prism.getFeedHelper()
- .getStatus(URLS.STATUS_URL, bundles[1].getDataSets().get(0))).getMessage(),
- cluster2.getClusterHelper().getColoName() + "/RUNNING");
+ Assert.assertEquals(Util.parseResponse(prism.getFeedHelper().getStatus(URLS.STATUS_URL,
+ feed1)).getMessage(), cluster1.getClusterHelper().getColoName() + "/RUNNING");
+ Assert.assertEquals(Util.parseResponse(prism.getFeedHelper().getStatus(URLS.STATUS_URL,
+ feed2)).getMessage(), cluster2.getClusterHelper().getColoName() + "/RUNNING");
}
-
+ /**
+ * Attempt to submit and schedule non-registered feeds should fail.
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testSNSNonExistentFeedOnBothColosUsingColoHelper() throws Exception {
-
Assert.assertEquals(Util.parseResponse(cluster1.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getDataSets().get(0)))
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed1))
.getStatusCode(), 404);
Assert.assertEquals(Util.parseResponse(cluster2.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)))
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2))
.getStatusCode(), 404);
}
+ /**
+ * Shut down server on cluster1. Submit and schedule feed on cluster2. Check that only that
+ * feed is running on cluster2.
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testFeedSnSOn1ColoWhileOtherColoIsDownUsingColoHelper() throws Exception {
restartRequired = true;
- for (String cluster : bundles[1].getClusters()) {
- AssertUtil
- .assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, cluster));
- }
+ AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL,
+ bundles[1].getClusters().get(0)));
Util.shutDownService(cluster1.getFeedHelper());
-
AssertUtil.assertSucceeded(prism.getFeedHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getDataSets().get(0)));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, feed2));
//now check if they have been scheduled correctly or not
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
@@ -363,21 +385,24 @@ public class PrismFeedSnSTest extends BaseTestClass {
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
}
-
+ /**
+ * Set both clusters as feed clusters. Shut down one of them. Submit and schedule feed.
+ * Check that action is partially successful.
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testFeedSnSOn1ColoWhileThatColoIsDownUsingColoHelper() throws Exception {
restartRequired = true;
+ String clust1 = bundles[0].getClusters().get(0);
+ String clust2 = bundles[1].getClusters().get(0);
bundles[0].setCLusterColo(cluster1.getClusterHelper().getColoName());
- logger.info("cluster bundles[0]: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
-
- ServiceResponse r =
- prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[0].getClusters().get(0));
+ logger.info("cluster bundles[0]: " + Util.prettyPrintXml(clust1));
+ ServiceResponse r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, clust1);
Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
bundles[1].setCLusterColo(cluster2.getClusterHelper().getColoName());
- logger.info("cluster bundles[1]: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
- r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, bundles[1].getClusters().get(0));
+ logger.info("cluster bundles[1]: " + Util.prettyPrintXml(clust2));
+ r = prism.getClusterHelper().submitEntity(URLS.SUBMIT_URL, clust2);
Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
String startTimeUA1 = "2012-10-01T12:00Z";
@@ -392,53 +417,52 @@ public class PrismFeedSnSTest extends BaseTestClass {
feed = InstanceUtil
.setFeedCluster(feed, XmlUtil.createValidity(startTimeUA1, "2099-10-01T12:10Z"),
XmlUtil.createRtention("days(10000)", ActionType.DELETE),
- Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
- "${cluster.colo}",
+ Util.readEntityName(clust1), ClusterType.SOURCE, "${cluster.colo}",
baseHDFSDir + "/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
-
feed = InstanceUtil
.setFeedCluster(feed, XmlUtil.createValidity(startTimeUA2, "2099-10-01T12:25Z"),
XmlUtil.createRtention("days(10000)", ActionType.DELETE),
- Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null,
- baseHDFSDir +
+ Util.readEntityName(clust2), ClusterType.TARGET, null, baseHDFSDir +
"/clusterPath/localDC/rc/billing/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}");
-
logger.info("feed: " + Util.prettyPrintXml(feed));
Util.shutDownService(cluster1.getFeedHelper());
- ServiceResponse response = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
- AssertUtil.assertPartial(response);
- response = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
- AssertUtil.assertPartial(response);
+ r = prism.getFeedHelper().submitEntity(URLS.SUBMIT_URL, feed);
+ AssertUtil.assertPartial(r);
+ r = prism.getFeedHelper().schedule(URLS.SCHEDULE_URL, feed);
+ AssertUtil.assertPartial(r);
Util.startService(cluster1.getFeedHelper());
- prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[0].getClusters().get(0));
- prism.getClusterHelper().delete(URLS.DELETE_URL, bundles[1].getClusters().get(0));
-
+ prism.getClusterHelper().delete(URLS.DELETE_URL, clust1);
+ prism.getClusterHelper().delete(URLS.DELETE_URL, clust2);
}
-
+ /**
+ * Submit and schedule feed1 and suspend it. Submit and schedule feed2 on another cluster
+ * and check that only feed2 is running on cluster2.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testFeedSnSOn1ColoWhileAnotherColoHasSuspendedFeedUsingColoHelper()
throws Exception {
bundles[0].submitAndScheduleFeed();
- AssertUtil.assertSucceeded(
- cluster1.getFeedHelper().suspend(URLS.SUSPEND_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(cluster1.getFeedHelper().suspend(URLS.SUSPEND_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
bundles[1].submitAndScheduleFeed();
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
- AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
+ AssertUtil.checkNotStatus(cluster2OC, EntityType.FEED, bundles[0], Job.Status.RUNNING);
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.SUSPENDED);
- AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+ AssertUtil.checkNotStatus(cluster1OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
}
-
+ /**
+ * Submit and schedule and delete feed1. Submit and schedule feed2 and check that this
+ * action didn't affect feed1 and it is still killed.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testFeedSnSOn1ColoWhileAnotherColoHasKilledFeedUsingColoHelper() throws Exception {
bundles[0].submitAndScheduleFeed();
- AssertUtil.assertSucceeded(
- prism.getFeedHelper().delete(URLS.DELETE_URL, bundles[0].getDataSets().get(0)));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().delete(URLS.DELETE_URL, feed1));
AssertUtil.checkStatus(cluster1OC, EntityType.FEED, bundles[0], Job.Status.KILLED);
bundles[1].submitAndScheduleFeed();
AssertUtil.checkStatus(cluster2OC, EntityType.FEED, bundles[1], Job.Status.RUNNING);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
index 10df6c2..5590c54 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessScheduleTest.java
@@ -56,6 +56,8 @@ public class PrismProcessScheduleTest extends BaseTestClass {
String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessScheduleTest/aggregator";
String workflowForNoIpOp = baseHDFSDir + "/PrismProcessScheduleTest/noinop";
private static final Logger logger = Logger.getLogger(PrismProcessScheduleTest.class);
+ String process1;
+ String process2;
@BeforeClass(alwaysRun = true)
public void uploadWorkflow() throws Exception {
@@ -72,6 +74,8 @@ public class PrismProcessScheduleTest extends BaseTestClass {
bundles[i].generateUniqueBundle();
bundles[i].setProcessWorkflow(aggregateWorkflowDir);
}
+ process1 = bundles[0].getProcessData();
+ process2 = bundles[1].getProcessData();
}
@AfterMethod(alwaysRun = true)
@@ -99,7 +103,6 @@ public class PrismProcessScheduleTest extends BaseTestClass {
//check if there is no criss cross
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
-
}
/**
@@ -123,9 +126,9 @@ public class PrismProcessScheduleTest extends BaseTestClass {
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
AssertUtil.assertSucceeded(cluster2.getProcessHelper()
- .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ .schedule(URLS.SCHEDULE_URL, process1));
AssertUtil.assertSucceeded(cluster1.getProcessHelper()
- .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ .schedule(URLS.SCHEDULE_URL, process2));
//now check if they have been scheduled correctly or not
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
@@ -148,33 +151,27 @@ public class PrismProcessScheduleTest extends BaseTestClass {
bundles[1].submitAndScheduleProcess();
//suspend process on colo-1
- AssertUtil.assertSucceeded(cluster2.getProcessHelper()
- .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper().suspend(URLS.SUSPEND_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
//now check if it has been scheduled correctly or not
- AssertUtil.assertSucceeded(cluster2.getProcessHelper()
- .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
- AssertUtil.assertSucceeded(cluster2.getProcessHelper()
- .resume(URLS.RESUME_URL, bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper().resume(URLS.RESUME_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
//suspend process on colo-2
- AssertUtil.assertSucceeded(cluster1.getProcessHelper()
- .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.assertSucceeded(cluster1.getProcessHelper().suspend(URLS.SUSPEND_URL, process2));
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
//now check if it has been scheduled correctly or not
- AssertUtil.assertSucceeded(cluster2.getProcessHelper()
- .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, process2));
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
- AssertUtil.assertSucceeded(cluster2.getProcessHelper()
- .resume(URLS.RESUME_URL, bundles[1].getProcessData()));
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper().resume(URLS.RESUME_URL, process2));
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
- }
+ }
/**
* Schedule two processes on different colos. Delete both of them. Try to schedule them once
@@ -188,21 +185,16 @@ public class PrismProcessScheduleTest extends BaseTestClass {
bundles[0].submitAndScheduleProcess();
bundles[1].submitAndScheduleProcess();
- AssertUtil.assertSucceeded(
- prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
- AssertUtil.assertSucceeded(
- prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[1].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL, process2));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
- AssertUtil.assertFailed(cluster2.getProcessHelper()
- .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
- AssertUtil.assertFailed(cluster1.getProcessHelper()
- .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
-
+ AssertUtil.assertFailed(cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, process1));
+ AssertUtil.assertFailed(cluster1.getProcessHelper().schedule(URLS.SCHEDULE_URL, process2));
}
/**
@@ -212,11 +204,8 @@ public class PrismProcessScheduleTest extends BaseTestClass {
*/
@Test(groups = {"prism", "0.2", "embedded"})
public void testScheduleNonExistentProcessOnBothColos() throws Exception {
- AssertUtil.assertFailed(cluster2.getProcessHelper()
- .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
- AssertUtil.assertFailed(cluster1.getProcessHelper()
- .schedule(URLS.SCHEDULE_URL, bundles[1].getProcessData()));
-
+ AssertUtil.assertFailed(cluster2.getProcessHelper().schedule(URLS.SCHEDULE_URL, process1));
+ AssertUtil.assertFailed(cluster1.getProcessHelper().schedule(URLS.SCHEDULE_URL, process2));
}
/**
@@ -230,11 +219,9 @@ public class PrismProcessScheduleTest extends BaseTestClass {
public void testProcessScheduleOn1ColoWhileOtherColoIsDown() throws Exception {
try {
bundles[1].submitProcess(true);
-
Util.shutDownService(cluster2.getProcessHelper());
-
AssertUtil.assertSucceeded(prism.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process2));
//now check if they have been scheduled correctly or not
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
@@ -260,11 +247,8 @@ public class PrismProcessScheduleTest extends BaseTestClass {
public void testProcessScheduleOn1ColoWhileThatColoIsDown() throws Exception {
try {
bundles[0].submitProcess(true);
-
Util.shutDownService(cluster2.getProcessHelper());
-
- AssertUtil.assertFailed(prism.getProcessHelper()
- .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ AssertUtil.assertFailed(prism.getProcessHelper().schedule(URLS.SCHEDULE_URL, process1));
AssertUtil
.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
} catch (Exception e) {
@@ -273,7 +257,6 @@ public class PrismProcessScheduleTest extends BaseTestClass {
} finally {
Util.restartService(cluster2.getProcessHelper());
}
-
}
/**
@@ -289,7 +272,7 @@ public class PrismProcessScheduleTest extends BaseTestClass {
try {
bundles[0].submitAndScheduleProcess();
AssertUtil.assertSucceeded(cluster1.getProcessHelper()
- .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ .suspend(URLS.SUSPEND_URL, process1));
AssertUtil
.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
@@ -301,12 +284,10 @@ public class PrismProcessScheduleTest extends BaseTestClass {
.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
AssertUtil
.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
-
} catch (Exception e) {
e.printStackTrace();
throw new TestNGException(e.getMessage());
}
-
}
/**
@@ -321,8 +302,7 @@ public class PrismProcessScheduleTest extends BaseTestClass {
throws Exception {
try {
bundles[0].submitAndScheduleProcess();
- AssertUtil.assertSucceeded(prism.getProcessHelper()
- .delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
bundles[1].submitAndScheduleProcess();
@@ -346,47 +326,30 @@ public class PrismProcessScheduleTest extends BaseTestClass {
*/
@Test(groups = {"prism", "0.2", "embedded"}, enabled = true, timeOut = 1800000)
public void testRescheduleKilledProcess() throws Exception {
-
- /*
- add test data generator pending
- */
-
+ /* add test data generator pending */
bundles[0].setProcessValidity(TimeUtil.getTimeWrtSystemTime(-1),
- TimeUtil.getTimeWrtSystemTime(1));
+ TimeUtil.getTimeWrtSystemTime(1));
HadoopFileEditor hadoopFileEditor = null;
+ String process = bundles[0].getProcessData();
try {
-
- hadoopFileEditor = new HadoopFileEditor(cluster1
- .getClusterHelper().getHadoopFS());
-
- hadoopFileEditor.edit(new ProcessMerlin(bundles[0]
- .getProcessData()).getWorkflow().getPath() + "/workflow.xml",
- "<value>${outputData}</value>",
- "<property>\n" +
- " <name>randomProp</name>\n" +
- " <value>randomValue</value>\n" +
- " </property>");
-
+ hadoopFileEditor = new HadoopFileEditor(cluster1.getClusterHelper().getHadoopFS());
+ hadoopFileEditor.edit(new ProcessMerlin(process).getWorkflow().getPath() +
+ "/workflow.xml", "<value>${outputData}</value>",
+ "<property>\n" +
+ " <name>randomProp</name>\n" +
+ " <value>randomValue</value>\n" +
+ " </property>");
bundles[0].submitFeedsScheduleProcess(prism);
-
InstanceUtil.waitForBundleToReachState(cluster1,
- Util.readEntityName(bundles[0].getProcessData()),
- org.apache.oozie.client.Job.Status.KILLED);
-
+ Util.readEntityName(process),Job.Status.KILLED);
String oldBundleID = InstanceUtil.getLatestBundleID(cluster1,
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS);
-
- prism.getProcessHelper().delete(URLS.DELETE_URL,
- bundles[0].getProcessData());
+ Util.readEntityName(process), EntityType.PROCESS);
+ prism.getProcessHelper().delete(URLS.DELETE_URL, process);
bundles[0].submitAndScheduleProcess();
-
OozieUtil.verifyNewBundleCreation(cluster1, oldBundleID,
- new ArrayList<String>(),
- bundles[0].getProcessData(), true,
- false);
+ new ArrayList<String>(), process, true, false);
} finally {
-
if (hadoopFileEditor != null) {
hadoopFileEditor.restore();
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
index 842bc1a..304549d 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
@@ -50,6 +50,8 @@ public class PrismProcessSnSTest extends BaseTestClass {
OozieClient cluster2OC = serverOC.get(1);
String aggregateWorkflowDir = baseHDFSDir + "/PrismProcessSnSTest/aggregator";
private static final Logger logger = Logger.getLogger(PrismProcessSnSTest.class);
+ String process1;
+ String process2;
@BeforeClass(alwaysRun = true)
public void uploadWorkflow() throws Exception {
@@ -65,6 +67,8 @@ public class PrismProcessSnSTest extends BaseTestClass {
bundles[i].generateUniqueBundle();
bundles[i].setProcessWorkflow(aggregateWorkflowDir);
}
+ process1 = bundles[0].getProcessData();
+ process2 = bundles[1].getProcessData();
}
@AfterMethod(alwaysRun = true)
@@ -72,6 +76,11 @@ public class PrismProcessSnSTest extends BaseTestClass {
removeBundles();
}
+ /**
+ * Submit and schedule process1 on cluster1. Check that process2 is not running on cluster1.
+ * Submit and schedule process2 on cluster2. Check that process2 is running and process1 is
+ * not running on cluster2.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testProcessSnSOnBothColos() throws Exception {
//schedule both bundles
@@ -79,49 +88,49 @@ public class PrismProcessSnSTest extends BaseTestClass {
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
bundles[1].submitAndScheduleProcess();
-
- //now check if they have been scheduled correctly or not
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
+
//check if there is no criss cross
- ServiceResponse response =
- prism.getProcessHelper()
- .getStatus(URLS.STATUS_URL, bundles[1].getProcessData());
+ ServiceResponse response = prism.getProcessHelper().getStatus(URLS.STATUS_URL, process2);
logger.info(response.getMessage());
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
-
}
+ /**
+ * Submit process1 on cluster1 and schedule it. Check that process1 runs on cluster1 but not
+ * on cluster2. Submit process2 but schedule process1 once more. Check that process1 is running
+ * on cluster1 but not on cluster2.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testProcessSnSForSubmittedProcessOnBothColos() throws Exception {
//schedule both bundles
-
bundles[0].submitProcess(true);
-
AssertUtil.assertSucceeded(prism.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
bundles[1].submitProcess(true);
AssertUtil.assertSucceeded(prism.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
- //now check if they have been scheduled correctly or not
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
//check if there is no criss cross
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
-
}
+ /**
+ * Submit process1 on cluster1 and schedule it. Check that only process1 runs on cluster1.
+ * Submit process2 and check that it isn't running on cluster1. Submit and schedule process1
+ * once more and check that it is still running on cluster1 but process2 isn't running on
+ * cluster2.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testProcessSnSForSubmittedProcessOnBothColosUsingColoHelper()
throws Exception {
- //schedule both bundles
-
bundles[0].submitProcess(true);
-
AssertUtil.assertSucceeded(prism.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
bundles[1].submitProcess(true);
@@ -130,127 +139,109 @@ public class PrismProcessSnSTest extends BaseTestClass {
bundles[1].submitProcess(true);
AssertUtil.assertSucceeded(prism.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
- //now check if they have been scheduled correctly or not
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
//check if there is no criss cross
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
-
}
+ /**
+ * Submit and schedule process1 on cluster1 and check that only it is running there. Submit
+ * and schedule process2 on cluster2 and check the same for it. Schedule process1 on cluster2.
+ * Check that it is running on cluster2 and cluster1 but process2 isn't running on cluster1.
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testProcessSnSAlreadyScheduledOnBothColos() throws Exception {
//schedule both bundles
bundles[0].submitAndScheduleProcess();
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
- bundles[1].submitAndScheduleProcess();
- //now check if they have been scheduled correctly or not
+ bundles[1].submitAndScheduleProcess();
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
- //check if there is no criss cross
AssertUtil.checkNotStatus(cluster2OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
//reschedule trial
-
AssertUtil.assertSucceeded(cluster2.getProcessHelper()
- .schedule(URLS.SCHEDULE_URL, bundles[0].getProcessData()));
+ .schedule(URLS.SCHEDULE_URL, process1));
Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1);
+ Util.readEntityName(process1), EntityType.PROCESS).size(), 1);
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.RUNNING);
AssertUtil.checkNotStatus(cluster1OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
}
+ /**
+ * Submit and schedule both process1 and process2. Suspend process1. Check their statuses.
+ * Submit and schedule process1 once more.
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testSnSSuspendedProcessOnBothColos() throws Exception {
//schedule both bundles
bundles[0].submitAndScheduleProcess();
bundles[1].submitAndScheduleProcess();
-
- AssertUtil.assertSucceeded(cluster2.getProcessHelper()
- .suspend(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper().suspend(URLS.SUSPEND_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
- //now check if they have been scheduled correctly or not
+
AssertUtil.assertSucceeded(prism.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
- Assert.assertEquals(OozieUtil.getBundles(cluster2.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[0].getProcessData()), EntityType.PROCESS).size(), 1);
- AssertUtil.assertSucceeded(cluster2.getProcessHelper()
- .resume(URLS.SUSPEND_URL, bundles[0].getProcessData()));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
+ Assert.assertEquals(OozieUtil.getBundles(cluster2OC, Util.readEntityName(process1),
+ EntityType.PROCESS).size(), 1);
+ AssertUtil.assertSucceeded(cluster2.getProcessHelper().resume(URLS.SUSPEND_URL, process1));
- AssertUtil.assertSucceeded(cluster1.getProcessHelper()
- .suspend(URLS.SUSPEND_URL, bundles[1].getProcessData()));
+ AssertUtil.assertSucceeded(cluster1.getProcessHelper().suspend(URLS.SUSPEND_URL, process2));
AssertUtil.assertSucceeded(prism.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process2));
- Assert.assertEquals(OozieUtil.getBundles(cluster1.getFeedHelper().getOozieClient(),
- Util.readEntityName(bundles[1].getProcessData()), EntityType.PROCESS).size(), 1);
+ Assert.assertEquals(OozieUtil.getBundles(cluster1OC, Util.readEntityName(process2),
+ EntityType.PROCESS).size(), 1);
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.SUSPENDED);
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.SUSPENDED);
}
+ /**
+ * Submit and schedule both processes on both cluster1 and cluster2. Check that they are
+ * running. Delete both of them. Submit and schedule them once more. Check that they are
+ * running again.
+ */
@Test(groups = {"prism", "0.2", "embedded"})
public void testSnSDeletedProcessOnBothColos() throws Exception {
//schedule both bundles
final String cluster1Running = cluster1.getClusterHelper().getColoName() + "/RUNNING";
final String cluster2Running = cluster2.getClusterHelper().getColoName() + "/RUNNING";
- bundles[0].submitAndScheduleProcess();
-
- Assert.assertEquals(Util.parseResponse(
- prism.getProcessHelper()
- .getStatus(URLS.STATUS_URL, bundles[0].getProcessData())).getMessage(),
- cluster1Running
- );
+ bundles[0].submitAndScheduleProcess();
+ Assert.assertEquals(Util.parseResponse(prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, process1)).getMessage(), cluster1Running);
bundles[1].submitAndScheduleProcess();
- Assert.assertEquals(Util.parseResponse(
- prism.getProcessHelper()
- .getStatus(URLS.STATUS_URL, bundles[1].getProcessData())).getMessage(),
- cluster2Running
- );
+ Assert.assertEquals(Util.parseResponse(prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, process2)).getMessage(), cluster2Running);
- AssertUtil.assertSucceeded(
- prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[0].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL, process1));
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.RUNNING);
- AssertUtil.assertSucceeded(
- prism.getProcessHelper().delete(URLS.DELETE_URL, bundles[1].getProcessData()));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().delete(URLS.DELETE_URL, process2));
AssertUtil.checkStatus(cluster2OC, EntityType.PROCESS, bundles[1], Job.Status.KILLED);
AssertUtil.checkStatus(cluster1OC, EntityType.PROCESS, bundles[0], Job.Status.KILLED);
AssertUtil.assertSucceeded(prism.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()));
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1));
AssertUtil.assertSucceeded(prism.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()));
-
- Assert.assertEquals(Util.parseResponse(
- prism.getProcessHelper()
- .getStatus(URLS.STATUS_URL, bundles[0].getProcessData())
- ).getMessage(),
- cluster1Running
- );
- Assert.assertEquals(Util.parseResponse(
- prism.getProcessHelper()
- .getStatus(URLS.STATUS_URL, bundles[1].getProcessData())
- ).getMessage(),
- cluster2Running
- );
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process2));
+ Assert.assertEquals(Util.parseResponse(prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, process1)).getMessage(),cluster1Running);
+ Assert.assertEquals(Util.parseResponse(prism.getProcessHelper()
+ .getStatus(URLS.STATUS_URL, process2)).getMessage(), cluster2Running);
}
+ /**
+ * Attempt to submit and schedule processes when all required entities weren't registered
+ */
@Test(groups = {"prism", "0.2", "distributed"})
public void testScheduleNonExistentProcessOnBothColos() throws Exception {
Assert.assertEquals(Util.parseResponse(cluster2.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[0].getProcessData()))
- .getStatusCode(), 404);
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process1)).getStatusCode(), 404);
Assert.assertEquals(Util.parseResponse(cluster1.getProcessHelper()
- .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, bundles[1].getProcessData()))
- .getStatusCode(), 404);
-
- }
-
- @AfterClass(alwaysRun = true)
- public void tearDownClass() throws IOException {
- cleanTestDirs();
+ .submitAndSchedule(URLS.SUBMIT_AND_SCHEDULE_URL, process2)).getStatusCode(), 404);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/3678eaba/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
index 458ddc6..c82786f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ui/ProcessUITest.java
@@ -176,27 +176,28 @@ public class ProcessUITest extends BaseUITestClass {
//check Process statuses via UI
EntitiesPage page = new EntitiesPage(DRIVER, cluster, EntityType.PROCESS);
page.navigateTo();
-
- softAssert.assertEquals(page.getEntityStatus(bundles[0].getProcessName()),
+ String process = bundles[0].getProcessData();
+ String processName = Util.readEntityName(process);
+ softAssert.assertEquals(page.getEntityStatus(processName),
EntitiesPage.EntityStatus.SUBMITTED, "Process status should be SUBMITTED");
- prism.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL, bundles[0].getProcessData());
+ prism.getProcessHelper().schedule(Util.URLS.SCHEDULE_URL, process);
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
- .getProcessData()), 1, CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+ CoordinatorAction.Status.RUNNING, EntityType.PROCESS);
- softAssert.assertEquals(page.getEntityStatus(bundles[0].getProcessName()),
+ softAssert.assertEquals(page.getEntityStatus(processName),
EntitiesPage.EntityStatus.RUNNING, "Process status should be RUNNING");
- ProcessPage processPage = new ProcessPage(DRIVER, cluster, bundles[0].getProcessName());
+ ProcessPage processPage = new ProcessPage(DRIVER, cluster, processName);
processPage.navigateTo();
- String bundleID = InstanceUtil.getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS);
+ String bundleID = InstanceUtil.getLatestBundleID(cluster, processName, EntityType.PROCESS);
Map<Date, CoordinatorAction.Status> actions = OozieUtil.getActionsNominalTimeAndStatus(prism, bundleID,
EntityType.PROCESS);
checkActions(actions, processPage);
- InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0]
- .getProcessData()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
processPage.refresh();
actions = OozieUtil.getActionsNominalTimeAndStatus(prism, bundleID, EntityType.PROCESS);