You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ra...@apache.org on 2014/10/29 18:21:12 UTC

git commit: FALCON-841 Test falcon process with different frequencies. Contributed by Raghav Kumar Gautam

Repository: incubator-falcon
Updated Branches:
  refs/heads/master c5c77229f -> aa3e455fb


FALCON-841 Test falcon process with different frequencies. Contributed by Raghav Kumar Gautam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/aa3e455f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/aa3e455f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/aa3e455f

Branch: refs/heads/master
Commit: aa3e455fb7ec8b24a9b5c4f2d43d0704cc86aca9
Parents: c5c7722
Author: Raghav Kumar Gautam <ra...@apache.org>
Authored: Wed Oct 29 10:21:03 2014 -0700
Committer: Raghav Kumar Gautam <ra...@apache.org>
Committed: Wed Oct 29 10:21:03 2014 -0700

----------------------------------------------------------------------
 .../falcon/regression/core/util/TimeUtil.java   |  18 +--
 .../falcon/regression/ProcessFrequencyTest.java | 162 +++++++++++++++++++
 .../regression/hcat/HCatFeedOperationsTest.java |   6 +-
 .../regression/hcat/HCatRetentionTest.java      |  46 +++---
 .../falcon/regression/prism/RetentionTest.java  |  38 ++---
 5 files changed, 216 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/aa3e455f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java
index af74d30..7df4595 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/TimeUtil.java
@@ -18,7 +18,7 @@
 
 package org.apache.falcon.regression.core.util;
 
