You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/10/29 18:21:12 UTC
git commit: FALCON-841 Test falcon process with different
frequencies. Contributed by Raghav Kumar Gautam
Repository: incubator-falcon
Updated Branches:
refs/heads/master c5c77229f -> aa3e455fb
FALCON-841 Test falcon process with different frequencies. Contributed by Raghav Kumar Gautam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/aa3e455f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/aa3e455f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/aa3e455f
Branch: refs/heads/master
Commit: aa3e455fb7ec8b24a9b5c4f2d43d0704cc86aca9
Parents: c5c7722
Author: Raghav Kumar Gautam <ra...@apache.org>
Authored: Wed Oct 29 10:21:03 2014 -0700
Committer: Raghav Kumar Gautam <ra...@apache.org>
Committed: Wed Oct 29 10:21:03 2014 -0700
----------------------------------------------------------------------
.../falcon/regression/core/util/TimeUtil.java | 18 +--
.../falcon/regression/ProcessFrequencyTest.java | 162 +++++++++++++++++++
.../regression/hcat/HCatFeedOperationsTest.java | 6 +-
.../regression/hcat/HCatRetentionTest.java | 46 +++---
.../falcon/regression/prism/RetentionTest.java | 38 ++---
5 files changed, 216 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/aa3e455f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java
index af74d30..7df4595 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java
@@ -18,7 +18,7 @@
package org.apache.falcon.regression.core.util;
-import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -130,36 +130,36 @@ public final class TimeUtil {
/**
* Get all possible dates between start and end date gap between subsequent dates be one unit.
- * of feedType
+ * of freqType
*
* @param startDate start date
* @param endDate end date
- * @param feedType type of the feed
+ * @param freqType type of the feed
* @return list of dates
*/
public static List<DateTime> getDatesOnEitherSide(DateTime startDate, DateTime endDate,
- FeedType feedType) {
- return getDatesOnEitherSide(startDate, endDate, 1, feedType);
+ FreqType freqType) {
+ return getDatesOnEitherSide(startDate, endDate, 1, freqType);
}
/**
* Get all possible dates between start and end date gap between subsequent dates be one unit.
- * of feedType
+ * of freqType
*
* @param startDate start date
* @param endDate end date
* @param skip amount of skipping
- * @param feedType type of the feed
+ * @param freqType type of the feed
* @return list of dates
*/
public static List<DateTime> getDatesOnEitherSide(DateTime startDate, DateTime endDate,
- int skip, FeedType feedType) {
+ int skip, FreqType freqType) {
final List<DateTime> dates = new ArrayList<DateTime>();
if (!startDate.isAfter(endDate)) {
dates.add(startDate);
}
for (int counter = 0; !startDate.isAfter(endDate) && counter < 1000; ++counter) {
- startDate = feedType.addTime(startDate, skip);
+ startDate = freqType.addTime(startDate, skip);
dates.add(startDate);
}
return dates;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/aa3e455f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
new file mode 100644
index 0000000..229cdcb
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+public class ProcessFrequencyTest extends BaseTestClass {
+ private static final Logger LOGGER = Logger.getLogger(ProcessFrequencyTest.class);
+ private ColoHelper cluster = servers.get(0);
+ private FileSystem clusterFS = serverFS.get(0);
+ private OozieClient clusterOC = serverOC.get(0);
+ private String baseTestHDFSDir = baseHDFSDir + "/ProcessFrequencyTest";
+ private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+
+ @BeforeClass(alwaysRun = true)
+ public void createTestData() throws Exception {
+ LOGGER.info("in @BeforeClass");
+ HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+ Bundle bundle = BundleUtil.readELBundle();
+ bundle.generateUniqueBundle();
+ bundles[0] = new Bundle(bundle, cluster);
+ }
+
+ @BeforeMethod(alwaysRun = true)
+ public void setup(Method method) throws Exception {
+ LOGGER.info("test name: " + method.getName());
+ bundles[0] = BundleUtil.readELBundle();
+ bundles[0] = new Bundle(bundles[0], cluster);
+ bundles[0].generateUniqueBundle();
+ bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown() {
+ removeBundles();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void tearDownClass() throws IOException {
+ cleanTestDirs();
+ }
+
+ /**
+ * Test Process submission with different frequency. Expecting process workflow to run
+ * successfully.
+ * @throws Exception
+ */
+ @Test(dataProvider = "generateProcessFrequencies")
+ public void testProcessWithFrequency(final FreqType freqType, final int freqAmount)
+ throws Exception {
+ final String startDate = "2010-01-02T01:00Z";
+ final String endDate = "2010-01-02T01:01Z";
+ final String inputPath = baseTestHDFSDir + "/input/";
+ bundles[0].setInputFeedDataPath(inputPath + freqType.getPathValue());
+ bundles[0].setOutputFeedLocationData(
+ baseTestHDFSDir + "/output-data/" + freqType.getPathValue());
+ bundles[0].setProcessPeriodicity(freqAmount, freqType.getFalconTimeUnit());
+ bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+ bundles[0].setProcessValidity(startDate, endDate);
+ bundles[0].submitFeedsScheduleProcess(prism);
+
+ //upload data
+ HadoopUtil.deleteDirIfExists(inputPath, clusterFS);
+ final String startPath = inputPath + freqType.getFormatter().print(
+ TimeUtil.oozieDateToDate(startDate));
+ HadoopUtil.copyDataToFolder(clusterFS, startPath, OSUtil.NORMAL_INPUT);
+
+ final String processName = Util.readEntityName(bundles[0].getProcessData());
+ //InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+ InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+ CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5);
+ InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
+ InstanceUtil.validateSuccessWOInstances(r);
+ }
+
+ @DataProvider(name = "generateProcessFrequencies")
+ public Object[][] generateProcessFrequencies() {
+ return new Object[][] {
+ {FreqType.MINUTELY, 2},
+ {FreqType.HOURLY, 3},
+ {FreqType.DAILY, 5},
+ {FreqType.MONTHLY, 7},
+ };
+ }
+
+ /**
+ * Test Process submission with bad frequency. Expecting submissions to fails.
+ * @throws Exception
+ */
+ @Test
+ public void testProcessWithBadFrequency()
+ throws Exception {
+ final String startDate = "2010-01-02T01:00Z";
+ final String endDate = "2010-01-02T01:01Z";
+ final String inputPath = baseTestHDFSDir + "/input/";
+ final FreqType freqType = FreqType.MINUTELY;
+ bundles[0].setInputFeedDataPath(inputPath + freqType.getPathValue());
+ bundles[0].setOutputFeedLocationData(
+ baseTestHDFSDir + "/output-data/" + freqType.getPathValue());
+ bundles[0].submitClusters(prism);
+ bundles[0].submitFeeds(prism);
+
+ bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+ bundles[0].setProcessValidity(startDate, endDate);
+ final ProcessMerlin processMerlin = bundles[0].getProcessObject();
+ //a frequency can be bad in two ways - it can have bad amount or it can have bad unit
+ //submit process with bad amount
+ processMerlin.setFrequency(new Frequency("BadAmount", freqType.getFalconTimeUnit()));
+ AssertUtil.assertFailed(prism.getProcessHelper().submitEntity(processMerlin.toString()));
+
+ //submit process with bad unit
+ processMerlin.setFrequency(new Frequency("2993", freqType.getFalconTimeUnit()));
+ final String process = processMerlin.toString();
+ final String newProcess = process.replaceAll("minutes\\(2993\\)", "BadUnit(2993)");
+ AssertUtil.assertFailed(prism.getProcessHelper().submitEntity(newProcess));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/aa3e455f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
index 2ce36a4..f01f30e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
@@ -25,7 +25,7 @@ import org.apache.falcon.entity.v0.feed.ActionType;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
import org.apache.falcon.regression.core.util.AssertUtil;
@@ -129,7 +129,7 @@ public class HCatFeedOperationsTest extends BaseTestClass {
Bundle.submitCluster(bundles[1]);
feed = bundles[1].getInputFeedFromBundle();
FeedMerlin feedObj = new FeedMerlin(feed);
- feedObj.setTableValue(dbName, randomTblName, FeedType.YEARLY.getHcatPathValue());
+ feedObj.setTableValue(dbName, randomTblName, FreqType.YEARLY.getHcatPathValue());
ServiceResponse response = prism.getFeedHelper().submitEntity(feedObj.toString());
AssertUtil.assertFailed(response);
}
@@ -145,7 +145,7 @@ public class HCatFeedOperationsTest extends BaseTestClass {
Bundle.submitCluster(bundles[0]);
feed = bundles[0].getInputFeedFromBundle();
FeedMerlin feedObj = new FeedMerlin(feed);
- feedObj.setTableValue(dbName, tableName, FeedType.YEARLY.getHcatPathValue());
+ feedObj.setTableValue(dbName, tableName, FreqType.YEARLY.getHcatPathValue());
ServiceResponse response = prism.getFeedHelper().submitEntity(feedObj.toString());
AssertUtil.assertSucceeded(response);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/aa3e455f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
index aae3125..db40931 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
@@ -21,7 +21,7 @@ package org.apache.falcon.regression.hcat;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
import org.apache.falcon.regression.core.enumsAndConstants.RetentionUnit;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.util.AssertUtil;
@@ -91,15 +91,15 @@ public class HCatRetentionTest extends BaseTestClass {
@Test(enabled = true, dataProvider = "loopBelow", timeOut = 900000, groups = "embedded")
public void testHCatRetention(int retentionPeriod, RetentionUnit retentionUnit,
- FeedType feedType) throws Exception {
+ FreqType freqType) throws Exception {
/*the hcatalog table that is created changes tablename characters to lowercase. So the
name in the feed should be the same.*/
tableName = String.format("testhcatretention_%s_%d", retentionUnit.getValue(),
retentionPeriod);
- createPartitionedTable(cli, dBName, tableName, baseTestHDFSDir, feedType);
+ createPartitionedTable(cli, dBName, tableName, baseTestHDFSDir, freqType);
FeedMerlin feedElement = new FeedMerlin(bundle.getInputFeedFromBundle());
- feedElement.setTableValue(dBName, tableName, feedType.getHcatPathValue());
+ feedElement.setTableValue(dBName, tableName, freqType.getHcatPathValue());
feedElement
.setRetentionValue(retentionUnit.getValue() + "(" + retentionPeriod + ")");
if (retentionPeriod <= 0) {
@@ -113,15 +113,15 @@ public class HCatRetentionTest extends BaseTestClass {
feedElement.getClusters().getClusters().get(0).getValidity().getEnd(),
DateTimeZone.UTC).withSecondOfMinute(0);
final List<DateTime> dataDates =
- TimeUtil.getDatesOnEitherSide(dataStartTime, dataEndTime, feedType);
+ TimeUtil.getDatesOnEitherSide(dataStartTime, dataEndTime, freqType);
final List<String> dataDateStrings = TimeUtil.convertDatesToString(dataDates,
- feedType.getFormatter());
+ freqType.getFormatter());
AssertUtil.checkForListSizes(dataDates, dataDateStrings);
final List<String> dataFolders = HadoopUtil.flattenAndPutDataInFolder(clusterFS,
OSUtil.OOZIE_EXAMPLE_INPUT_LATE_INPUT, baseTestHDFSDir, dataDateStrings);
- addPartitionsToExternalTable(cli, dBName, tableName, feedType, dataDates, dataFolders);
+ addPartitionsToExternalTable(cli, dBName, tableName, freqType, dataDates, dataFolders);
List<String> initialData =
- getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir, feedType);
+ getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir, freqType);
List<HCatPartition> initialPtnList = cli.getPartitions(dBName, tableName);
AssertUtil.checkForListSizes(initialData, initialPtnList);
@@ -133,9 +133,9 @@ public class HCatRetentionTest extends BaseTestClass {
AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feedElement.toString()));
List<String> expectedOutput = getExpectedOutput(retentionPeriod, retentionUnit,
- feedType, new DateTime(DateTimeZone.UTC), initialData);
+ freqType, new DateTime(DateTimeZone.UTC), initialData);
List<String> finalData = getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir,
- feedType);
+ freqType);
List<HCatPartition> finalPtnList = cli.getPartitions(dBName, tableName);
logger.info("checking expectedOutput and finalPtnList");
@@ -151,10 +151,10 @@ public class HCatRetentionTest extends BaseTestClass {
}
private static List<String> getHadoopDataFromDir(FileSystem fs, String hadoopPath,
- String dir, FeedType feedType)
+ String dir, FreqType freqType)
throws IOException {
List<String> finalResult = new ArrayList<String>();
- final int dirDepth = feedType.getDirDepth();
+ final int dirDepth = freqType.getDirDepth();
List<Path> results = HadoopUtil.getAllDirsRecursivelyHDFS(fs,
new Path(hadoopPath), dirDepth);
@@ -174,21 +174,21 @@ public class HCatRetentionTest extends BaseTestClass {
*
* @param retentionPeriod retention period
* @param retentionUnit retention unit
- * @param feedType feed type
+ * @param freqType feed type
* @param endDateUTC end date of retention
* @param inputData input data on which retention was applied
* @return expected output of the retention
*/
private static List<String> getExpectedOutput(int retentionPeriod,
RetentionUnit retentionUnit,
- FeedType feedType,
+ FreqType freqType,
DateTime endDateUTC,
List<String> inputData) {
List<String> finalData = new ArrayList<String>();
//convert the end date to the same format
final String endLimit =
- feedType.getFormatter().print(retentionUnit.minusTime(endDateUTC, retentionPeriod));
+ freqType.getFormatter().print(retentionUnit.minusTime(endDateUTC, retentionPeriod));
//now to actually check!
for (String testDate : inputData) {
if (testDate.compareTo(endLimit) >= 0) {
@@ -199,7 +199,7 @@ public class HCatRetentionTest extends BaseTestClass {
}
private static void createPartitionedTable(HCatClient client, String dbName, String tableName,
- String tableLoc, FeedType dataType)
+ String tableLoc, FreqType dataType)
throws HCatException {
ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
ArrayList<HCatFieldSchema> ptnCols = new ArrayList<HCatFieldSchema>();
@@ -240,7 +240,7 @@ public class HCatRetentionTest extends BaseTestClass {
}
private static void addPartitionsToExternalTable(HCatClient client, String dbName,
- String tableName, FeedType feedType,
+ String tableName, FreqType freqType,
List<DateTime> dataDates,
List<String> dataFolders)
throws HCatException {
@@ -249,7 +249,7 @@ public class HCatRetentionTest extends BaseTestClass {
for (int i = 0; i < dataDates.size(); ++i) {
final String dataFolder = dataFolders.get(i);
final DateTime dataDate = dataDates.get(i);
- switch (feedType) {
+ switch (freqType) {
case MINUTELY:
ptn.put("minute", "" + dataDate.getMinuteOfHour());
case HOURLY:
@@ -262,7 +262,7 @@ public class HCatRetentionTest extends BaseTestClass {
ptn.put("year", "" + dataDate.getYear());
break;
default:
- Assert.fail("Unexpected feedType = " + feedType);
+ Assert.fail("Unexpected freqType = " + freqType);
}
//Each HCat partition maps to a directory, not to a file
HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(dbName,
@@ -278,11 +278,11 @@ public class HCatRetentionTest extends BaseTestClass {
RetentionUnit.MONTHS};// "minutes","years",
Integer[] periods = new Integer[]{7, 824, 43}; // a negative value like -4 should be covered
// in validation scenarios.
- FeedType[] dataTypes =
- new FeedType[]{
+ FreqType[] dataTypes =
+ new FreqType[]{
//disabling since falcon has support is for only for single hcat partition
- //FeedType.DAILY, FeedType.MINUTELY, FeedType.HOURLY, FeedType.MONTHLY,
- FeedType.YEARLY};
+ //FreqType.DAILY, FreqType.MINUTELY, FreqType.HOURLY, FreqType.MONTHLY,
+ FreqType.YEARLY};
return MathUtil.crossProduct(periods, retentionUnits, dataTypes);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/aa3e455f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
index c271ecf..a17e1ba 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -22,7 +22,7 @@ package org.apache.falcon.regression.prism;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
import org.apache.falcon.regression.core.enumsAndConstants.RetentionUnit;
import org.apache.falcon.regression.core.helpers.ColoHelper;
import org.apache.falcon.regression.core.response.ServiceResponse;
@@ -91,7 +91,7 @@ public class RetentionTest extends BaseTestClass {
*/
@Test
public void testRetentionWithEmptyDirectories() throws Exception {
- testRetention(24, RetentionUnit.HOURS, true, FeedType.DAILY, false);
+ testRetention(24, RetentionUnit.HOURS, true, FreqType.DAILY, false);
}
/**
@@ -101,14 +101,14 @@ public class RetentionTest extends BaseTestClass {
* @param retentionPeriod period for which data should be retained
* @param retentionUnit type of retention limit attribute
* @param gaps defines gaps within list of data folders
- * @param feedType feed type
+ * @param freqType feed type
* @param withData should folders be filled with data or not
* @throws Exception
*/
@Test(groups = {"0.1", "0.2", "prism"}, dataProvider = "betterDP", priority = -1)
public void testRetention(final int retentionPeriod, final RetentionUnit retentionUnit,
- final boolean gaps, final FeedType feedType, final boolean withData) throws Exception {
- bundles[0].setInputFeedDataPath(testHDFSDir + feedType.getPathValue());
+ final boolean gaps, final FreqType freqType, final boolean withData) throws Exception {
+ bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue());
final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle());
feedObject.setRetentionValue(retentionUnit.getValue() + "(" + retentionPeriod + ")");
@@ -116,9 +116,9 @@ public class RetentionTest extends BaseTestClass {
if (retentionPeriod > 0) {
AssertUtil.assertSucceeded(response);
- replenishData(feedType, gaps, withData);
+ replenishData(freqType, gaps, withData);
- commonDataRetentionWorkflow(feedObject.toString(), feedType, retentionUnit,
+ commonDataRetentionWorkflow(feedObject.toString(), freqType, retentionUnit,
retentionPeriod);
} else {
AssertUtil.assertFailed(response);
@@ -128,20 +128,20 @@ public class RetentionTest extends BaseTestClass {
/**
* Generates folders based on proposed periodicity and then fills them with data if required.
*
- * @param feedType feed retention limit type
+ * @param freqType feed retention limit type
* @param gap defines what amount of units should be skipped
* @param withData should folders be filled with data or not
* @throws Exception
*/
- private void replenishData(FeedType feedType, boolean gap, boolean withData) throws Exception {
+ private void replenishData(FreqType freqType, boolean gap, boolean withData) throws Exception {
int skip = 1;
if (gap) {
skip = gaps[new Random().nextInt(gaps.length)];
}
final DateTime today = new DateTime(DateTimeZone.UTC);
final List<DateTime> times = TimeUtil.getDatesOnEitherSide(
- feedType.addTime(today, -36), feedType.addTime(today, 36), skip, feedType);
- final List<String> dataDates = TimeUtil.convertDatesToString(times, feedType.getFormatter());
+ freqType.addTime(today, -36), freqType.addTime(today, 36), skip, freqType);
+ final List<String> dataDates = TimeUtil.convertDatesToString(times, freqType.getFormatter());
logger.info("dataDates = " + dataDates);
dataDates.add(HadoopUtil.SOMETHING_RANDOM);
if (withData) {
@@ -157,7 +157,7 @@ public class RetentionTest extends BaseTestClass {
* and which was retained.
*
* @param feed analyzed retention feed
- * @param feedType feed type
+ * @param freqType feed type
* @param retentionUnit type of retention limit attribute
* @param retentionPeriod period for which data should be retained
* @throws OozieClientException
@@ -166,7 +166,7 @@ public class RetentionTest extends BaseTestClass {
* @throws AuthenticationException
* @throws JMSException
*/
- private void commonDataRetentionWorkflow(String feed, FeedType feedType,
+ private void commonDataRetentionWorkflow(String feed, FreqType freqType,
RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException,
IOException, URISyntaxException, AuthenticationException, JMSException {
//get Data created in the cluster
@@ -192,7 +192,7 @@ public class RetentionTest extends BaseTestClass {
//now see if retention value was matched to as expected
List<String> expectedOutput = filterDataOnRetention(initialData, currentTime, retentionUnit,
- retentionPeriod, feedType);
+ retentionPeriod, freqType);
logger.info("initialData = " + initialData);
logger.info("finalData = " + finalData);
logger.info("expectedOutput = " + expectedOutput);
@@ -252,14 +252,14 @@ public class RetentionTest extends BaseTestClass {
* @param currentTime current date
* @param retentionUnit type of retention limit attribute
* @param retentionPeriod period for which data should be retained
- * @param feedType feed type
+ * @param freqType feed type
* @return list of data folders which are expected to be present on cluster
*/
private List<String> filterDataOnRetention(List<String> inputData, DateTime currentTime,
- RetentionUnit retentionUnit, int retentionPeriod, FeedType feedType) {
+ RetentionUnit retentionUnit, int retentionPeriod, FreqType freqType) {
final List<String> finalData = new ArrayList<String>();
//end date is today's date
- final String startLimit = feedType.getFormatter().print(
+ final String startLimit = freqType.getFormatter().print(
retentionUnit.minusTime(currentTime, retentionPeriod));
//now to actually check!
@@ -284,10 +284,10 @@ public class RetentionTest extends BaseTestClass {
RetentionUnit[] retentionUnits = new RetentionUnit[]{RetentionUnit.HOURS,
RetentionUnit.DAYS};// "minutes","hours", "days",
Boolean[] gaps = new Boolean[]{false, true};
- FeedType[] feedTypes = new FeedType[]{FeedType.DAILY, FeedType.YEARLY, FeedType.MONTHLY};
+ FreqType[] freqTypes = new FreqType[]{FreqType.DAILY, FreqType.YEARLY, FreqType.MONTHLY};
final Boolean[] withData = new Boolean[]{true};
- return MathUtil.crossProduct(retentionPeriods, retentionUnits, gaps, feedTypes, withData);
+ return MathUtil.crossProduct(retentionPeriods, retentionUnits, gaps, freqTypes, withData);
}
@AfterClass(alwaysRun = true)