You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/09/12 00:19:30 UTC
[25/41] git commit: FALCON-643 Tests with zero-output/input scenario amended to match test case. Contributed by Paul Isaychuk
FALCON-643 Tests with zero-output/input scenario amended to match test case. Contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/d9c115e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/d9c115e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/d9c115e0
Branch: refs/heads/FALCON-585
Commit: d9c115e0e9b55b8a61bdce1c43de88a4e1cc6d0d
Parents: 14b9add
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Thu Sep 4 12:56:47 2014 +0300
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Thu Sep 4 12:57:35 2014 +0300
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 3 +
.../falcon/regression/NoOutputProcessTest.java | 18 +---
.../regression/prism/PrismFeedUpdateTest.java | 88 ++++++++++----------
3 files changed, 53 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d9c115e0/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index b60c23c..0b4714a 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -9,6 +9,9 @@ Trunk (Unreleased)
via Samarth Gupta)
IMPROVEMENTS
+ FALCON-643 Tests with zero-output/input scenario amended to match test case (Paul Isaychuk via
+ Ruslan Ostafiychuk)
+
FALCON-660 7 test classes refactored and few of them documented (Paul Isaychuk via
Ruslan Ostafiychuk)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d9c115e0/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
index 25456a2..2c30f83 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/NoOutputProcessTest.java
@@ -18,6 +18,7 @@
package org.apache.falcon.regression;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
@@ -62,26 +63,20 @@ public class NoOutputProcessTest extends BaseTestClass {
@BeforeClass(alwaysRun = true)
public void createTestData() throws Exception {
-
logger.info("in @BeforeClass");
HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
-
Bundle b = BundleUtil.readELBundle();
b.generateUniqueBundle();
b = new Bundle(b, cluster);
-
String startDate = "2010-01-03T00:00Z";
String endDate = "2010-01-03T03:00Z";
-
b.setInputFeedDataPath(inputPath);
String prefix = b.getFeedDataPathPrefix();
HadoopUtil.deleteDirIfExists(prefix.substring(1), clusterFS);
-
List<String> dataDates = TimeUtil.getMinuteDatesOnEitherSide(startDate, endDate, 20);
HadoopUtil.flattenAndPutDataInFolder(clusterFS, OSUtil.NORMAL_INPUT, prefix, dataDates);
}
-
@BeforeMethod(alwaysRun = true)
public void testName(Method method) throws Exception {
logger.info("test name: " + method.getName());
@@ -92,6 +87,9 @@ public class NoOutputProcessTest extends BaseTestClass {
bundles[0].setInputFeedDataPath(inputPath);
bundles[0].setProcessValidity("2010-01-03T02:30Z", "2010-01-03T02:45Z");
bundles[0].setProcessPeriodicity(5, TimeUnit.minutes);
+ ProcessMerlin process = new ProcessMerlin(bundles[0].getProcessData());
+ process.setOutputs(null);
+ bundles[0].setProcessData(process.toString());
bundles[0].submitFeedsScheduleProcess(prism);
}
@@ -110,16 +108,12 @@ public class NoOutputProcessTest extends BaseTestClass {
//wait for all the instances to complete
InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-
Assert.assertEquals(messageConsumer.getReceivedMessages().size(), 3,
" Message for all the 3 instance not found");
-
messageConsumer.interrupt();
-
Util.printMessageData(messageConsumer);
}
-
@Test(enabled = true, groups = {"singleCluster"})
public void rm() throws Exception {
JmsMessageConsumer consumerEntityMsg =
@@ -127,22 +121,18 @@ public class NoOutputProcessTest extends BaseTestClass {
JmsMessageConsumer consumerProcessMsg =
new JmsMessageConsumer("FALCON." + bundles[0].getProcessName(),
cluster.getClusterHelper().getActiveMQ());
-
consumerEntityMsg.start();
consumerProcessMsg.start();
//wait for all the instances to complete
InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
-
Assert.assertEquals(consumerEntityMsg.getReceivedMessages().size(), 3,
" Message for all the 3 instance not found");
Assert.assertEquals(consumerProcessMsg.getReceivedMessages().size(), 3,
" Message for all the 3 instance not found");
-
consumerEntityMsg.interrupt();
consumerProcessMsg.interrupt();
-
Util.printMessageData(consumerEntityMsg);
Util.printMessageData(consumerProcessMsg);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d9c115e0/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
index 89f3686..c0e1617 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
@@ -37,6 +37,7 @@ import org.apache.falcon.regression.core.util.XmlUtil;
import org.apache.falcon.regression.testHelper.BaseTestClass;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
@@ -59,7 +60,7 @@ public class PrismFeedUpdateTest extends BaseTestClass {
ColoHelper cluster1 = servers.get(0);
ColoHelper cluster2 = servers.get(1);
FileSystem server1FS = serverFS.get(0);
- OozieClient OC1 = serverOC.get(0);
+ OozieClient cluster1OC = serverOC.get(0);
String baseTestDir = baseHDFSDir + "/PrismFeedUpdateTest";
String aggregateWorkflowDir = baseTestDir + "/aggregator";
public final String cluster1colo = cluster1.getClusterHelper().getColoName();
@@ -97,23 +98,26 @@ public class PrismFeedUpdateTest extends BaseTestClass {
public void updateFeedQueue_dependentMultipleProcess_oneProcessZeroInput() throws Exception {
//cluster1colo and cluster2colo are source. feed01 on cluster1colo target cluster2colo,
// feed02 on cluster2colo target cluster1colo
+ String cluster1Def = bundles[0].getClusters().get(0);
+ String cluster2Def = bundles[1].getClusters().get(0);
- //get 3 unique bundles
//set cluster colos
bundles[0].setCLusterColo(cluster1colo);
- logger.info("cluster bundles[0]: " + Util.prettyPrintXml(bundles[0].getClusters().get(0)));
-
+ logger.info("cluster bundles[0]: " + Util.prettyPrintXml(cluster1Def));
bundles[1].setCLusterColo(cluster2colo);
- logger.info("cluster bundles[1]: " + Util.prettyPrintXml(bundles[1].getClusters().get(0)));
+ logger.info("cluster bundles[1]: " + Util.prettyPrintXml(cluster2Def));
- //submit 3 clusters
+ //submit 2 clusters
+ AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(Util.URLS.SUBMIT_URL,
+ cluster1Def));
+ AssertUtil.assertSucceeded(prism.getClusterHelper().submitEntity(Util.URLS.SUBMIT_URL,
+ cluster2Def));
//get 2 unique feeds
String feed01 = bundles[0].getInputFeedFromBundle();
String outputFeed = bundles[0].getOutputFeedFromBundle();
- //set source and target for the 2 feeds
-
+ /* set source and target for the 2 feeds */
//set clusters to null;
feed01 = InstanceUtil
.setFeedCluster(feed01,
@@ -126,68 +130,63 @@ public class PrismFeedUpdateTest extends BaseTestClass {
XmlUtil.createRtention("hours(10)", ActionType.DELETE), null,
ClusterType.SOURCE, null);
-
//set new feed input data
feed01 = Util.setFeedPathValue(feed01,
baseTestDir + "/feed01/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}/");
-
//generate data in both the colos cluster1colo and cluster2colo
String prefix = InstanceUtil.getFeedPrefix(feed01);
+ String startTime = TimeUtil.getTimeWrtSystemTime(-40);
+ System.out.println("Start time = " + startTime);
HadoopUtil.deleteDirIfExists(prefix.substring(1), server1FS);
- HadoopUtil.lateDataReplenish(server1FS, 70, 1, prefix, null);
-
- String startTime = TimeUtil.getTimeWrtSystemTime(-50);
+ HadoopUtil.lateDataReplenish(server1FS, 80, 20, prefix, null);
//set clusters for feed01
feed01 = InstanceUtil
.setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
XmlUtil.createRtention("hours(10)", ActionType.DELETE),
- Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE,
- null);
+ Util.readEntityName(cluster1Def), ClusterType.SOURCE, null);
feed01 = InstanceUtil
.setFeedCluster(feed01, XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
XmlUtil.createRtention("hours(10)", ActionType.DELETE),
- Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET,
- null);
+ Util.readEntityName(cluster2Def), ClusterType.TARGET, null);
//set clusters for output feed
outputFeed = InstanceUtil.setFeedCluster(outputFeed,
XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
XmlUtil.createRtention("hours(10)", ActionType.DELETE),
- Util.readEntityName(bundles[0].getClusters().get(0)), ClusterType.SOURCE, null);
+ Util.readEntityName(cluster1Def), ClusterType.SOURCE, null);
outputFeed = InstanceUtil.setFeedCluster(outputFeed,
XmlUtil.createValidity(startTime, "2099-01-01T00:00Z"),
XmlUtil.createRtention("hours(10)", ActionType.DELETE),
- Util.readEntityName(bundles[1].getClusters().get(0)), ClusterType.TARGET, null);
-
+ Util.readEntityName(cluster2Def), ClusterType.TARGET, null);
//submit and schedule feeds
logger.info("feed01: " + Util.prettyPrintXml(feed01));
logger.info("outputFeed: " + Util.prettyPrintXml(outputFeed));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(Util.URLS
+ .SUBMIT_AND_SCHEDULE_URL, feed01));
+ AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(Util.URLS
+ .SUBMIT_AND_SCHEDULE_URL, outputFeed));
- //create 2 process with 2 clusters
-
+ /* create 2 process with 2 clusters */
//get first process
String process01 = bundles[0].getProcessData();
//add clusters to process
-
String processStartTime = TimeUtil.getTimeWrtSystemTime(-11);
String processEndTime = TimeUtil.getTimeWrtSystemTime(70);
-
-
process01 = InstanceUtil
.setProcessCluster(process01, null,
XmlUtil.createProcessValidity(startTime, "2099-01-01T00:00Z"));
process01 = InstanceUtil
- .setProcessCluster(process01, Util.readEntityName(bundles[0].getClusters().get(0)),
+ .setProcessCluster(process01, Util.readEntityName(cluster1Def),
XmlUtil.createProcessValidity(processStartTime, processEndTime));
process01 = InstanceUtil
- .setProcessCluster(process01, Util.readEntityName(bundles[1].getClusters().get(0)),
+ .setProcessCluster(process01, Util.readEntityName(cluster2Def),
XmlUtil.createProcessValidity(processStartTime, processEndTime));
- //get 2nd process :
+ //get 2nd process
String process02 = process01;
process02 = InstanceUtil
.setProcessName(process02, "zeroInputProcess" + new Random().nextInt());
@@ -197,24 +196,27 @@ public class PrismFeedUpdateTest extends BaseTestClass {
processMerlin.setProcessFeeds(feed, 0, 0, 1);
process02 = processMerlin.toString();
-
//submit and schedule both process
logger.info("process: " + Util.prettyPrintXml(process01));
logger.info("process: " + Util.prettyPrintXml(process02));
-
-
+ AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(Util.URLS
+ .SUBMIT_AND_SCHEDULE_URL, process01));
+ AssertUtil.assertSucceeded(prism.getProcessHelper().submitAndSchedule(Util.URLS
+ .SUBMIT_AND_SCHEDULE_URL, process02));
logger.info("Wait till process goes into running ");
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(process01), 1,
+ CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 1);
+ InstanceUtil.waitTillInstanceReachState(cluster1OC, Util.readEntityName(process02), 1,
+ CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 1);
//change feed location path
outputFeed = Util.setFeedProperty(outputFeed, "queueName", "myQueue");
-
logger.info("updated feed: " + Util.prettyPrintXml(outputFeed));
//update feed first time
- prism.getFeedHelper().update(outputFeed, outputFeed);
+ AssertUtil.assertSucceeded(prism.getFeedHelper().update(outputFeed, outputFeed));
}
-
/**
* schedules a feed and dependent process. Process start and end are in past
* Test for bug https://issues.apache.org/jira/browse/FALCON-500
@@ -222,24 +224,25 @@ public class PrismFeedUpdateTest extends BaseTestClass {
@Test
public void dependentProcessSucceeded()
throws Exception {
- bundles[0].setProcessValidity("2014-06-01T04:00Z","2014-06-01T04:02Z");
+ bundles[0].setProcessValidity("2014-06-01T04:00Z", "2014-06-01T04:02Z");
bundles[0].submitAndScheduleAllFeeds();
bundles[0].submitAndScheduleProcess();
InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(),
+ OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
+ bundles[0].getProcessName(),
0, 0);
InstanceUtil.waitForBundleToReachState(cluster1, bundles[0].getProcessName(),
Job.Status.SUCCEEDED, 20);
FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
- feed.addProperty("someProp","someVal");
+ feed.addProperty("someProp", "someVal");
AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString()));
//check for new feed bundle creation
Assert.assertEquals(OozieUtil.getNumberOfBundle(prism, EntityType.FEED,
- feed.getName()),2);
+ feed.getName()), 2);
Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.PROCESS,
- bundles[0].getProcessName()),1);
+ bundles[0].getProcessName()), 1);
}
/**
@@ -256,7 +259,8 @@ public class PrismFeedUpdateTest extends BaseTestClass {
bundles[0].submitAndScheduleProcess();
InstanceUtil.waitTillInstancesAreCreated(cluster1, bundles[0].getProcessData(), 0);
- OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS, bundles[0].getProcessName(),
+ OozieUtil.createMissingDependencies(cluster1, EntityType.PROCESS,
+ bundles[0].getProcessName(),
0, 0);
FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0));
@@ -264,9 +268,9 @@ public class PrismFeedUpdateTest extends BaseTestClass {
AssertUtil.assertSucceeded(prism.getFeedHelper().update(feed.toString(), feed.toString()));
//check for new feed bundle creation
Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.FEED,
- feed.getName()),2);
+ feed.getName()), 2);
Assert.assertEquals(OozieUtil.getNumberOfBundle(cluster1, EntityType.PROCESS,
- bundles[0].getProcessName()),2);
+ bundles[0].getProcessName()), 2);
}
@AfterClass(alwaysRun = true)