-import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
@@ -130,36 +130,36 @@ public final class TimeUtil {
 
     /**
      * Get all possible dates between start and end date gap between subsequent dates be one unit.
-     * of feedType
+     * of freqType
      *
      * @param startDate start date
      * @param endDate   end date
-     * @param feedType  type of the feed
+     * @param freqType  unit of the gap between subsequent dates
      * @return list of dates
      */
     public static List<DateTime> getDatesOnEitherSide(DateTime startDate, DateTime endDate,
-                                                      FeedType feedType) {
-        return getDatesOnEitherSide(startDate, endDate, 1, feedType);
+                                                      FreqType freqType) {
+        return getDatesOnEitherSide(startDate, endDate, 1, freqType);
     }
 
     /**
      * Get all possible dates between start and end date gap between subsequent dates be one unit.
-     * of feedType
+     * of freqType
      *
      * @param startDate start date
      * @param endDate   end date
      * @param skip      amount of skipping
-     * @param feedType  type of the feed
+     * @param freqType  unit of the gap between subsequent dates
      * @return list of dates
      */
     public static List<DateTime> getDatesOnEitherSide(DateTime startDate, DateTime endDate,
-                                                      int skip, FeedType feedType) {
+                                                      int skip, FreqType freqType) {
         final List<DateTime> dates = new ArrayList<DateTime>();
         if (!startDate.isAfter(endDate)) {
             dates.add(startDate);
         }
         for (int counter = 0; !startDate.isAfter(endDate) && counter < 1000; ++counter) {
-            startDate = feedType.addTime(startDate, skip);
+            startDate = freqType.addTime(startDate, skip);
             dates.add(startDate);
         }
         return dates;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/aa3e455f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
new file mode 100644
index 0000000..229cdcb
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessFrequencyTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.falcon.regression;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ProcessMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+
+public class ProcessFrequencyTest extends BaseTestClass {
+    private static final Logger LOGGER = Logger.getLogger(ProcessFrequencyTest.class);
+    private ColoHelper cluster = servers.get(0);
+    private FileSystem clusterFS = serverFS.get(0);
+    private OozieClient clusterOC = serverOC.get(0);
+    private String baseTestHDFSDir = baseHDFSDir + "/ProcessFrequencyTest";
+    private String aggregateWorkflowDir = baseTestHDFSDir + "/aggregator";
+
+    @BeforeClass(alwaysRun = true)
+    public void createTestData() throws Exception {
+        LOGGER.info("in @BeforeClass");
+        HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE);
+        Bundle bundle = BundleUtil.readELBundle();
+        bundle.generateUniqueBundle();
+        bundles[0] = new Bundle(bundle, cluster);
+    }
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup(Method method) throws Exception {
+        LOGGER.info("test name: " + method.getName());
+        bundles[0] = BundleUtil.readELBundle();
+        bundles[0] = new Bundle(bundles[0], cluster);
+        bundles[0].generateUniqueBundle();
+        bundles[0].setProcessWorkflow(aggregateWorkflowDir);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() {
+        removeBundles();
+    }
+
+    @AfterClass(alwaysRun = true)
+    public void tearDownClass() throws IOException {
+        cleanTestDirs();
+    }
+
+    /**
+     * Test Process submission with different frequencies. Expecting process workflow to run
+     * successfully.
+     * @throws Exception
+     */
+    @Test(dataProvider = "generateProcessFrequencies")
+    public void testProcessWithFrequency(final FreqType freqType, final int freqAmount)
+    throws Exception {
+        final String startDate = "2010-01-02T01:00Z";
+        final String endDate = "2010-01-02T01:01Z";
+        final String inputPath = baseTestHDFSDir + "/input/";
+        bundles[0].setInputFeedDataPath(inputPath + freqType.getPathValue());
+        bundles[0].setOutputFeedLocationData(
+                baseTestHDFSDir + "/output-data/" + freqType.getPathValue());
+        bundles[0].setProcessPeriodicity(freqAmount, freqType.getFalconTimeUnit());
+        bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+        bundles[0].setProcessValidity(startDate, endDate);
+        bundles[0].submitFeedsScheduleProcess(prism);
+
+        //upload data
+        HadoopUtil.deleteDirIfExists(inputPath, clusterFS);
+        final String startPath = inputPath + freqType.getFormatter().print(
+                TimeUtil.oozieDateToDate(startDate));
+        HadoopUtil.copyDataToFolder(clusterFS, startPath, OSUtil.NORMAL_INPUT);
+
+        final String processName = Util.readEntityName(bundles[0].getProcessData());
+        //InstanceUtil.waitTillInstancesAreCreated(cluster, bundles[0].getProcessData(), 0);
+        InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 1,
+                CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS, 5);
+        InstancesResult r = prism.getProcessHelper().getRunningInstance(processName);
+        InstanceUtil.validateSuccessWOInstances(r);
+    }
+
+    @DataProvider(name = "generateProcessFrequencies")
+    public Object[][] generateProcessFrequencies() {
+        return new Object[][] {
+                {FreqType.MINUTELY, 2},
+                {FreqType.HOURLY, 3},
+                {FreqType.DAILY, 5},
+                {FreqType.MONTHLY, 7},
+        };
+    }
+
+    /**
+     * Test Process submission with bad frequency. Expecting submissions to fail.
+     * @throws Exception
+     */
+    @Test
+    public void testProcessWithBadFrequency()
+    throws Exception {
+        final String startDate = "2010-01-02T01:00Z";
+        final String endDate = "2010-01-02T01:01Z";
+        final String inputPath = baseTestHDFSDir + "/input/";
+        final FreqType freqType = FreqType.MINUTELY;
+        bundles[0].setInputFeedDataPath(inputPath + freqType.getPathValue());
+        bundles[0].setOutputFeedLocationData(
+                baseTestHDFSDir + "/output-data/" + freqType.getPathValue());
+        bundles[0].submitClusters(prism);
+        bundles[0].submitFeeds(prism);
+
+        bundles[0].setProcessInputStartEnd("now(0,0)", "now(0,0)");
+        bundles[0].setProcessValidity(startDate, endDate);
+        final ProcessMerlin processMerlin = bundles[0].getProcessObject();
+        //a frequency can be bad in two ways - it can have bad amount or it can have bad unit
+        //submit process with bad amount
+        processMerlin.setFrequency(new Frequency("BadAmount", freqType.getFalconTimeUnit()));
+        AssertUtil.assertFailed(prism.getProcessHelper().submitEntity(processMerlin.toString()));
+
+        //submit process with bad unit
+        processMerlin.setFrequency(new Frequency("2993", freqType.getFalconTimeUnit()));
+        final String process = processMerlin.toString();
+        final String newProcess = process.replaceAll("minutes\\(2993\\)", "BadUnit(2993)");
+        AssertUtil.assertFailed(prism.getProcessHelper().submitEntity(newProcess));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/aa3e455f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
index 2ce36a4..f01f30e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
@@ -25,7 +25,7 @@ import org.apache.falcon.entity.v0.feed.ActionType;
 import org.apache.falcon.entity.v0.feed.ClusterType;
 import org.apache.falcon.regression.Entities.FeedMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.response.ServiceResponse;
 import org.apache.falcon.regression.core.util.AssertUtil;
@@ -129,7 +129,7 @@ public class HCatFeedOperationsTest extends BaseTestClass {
         Bundle.submitCluster(bundles[1]);
         feed = bundles[1].getInputFeedFromBundle();
         FeedMerlin feedObj = new FeedMerlin(feed);
-        feedObj.setTableValue(dbName, randomTblName, FeedType.YEARLY.getHcatPathValue());
+        feedObj.setTableValue(dbName, randomTblName, FreqType.YEARLY.getHcatPathValue());
         ServiceResponse response = prism.getFeedHelper().submitEntity(feedObj.toString());
         AssertUtil.assertFailed(response);
     }
@@ -145,7 +145,7 @@ public class HCatFeedOperationsTest extends BaseTestClass {
         Bundle.submitCluster(bundles[0]);
         feed = bundles[0].getInputFeedFromBundle();
         FeedMerlin feedObj = new FeedMerlin(feed);
-        feedObj.setTableValue(dbName, tableName, FeedType.YEARLY.getHcatPathValue());
+        feedObj.setTableValue(dbName, tableName, FreqType.YEARLY.getHcatPathValue());
         ServiceResponse response = prism.getFeedHelper().submitEntity(feedObj.toString());
         AssertUtil.assertSucceeded(response);
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/aa3e455f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
index aae3125..db40931 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatRetentionTest.java
@@ -21,7 +21,7 @@ package org.apache.falcon.regression.hcat;
 import org.apache.falcon.regression.Entities.FeedMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
 import org.apache.falcon.regression.core.enumsAndConstants.RetentionUnit;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.util.AssertUtil;
@@ -91,15 +91,15 @@ public class HCatRetentionTest extends BaseTestClass {
 
     @Test(enabled = true, dataProvider = "loopBelow", timeOut = 900000, groups = "embedded")
     public void testHCatRetention(int retentionPeriod, RetentionUnit retentionUnit,
-                                  FeedType feedType) throws Exception {
+                                  FreqType freqType) throws Exception {
 
         /*the hcatalog table that is created changes tablename characters to lowercase. So the
           name in the feed should be the same.*/
         tableName = String.format("testhcatretention_%s_%d", retentionUnit.getValue(),
             retentionPeriod);
-        createPartitionedTable(cli, dBName, tableName, baseTestHDFSDir, feedType);
+        createPartitionedTable(cli, dBName, tableName, baseTestHDFSDir, freqType);
         FeedMerlin feedElement = new FeedMerlin(bundle.getInputFeedFromBundle());
-        feedElement.setTableValue(dBName, tableName, feedType.getHcatPathValue());
+        feedElement.setTableValue(dBName, tableName, freqType.getHcatPathValue());
         feedElement
             .setRetentionValue(retentionUnit.getValue() + "(" + retentionPeriod + ")");
         if (retentionPeriod <= 0) {
@@ -113,15 +113,15 @@ public class HCatRetentionTest extends BaseTestClass {
                 feedElement.getClusters().getClusters().get(0).getValidity().getEnd(),
                 DateTimeZone.UTC).withSecondOfMinute(0);
             final List<DateTime> dataDates =
-                TimeUtil.getDatesOnEitherSide(dataStartTime, dataEndTime, feedType);
+                TimeUtil.getDatesOnEitherSide(dataStartTime, dataEndTime, freqType);
             final List<String> dataDateStrings = TimeUtil.convertDatesToString(dataDates,
-                    feedType.getFormatter());
+                    freqType.getFormatter());
             AssertUtil.checkForListSizes(dataDates, dataDateStrings);
             final List<String> dataFolders = HadoopUtil.flattenAndPutDataInFolder(clusterFS,
                 OSUtil.OOZIE_EXAMPLE_INPUT_LATE_INPUT, baseTestHDFSDir, dataDateStrings);
-            addPartitionsToExternalTable(cli, dBName, tableName, feedType, dataDates, dataFolders);
+            addPartitionsToExternalTable(cli, dBName, tableName, freqType, dataDates, dataFolders);
             List<String> initialData =
-                getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir, feedType);
+                getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir, freqType);
             List<HCatPartition> initialPtnList = cli.getPartitions(dBName, tableName);
             AssertUtil.checkForListSizes(initialData, initialPtnList);
 
@@ -133,9 +133,9 @@ public class HCatRetentionTest extends BaseTestClass {
             AssertUtil.assertSucceeded(prism.getFeedHelper().suspend(feedElement.toString()));
 
             List<String> expectedOutput = getExpectedOutput(retentionPeriod, retentionUnit,
-                feedType, new DateTime(DateTimeZone.UTC), initialData);
+                    freqType, new DateTime(DateTimeZone.UTC), initialData);
             List<String> finalData = getHadoopDataFromDir(clusterFS, baseTestHDFSDir, testDir,
-                feedType);
+                    freqType);
             List<HCatPartition> finalPtnList = cli.getPartitions(dBName, tableName);
 
             logger.info("checking expectedOutput and finalPtnList");
@@ -151,10 +151,10 @@ public class HCatRetentionTest extends BaseTestClass {
     }
 
     private static List<String> getHadoopDataFromDir(FileSystem fs, String hadoopPath,
-                                                     String dir, FeedType feedType)
+                                                     String dir, FreqType freqType)
         throws IOException {
         List<String> finalResult = new ArrayList<String>();
-        final int dirDepth = feedType.getDirDepth();
+        final int dirDepth = freqType.getDirDepth();
 
         List<Path> results = HadoopUtil.getAllDirsRecursivelyHDFS(fs,
             new Path(hadoopPath), dirDepth);
@@ -174,21 +174,21 @@ public class HCatRetentionTest extends BaseTestClass {
      *
      * @param retentionPeriod retention period
      * @param retentionUnit   retention unit
-     * @param feedType        feed type
+     * @param freqType        feed type
      * @param endDateUTC      end date of retention
      * @param inputData       input data on which retention was applied
      * @return expected output of the retention
      */
     private static List<String> getExpectedOutput(int retentionPeriod,
                                                   RetentionUnit retentionUnit,
-                                                  FeedType feedType,
+                                                  FreqType freqType,
                                                   DateTime endDateUTC,
                                                   List<String> inputData) {
         List<String> finalData = new ArrayList<String>();
 
         //convert the end date to the same format
         final String endLimit =
-            feedType.getFormatter().print(retentionUnit.minusTime(endDateUTC, retentionPeriod));
+            freqType.getFormatter().print(retentionUnit.minusTime(endDateUTC, retentionPeriod));
         //now to actually check!
         for (String testDate : inputData) {
             if (testDate.compareTo(endLimit) >= 0) {
@@ -199,7 +199,7 @@ public class HCatRetentionTest extends BaseTestClass {
     }
 
     private static void createPartitionedTable(HCatClient client, String dbName, String tableName,
-                                               String tableLoc, FeedType dataType)
+                                               String tableLoc, FreqType dataType)
         throws HCatException {
         ArrayList<HCatFieldSchema> cols = new ArrayList<HCatFieldSchema>();
         ArrayList<HCatFieldSchema> ptnCols = new ArrayList<HCatFieldSchema>();
@@ -240,7 +240,7 @@ public class HCatRetentionTest extends BaseTestClass {
     }
 
     private static void addPartitionsToExternalTable(HCatClient client, String dbName,
-                                                     String tableName, FeedType feedType,
+                                                     String tableName, FreqType freqType,
                                                      List<DateTime> dataDates,
                                                      List<String> dataFolders)
         throws HCatException {
@@ -249,7 +249,7 @@ public class HCatRetentionTest extends BaseTestClass {
         for (int i = 0; i < dataDates.size(); ++i) {
             final String dataFolder = dataFolders.get(i);
             final DateTime dataDate = dataDates.get(i);
-            switch (feedType) {
+            switch (freqType) {
                 case MINUTELY:
                     ptn.put("minute", "" + dataDate.getMinuteOfHour());
                 case HOURLY:
@@ -262,7 +262,7 @@ public class HCatRetentionTest extends BaseTestClass {
                     ptn.put("year", "" + dataDate.getYear());
                     break;
                 default:
-                    Assert.fail("Unexpected feedType = " + feedType);
+                    Assert.fail("Unexpected freqType = " + freqType);
             }
             //Each HCat partition maps to a directory, not to a file
             HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(dbName,
@@ -278,11 +278,11 @@ public class HCatRetentionTest extends BaseTestClass {
             RetentionUnit.MONTHS};// "minutes","years",
         Integer[] periods = new Integer[]{7, 824, 43}; // a negative value like -4 should be covered
         // in validation scenarios.
-        FeedType[] dataTypes =
-            new FeedType[]{
+        FreqType[] dataTypes =
+            new FreqType[]{
                 //disabling since falcon has support is for only for single hcat partition
-                //FeedType.DAILY, FeedType.MINUTELY, FeedType.HOURLY, FeedType.MONTHLY,
-                FeedType.YEARLY};
+                //FreqType.DAILY, FreqType.MINUTELY, FreqType.HOURLY, FreqType.MONTHLY,
+                FreqType.YEARLY};
         return MathUtil.crossProduct(periods, retentionUnits, dataTypes);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/aa3e455f/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
index c271ecf..a17e1ba 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -22,7 +22,7 @@ package org.apache.falcon.regression.prism;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.regression.Entities.FeedMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
-import org.apache.falcon.regression.core.enumsAndConstants.FeedType;
+import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
 import org.apache.falcon.regression.core.enumsAndConstants.RetentionUnit;
 import org.apache.falcon.regression.core.helpers.ColoHelper;
 import org.apache.falcon.regression.core.response.ServiceResponse;
@@ -91,7 +91,7 @@ public class RetentionTest extends BaseTestClass {
      */
     @Test
     public void testRetentionWithEmptyDirectories() throws Exception {
-        testRetention(24, RetentionUnit.HOURS, true, FeedType.DAILY, false);
+        testRetention(24, RetentionUnit.HOURS, true, FreqType.DAILY, false);
     }
 
     /**
@@ -101,14 +101,14 @@ public class RetentionTest extends BaseTestClass {
      * @param retentionPeriod period for which data should be retained
      * @param retentionUnit type of retention limit attribute
      * @param gaps defines gaps within list of data folders
-     * @param feedType feed type
+     * @param freqType feed type
      * @param withData should folders be filled with data or not
      * @throws Exception
      */
     @Test(groups = {"0.1", "0.2", "prism"}, dataProvider = "betterDP", priority = -1)
     public void testRetention(final int retentionPeriod, final RetentionUnit retentionUnit,
-        final boolean gaps, final FeedType feedType, final boolean withData) throws Exception {
-        bundles[0].setInputFeedDataPath(testHDFSDir + feedType.getPathValue());
+        final boolean gaps, final FreqType freqType, final boolean withData) throws Exception {
+        bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue());
         final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle());
         feedObject.setRetentionValue(retentionUnit.getValue() + "(" + retentionPeriod + ")");
 
@@ -116,9 +116,9 @@ public class RetentionTest extends BaseTestClass {
         if (retentionPeriod > 0) {
             AssertUtil.assertSucceeded(response);
 
-            replenishData(feedType, gaps, withData);
+            replenishData(freqType, gaps, withData);
 
-            commonDataRetentionWorkflow(feedObject.toString(), feedType, retentionUnit,
+            commonDataRetentionWorkflow(feedObject.toString(), freqType, retentionUnit,
                 retentionPeriod);
         } else {
             AssertUtil.assertFailed(response);
@@ -128,20 +128,20 @@ public class RetentionTest extends BaseTestClass {
     /**
      * Generates folders based on proposed periodicity and then fills them with data if required.
      *
-     * @param feedType feed retention limit type
+     * @param freqType feed retention limit type
      * @param gap defines what amount of units should be skipped
      * @param withData should folders be filled with data or not
      * @throws Exception
      */
-    private void replenishData(FeedType feedType, boolean gap, boolean withData) throws Exception {
+    private void replenishData(FreqType freqType, boolean gap, boolean withData) throws Exception {
         int skip = 1;
         if (gap) {
             skip = gaps[new Random().nextInt(gaps.length)];
         }
         final DateTime today = new DateTime(DateTimeZone.UTC);
         final List<DateTime> times = TimeUtil.getDatesOnEitherSide(
-            feedType.addTime(today, -36), feedType.addTime(today, 36), skip, feedType);
-        final List<String> dataDates = TimeUtil.convertDatesToString(times, feedType.getFormatter());
+            freqType.addTime(today, -36), freqType.addTime(today, 36), skip, freqType);
+        final List<String> dataDates = TimeUtil.convertDatesToString(times, freqType.getFormatter());
         logger.info("dataDates = " + dataDates);
         dataDates.add(HadoopUtil.SOMETHING_RANDOM);
         if (withData) {
@@ -157,7 +157,7 @@ public class RetentionTest extends BaseTestClass {
      * and which was retained.
      *
      * @param feed analyzed retention feed
-     * @param feedType feed type
+     * @param freqType feed type
      * @param retentionUnit type of retention limit attribute
      * @param retentionPeriod period for which data should be retained
      * @throws OozieClientException
@@ -166,7 +166,7 @@ public class RetentionTest extends BaseTestClass {
      * @throws AuthenticationException
      * @throws JMSException
      */
-    private void commonDataRetentionWorkflow(String feed, FeedType feedType,
+    private void commonDataRetentionWorkflow(String feed, FreqType freqType,
         RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException,
         IOException, URISyntaxException, AuthenticationException, JMSException {
         //get Data created in the cluster
@@ -192,7 +192,7 @@ public class RetentionTest extends BaseTestClass {
 
         //now see if retention value was matched to as expected
         List<String> expectedOutput = filterDataOnRetention(initialData, currentTime, retentionUnit,
-            retentionPeriod, feedType);
+            retentionPeriod, freqType);
         logger.info("initialData = " + initialData);
         logger.info("finalData = " + finalData);
         logger.info("expectedOutput = " + expectedOutput);
@@ -252,14 +252,14 @@ public class RetentionTest extends BaseTestClass {
      * @param currentTime current date
      * @param retentionUnit type of retention limit attribute
      * @param retentionPeriod period for which data should be retained
-     * @param feedType feed type
+     * @param freqType feed type
      * @return list of data folders which are expected to be present on cluster
      */
     private List<String> filterDataOnRetention(List<String> inputData, DateTime currentTime,
-        RetentionUnit retentionUnit, int retentionPeriod, FeedType feedType) {
+        RetentionUnit retentionUnit, int retentionPeriod, FreqType freqType) {
         final List<String> finalData = new ArrayList<String>();
         //end date is today's date
-        final String startLimit = feedType.getFormatter().print(
+        final String startLimit = freqType.getFormatter().print(
                 retentionUnit.minusTime(currentTime, retentionPeriod));
 
         //now to actually check!
@@ -284,10 +284,10 @@ public class RetentionTest extends BaseTestClass {
         RetentionUnit[] retentionUnits = new RetentionUnit[]{RetentionUnit.HOURS,
             RetentionUnit.DAYS};// "minutes","hours", "days",
         Boolean[] gaps = new Boolean[]{false, true};
-        FeedType[] feedTypes = new FeedType[]{FeedType.DAILY, FeedType.YEARLY, FeedType.MONTHLY};
+        FreqType[] freqTypes = new FreqType[]{FreqType.DAILY, FreqType.YEARLY, FreqType.MONTHLY};
         final Boolean[] withData = new Boolean[]{true};
 
-        return MathUtil.crossProduct(retentionPeriods, retentionUnits, gaps, feedTypes, withData);
+        return MathUtil.crossProduct(retentionPeriods, retentionUnits, gaps, freqTypes, withData);
     }
 
     @AfterClass(alwaysRun = true)