You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pr...@apache.org on 2016/02/17 06:16:34 UTC

falcon git commit: FALCON-1567 Test case for Lifecycle feature

Repository: falcon
Updated Branches:
  refs/heads/master 10ee01a85 -> 237bab6ed


FALCON-1567 Test case for Lifecycle feature

Author: Pragya <mi...@gmail.com>

Reviewers: Paul Isaychuk <pi...@apache.org>

Closes #43 from pragya-mittal/FALCON-1567 and squashes the following commits:

9d46ea4 [Pragya] FALCON-1567 Test case for Lifecycle feature
cc84d5a [Pragya] Merge branch 'master' of https://github.com/apache/falcon
f037385 [Pragya] Merge branch 'master' of https://github.com/apache/falcon
4c19ec0 [Pragya] Merge branch 'master' of https://github.com/apache/falcon
3b7fd63 [Pragya] FALCON-1829 Add regression for submit and schedule process on native scheduler (time based)


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/237bab6e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/237bab6e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/237bab6e

Branch: refs/heads/master
Commit: 237bab6edcefa54b1c3818d8d4f81039b893d637
Parents: 10ee01a
Author: Pragya <mi...@gmail.com>
Authored: Wed Feb 17 10:45:39 2016 +0530
Committer: Pragya Mittal <mi...@gmail.com>
Committed: Wed Feb 17 10:45:39 2016 +0530

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   2 +
 .../falcon/regression/core/util/AssertUtil.java |  12 +
 .../falcon/regression/core/util/OozieUtil.java  |  34 ++-
 .../falcon/regression/prism/RetentionTest.java  | 301 ++++++++++++++++++-
 4 files changed, 332 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 77772a0..566f7e1 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-1567 Test case for Lifecycle feature  (Pragya Mittal)
+
    FALCON-1784 Add regression test for for FALCON-1647 (Paul Isaychuk)
 
    FALCON-1829 Add regression for submit and schedule process on native scheduler (time based) (Pragya Mittal)

http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
index 0af0c1e..cb79e9c 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
@@ -299,6 +299,18 @@ public final class AssertUtil {
     }
 
     /**
+     * Checks that the ServiceResponse status is FAILED and that its message contains expectedMessage.
+     *
+     * @param response ServiceResponse
+     * @param expectedMessage expected message
+     * @throws JAXBException
+     */
+    public static void assertFailedWithMessage(ServiceResponse response, String expectedMessage) throws JAXBException {
+        assertFailed(response);
+        Assert.assertTrue(response.getMessage().contains(expectedMessage), "Incorrect message in response");
+    }
+
+    /**
      * Checks that Instance/Triage result status is FAILED.
      *
      * @param response APIResult response

http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index e73bc5d..4c609b3 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -30,6 +30,7 @@ import org.apache.oozie.client.Job;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
 import org.joda.time.DateTime;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTimeZone;
@@ -754,8 +755,8 @@ public final class OozieUtil {
      * Returns configuration object of a given bundleID for a given instanceTime.
      *
      * @param oozieClient  oozie client of cluster job is running on
-     * @param bundleID     name of process which job is being analyzed
-     * @param time         job status we are waiting for
+     * @param bundleID     bundleID of given cluster
+     * @param time         instanceTime
      * @throws org.apache.oozie.client.OozieClientException
      * @throws org.json.JSONException
      */
@@ -822,4 +823,33 @@ public final class OozieUtil {
         }
         return counter == propMap.size();
     }
+
+    /**
+     * Returns the workflow configuration of the retention coordinator for a given bundleID.
+     *
+     * @param oozieClient  oozie client of cluster job is running on
+     * @param bundleID     bundleID of given cluster
+     * @throws OozieClientException
+     */
+    public static Configuration getRetentionConfiguration(OozieClient oozieClient, String bundleID)
+        throws OozieClientException {
+        waitForCoordinatorJobCreation(oozieClient, bundleID);
+        CoordinatorJob coord = null;
+        List<CoordinatorJob> coordJobs = oozieClient.getBundleJobInfo(bundleID).getCoordinators();
+        for (CoordinatorJob coordinatorJob : coordJobs) {
+            if (coordinatorJob.getAppName().startsWith("FALCON_FEED_RETENTION")) {
+                coord = oozieClient.getCoordJobInfo(coordinatorJob.getId());
+            }
+        }
+
+        Configuration configuration = new Configuration();
+        if (coord != null) {
+            WorkflowJob wid = oozieClient.getJobInfo(coord.getActions().get(0).getExternalId());
+            configuration.addResource(new ByteArrayInputStream(wid.getConf().getBytes()));
+        } else {
+            configuration = null;
+        }
+
+        return configuration;
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
index 018b83a..b677433 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -20,6 +20,12 @@ package org.apache.falcon.regression.prism;
 
 
 import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.LateArrival;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
+import org.apache.falcon.entity.v0.feed.Properties;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.regression.Entities.FeedMerlin;
 import org.apache.falcon.regression.core.bundle.Bundle;
 import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
@@ -36,14 +42,17 @@ import org.apache.falcon.regression.core.util.OozieUtil;
 import org.apache.falcon.regression.core.util.TimeUtil;
 import org.apache.falcon.regression.core.util.Util;
 import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.OozieClientException;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
+import org.json.JSONException;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -110,7 +119,7 @@ public class RetentionTest extends BaseTestClass {
      */
     @Test(groups = {"0.1", "0.2", "prism"}, dataProvider = "betterDP", priority = -1)
     public void testRetention(final int retentionPeriod, final RetentionUnit retentionUnit,
-        final boolean gaps, final FreqType freqType, final boolean withData) throws Exception {
+            final boolean gaps, final FreqType freqType, final boolean withData) throws Exception {
         bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue());
         final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle());
         feedObject.setRetentionValue(retentionUnit.getValue() + "(" + retentionPeriod + ")");
@@ -122,7 +131,7 @@ public class RetentionTest extends BaseTestClass {
             replenishData(freqType, gaps, withData);
 
             commonDataRetentionWorkflow(feedObject.toString(), freqType, retentionUnit,
-                retentionPeriod);
+                    retentionPeriod);
         } else {
             AssertUtil.assertFailed(response);
         }
@@ -143,7 +152,7 @@ public class RetentionTest extends BaseTestClass {
         }
         final DateTime today = new DateTime(DateTimeZone.UTC);
         final List<DateTime> times = TimeUtil.getDatesOnEitherSide(
-            freqType.addTime(today, -36), freqType.addTime(today, -1), skip, freqType);
+                freqType.addTime(today, -36), freqType.addTime(today, -1), skip, freqType);
         final List<String> dataDates = TimeUtil.convertDatesToString(times, freqType.getFormatter());
         LOGGER.info("dataDates = " + dataDates);
         dataDates.add(HadoopUtil.SOMETHING_RANDOM);
@@ -169,7 +178,7 @@ public class RetentionTest extends BaseTestClass {
      * @throws JMSException
      */
     private void commonDataRetentionWorkflow(String feed, FreqType freqType,
-        RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException,
+            RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException,
             IOException, URISyntaxException, AuthenticationException, JMSException,
             InterruptedException {
         //get Data created in the cluster
@@ -187,7 +196,7 @@ public class RetentionTest extends BaseTestClass {
         List<String> workflows = OozieUtil.waitForRetentionWorkflowToSucceed(bundleId, clusterOC);
 
         //get current time minus duration of last status check - to get actual time when eviction has started
-        final DateTime currentTime = new DateTime(new DateTime(DateTimeZone.UTC).toDate().getTime() - 10000);
+        final DateTime currentTime = new DateTime(DateTimeZone.UTC).minus(10000);
         LOGGER.info("Current time is " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentTime.toDate()));
         LOGGER.info("workflows: " + workflows);
         messageConsumer.interrupt();
@@ -198,7 +207,7 @@ public class RetentionTest extends BaseTestClass {
 
         //now see if retention value was matched to as expected
         List<String> expectedOutput = filterDataOnRetention(initialData, currentTime, retentionUnit,
-            retentionPeriod, freqType);
+                retentionPeriod, freqType);
         LOGGER.info("initialData = " + initialData);
         LOGGER.info("finalData = " + finalData);
         LOGGER.info("expectedOutput = " + expectedOutput);
@@ -207,10 +216,10 @@ public class RetentionTest extends BaseTestClass {
         missingData.removeAll(expectedOutput);
         validateDataFromFeedQueue(feedName, messageConsumer.getReceivedMessages(), missingData);
         Assert.assertEquals(finalData.size(), expectedOutput.size(),
-            "Expected and actual sizes of retained data are different! Please check.");
+                "Expected and actual sizes of retained data are different! Please check.");
 
         Assert.assertTrue(Arrays.deepEquals(finalData.toArray(new String[finalData.size()]),
-            expectedOutput.toArray(new String[expectedOutput.size()])));
+                expectedOutput.toArray(new String[expectedOutput.size()])));
 
         //check that root directory exists
         Assert.assertTrue(clusterFS.exists(new Path(testHDFSDir)), "Base data directory should be present.");
@@ -227,7 +236,7 @@ public class RetentionTest extends BaseTestClass {
      * @throws JMSException
      */
     private void validateDataFromFeedQueue(String feedName, List<MapMessage> messages,
-        List<String> missingData) throws OozieClientException, JMSException {
+            List<String> missingData) throws OozieClientException, JMSException {
         //just verify that each element in queue is same as deleted data!
         List<String> workflowIds = OozieUtil.getWorkflowJobs(clusterOC,
                 OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0));
@@ -248,10 +257,10 @@ public class RetentionTest extends BaseTestClass {
             }
         }
         Assert.assertEquals(deletedFolders.size(), missingData.size(),
-            "Output size is different than expected!");
+                "Output size is different than expected!");
         Assert.assertTrue(Arrays.deepEquals(missingData.toArray(new String[missingData.size()]),
-            deletedFolders.toArray(new String[deletedFolders.size()])),
-            "The missing data and message for delete operation don't correspond");
+                        deletedFolders.toArray(new String[deletedFolders.size()])),
+                "The missing data and message for delete operation don't correspond");
     }
 
     /**
@@ -265,7 +274,7 @@ public class RetentionTest extends BaseTestClass {
      * @return list of data folders which are expected to be present on cluster
      */
     private List<String> filterDataOnRetention(List<String> inputData, DateTime currentTime,
-        RetentionUnit retentionUnit, int retentionPeriod, FreqType freqType) {
+            RetentionUnit retentionUnit, int retentionPeriod, FreqType freqType) {
         final List<String> finalData = new ArrayList<>();
         //end date is today's date
         final String startLimit = freqType.getFormatter().print(
@@ -289,8 +298,8 @@ public class RetentionTest extends BaseTestClass {
         // a negative value like -4 should be covered in validation scenarios.
         Integer[] retentionPeriods = new Integer[]{0, 10080, 60, 8, 24};
         RetentionUnit[] retentionUnits = new RetentionUnit[]{
-            RetentionUnit.HOURS,
-            RetentionUnit.DAYS,
+                RetentionUnit.HOURS,
+                RetentionUnit.DAYS,
         }; // "minutes","hours", "days",
         Boolean[] gaps = new Boolean[]{false, true};
         FreqType[] freqTypes = new FreqType[]{FreqType.DAILY, FreqType.YEARLY, FreqType.MONTHLY};
@@ -298,4 +307,266 @@ public class RetentionTest extends BaseTestClass {
 
         return MatrixUtil.crossProduct(retentionPeriods, retentionUnits, gaps, freqTypes, withData);
     }
+
+    /**
+     * Submit a feed having minutely lifecycle frequency.
+     * It would fail since lifecycle retention frequency has to be >= 1 hour.
+     */
+    @Test
+    public void testTooFrequentRetentionLifecycleStage() throws Exception {
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 120);
+
+        LateArrival lateArrival = new LateArrival();
+        lateArrival.setCutOff(new Frequency("1", Frequency.TimeUnit.minutes));
+
+        FreqType freqType = FreqType.MINUTELY;
+        Frequency retentionPeriodGlobal=new Frequency("30", Frequency.TimeUnit.minutes);
+        Frequency retentionFrequencyGlobal=new Frequency("15", Frequency.TimeUnit.minutes);
+
+        bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue());
+        final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        feedObject.setLateArrival(lateArrival);
+        feedObject.setValidity(startTime, endTime);
+        feedObject.setFrequency(new Frequency("minutes(10)"));
+        feedObject.setRetentionValue("minutes(10)");
+
+        feedObject.setLifecycle(createLifecycle(retentionPeriodGlobal, retentionFrequencyGlobal,
+                "", "", true));
+
+        final ServiceResponse response = prism.getFeedHelper().submitEntity(feedObject.toString());
+        AssertUtil.assertFailedWithMessage(response, "Feed Retention can not be more frequent than hours(1)");
+
+    }
+
+
+    /**
+     * Submits and schedules a feed with lifecycle tag at cluster and global level.
+     * Responses are checked and retention is validated correspondingly.
+     * Uses lifecycleDPFail dataProvider to handle possible scenarios.
+     *
+     * @param globalLevel : boolean (whether lifecycle is enabled for global level or not)
+     * @param clusterLevel : boolean (whether lifecycle is enabled for cluster level or not)
+     */
+    @Test(dataProvider = "lifecycleDPFail")
+    public void clusterGlobalNoRetentionStageTest(boolean globalLevel, boolean clusterLevel) throws Exception {
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 120);
+
+        LateArrival lateArrival = new LateArrival();
+        lateArrival.setCutOff(new Frequency("1", Frequency.TimeUnit.hours));
+
+        final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        feedObject.setLateArrival(lateArrival);
+        feedObject.setValidity(startTime, endTime);
+
+        if (globalLevel) {
+            feedObject.setLifecycle(new Lifecycle());
+        }
+        if (clusterLevel) {
+            feedObject.getClusters().getClusters().get(0).setLifecycle(new Lifecycle());
+        }
+
+        final ServiceResponse response = prism.getFeedHelper().submitEntity(feedObject.toString());
+        AssertUtil.assertFailedWithMessage(response, "Retention is a mandatory stage, didn't find it for cluster");
+
+    }
+
+    @DataProvider(name = "lifecycleDPFail")
+    public Object[][] getLifecycleFail() {
+        return new Object[][]{
+                {true, true}, // cluster/global : No retention stage. Should fail.
+                {true, false}, // global : no retention stage. Should fail.
+                {false, true}, // cluster : no retention stage. Should fail.
+        };
+    }
+
+    /**
+     * Submits and schedules a feed with lifecycle tag at cluster and global level.
+     * Responses are checked and retention is validated correspondingly.
+     * Uses getLifecycleWithGlobalStage dataProvider to handle possible scenarios.
+     *
+     * @param globalLevel : boolean (whether lifecycle is enabled for global level or not)
+     * @param clusterLevel : boolean (whether lifecycle is enabled for cluster level or not)
+     */
+    @Test(dataProvider = "getLifecycleWithGlobalStage")
+    public void retentionStageFromGlobalTest(boolean globalLevel, boolean clusterLevel) throws Exception {
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 120);
+
+        FreqType freqType = FreqType.HOURLY;
+        Frequency retentionPeriodGlobal=new Frequency("2", Frequency.TimeUnit.hours);
+        Frequency retentionFrequencyGlobal=new Frequency("1", Frequency.TimeUnit.hours);
+
+        String priorityGlobal = "HIGH";
+        String queue = "default";
+
+        LateArrival lateArrival = new LateArrival();
+        lateArrival.setCutOff(new Frequency("1", Frequency.TimeUnit.hours));
+
+        bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue());
+        final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        feedObject.setLateArrival(lateArrival);
+        feedObject.setValidity(startTime, endTime);
+
+        if (globalLevel) {
+            feedObject.setLifecycle(createLifecycle(retentionPeriodGlobal, retentionFrequencyGlobal,
+                    priorityGlobal, queue, true));
+        }
+
+        if (clusterLevel) {
+            feedObject.getClusters().getClusters().get(0).setLifecycle(new Lifecycle());
+        }
+
+        replenishData(freqType, false, false);
+
+        final ServiceResponse response = prism.getFeedHelper().submitEntity(feedObject.toString());
+
+        AssertUtil.assertSucceeded(response);
+        commonDataRetentionWorkflow(feedObject.toString(), freqType, RetentionUnit.HOURS,
+                retentionPeriodGlobal.getFrequencyAsInt());
+        validateFrequency(feedObject.getName(), retentionFrequencyGlobal.getFrequencyAsInt()*60);
+        validatePriorityAndQueue(feedObject.getName(), priorityGlobal, queue);
+
+    }
+
+    @DataProvider(name = "getLifecycleWithGlobalStage")
+    public Object[][] getLifecycleWithGlobalStage() {
+        return new Object[][]{
+                {true, false}, // Global level lifecycle. Should pass.
+                {true, true}, // Cluster level : no retention stage - (pick from global). Should pass.
+
+        };
+    }
+
+    /**
+     * Submits and schedules a feed with lifecycle tag at cluster and global level.
+     * Responses are checked and retention is validated correspondingly.
+     * Uses getLifecycleWithClusterStage dataProvider to handle possible scenarios.
+     *
+     * @param globalLevel : boolean (whether lifecycle is enabled for global level or not)
+     * @param globalWithStage : boolean (whether global lifecycle has retention stage defined or not)
+     */
+    @Test(dataProvider = "getLifecycleWithClusterStage")
+    public void retentionStageFromClusterTest(boolean globalLevel, boolean globalWithStage) throws Exception {
+
+        String startTime = TimeUtil.getTimeWrtSystemTime(0);
+        String endTime = TimeUtil.addMinsToTime(startTime, 120);
+
+        FreqType freqType = FreqType.HOURLY;
+        Frequency retentionPeriodGlobal=new Frequency("2", Frequency.TimeUnit.hours);
+        Frequency retentionFrequencyGlobal=new Frequency("1", Frequency.TimeUnit.hours);
+
+        Frequency retentionPeriodCluster=new Frequency("4", Frequency.TimeUnit.hours);
+        Frequency retentionFrequencyCluster=new Frequency("3", Frequency.TimeUnit.hours);
+
+        String priorityGlobal = "HIGH";
+        String priorityCluster = "LOW";
+        String queue = "default";
+
+        LateArrival lateArrival = new LateArrival();
+        lateArrival.setCutOff(new Frequency("1", Frequency.TimeUnit.hours));
+
+        bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue());
+        final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        feedObject.setLateArrival(lateArrival);
+        feedObject.setValidity(startTime, endTime);
+
+        if (globalLevel) {
+            feedObject.setLifecycle(createLifecycle(retentionPeriodGlobal, retentionFrequencyGlobal,
+                    priorityGlobal, queue, globalWithStage));
+        }
+
+        feedObject.getClusters().getClusters().get(0).setLifecycle(createLifecycle(retentionPeriodCluster,
+                retentionFrequencyCluster, priorityCluster, queue, true));
+
+        replenishData(freqType, false, false);
+
+        final ServiceResponse response = prism.getFeedHelper().submitEntity(feedObject.toString());
+
+        AssertUtil.assertSucceeded(response);
+        commonDataRetentionWorkflow(feedObject.toString(), freqType, RetentionUnit.HOURS,
+                retentionPeriodCluster.getFrequencyAsInt());
+        validateFrequency(feedObject.getName(), retentionFrequencyCluster.getFrequencyAsInt()*60);
+        validatePriorityAndQueue(feedObject.getName(), priorityCluster, queue);
+
+    }
+
+    @DataProvider(name = "getLifecycleWithClusterStage")
+    public Object[][] getLifecycleWithClusterStage() {
+        return new Object[][]{
+
+                {true, true}, // Cluster level lifecycle. Should pass.
+                {false, false}, // Cluster level with no global level lifecycle. Should pass.
+                {true, false}, // Cluster level with empty global level lifecycle. Should pass.
+
+        };
+    }
+
+    /**
+     * Method to create lifecycle tag to be used by feed for lifecycle retention.
+     * @param retentionPeriod : lifecycle retention period.
+     * @param retentionFrequency : lifecycle retention frequency.
+     * @param priority : lifecycle retention priority.
+     * @param queue : lifecycle retention queue.
+     */
+    private Lifecycle createLifecycle(Frequency retentionPeriod, Frequency retentionFrequency,
+            String priority, String queue, boolean stage) {
+        Lifecycle lifecycle = new Lifecycle();
+        if (stage) {
+            String LIMIT_PROPERTY_NAME = "retention.policy.agebaseddelete.limit";
+            Property property = new Property();
+            property.setName(LIMIT_PROPERTY_NAME);
+            property.setValue(retentionPeriod.getTimeUnit() + "(" + retentionPeriod.getFrequencyAsInt() + ")");
+
+            Properties properties = new Properties();
+            properties.getProperties().add(property);
+            RetentionStage retentionStage = new RetentionStage();
+            retentionStage.setFrequency(new Frequency(retentionFrequency.getTimeUnit() +
+                    "(" + retentionFrequency.getFrequencyAsInt() + ")"));
+
+            if (!priority.isEmpty()) {
+                retentionStage.setPriority(priority);
+            }
+            if (!queue.isEmpty()) {
+                retentionStage.setQueue(queue);
+            }
+            retentionStage.setProperties(properties);
+            lifecycle.setRetentionStage(retentionStage);
+        }
+        return lifecycle;
+    }
+
+    /**
+     * Validates feed retention frequency with expected frequency.
+     * @param feedName : feed name.
+     * @param frequency : expected frequency.
+     */
+    private void validateFrequency(String feedName, int frequency)
+            throws OozieClientException, JMSException, JSONException {
+        List<CoordinatorJob> coordJobs = OozieUtil.getBundleCoordinators(clusterOC,
+                OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0));
+        CoordinatorJob coordJobInfo = clusterOC.getCoordJobInfo(coordJobs.get(0).getId());
+        Assert.assertEquals(coordJobInfo.getFrequency(), String.valueOf(frequency),
+                "Invalid retention frequency : " + frequency);
+    }
+
+    /**
+     * Validates feed retention queue and priority with expected values.
+     * @param feedName : feed name.
+     * @param expectedPriority : expected priority.
+     * @param expectedQueue : expected queue.
+     */
+    private void validatePriorityAndQueue(String feedName, String expectedPriority, String expectedQueue)
+            throws OozieClientException, JMSException, JSONException {
+
+        Configuration configuration = OozieUtil.getRetentionConfiguration(clusterOC,
+                OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0));
+        String priority = configuration.get("jobPriority");
+        String queue = configuration.get("queueName");
+        Assert.assertEquals(priority, expectedPriority, "Priority should be : " + expectedPriority);
+        Assert.assertEquals(queue, expectedQueue, "Queue should be : " + expectedQueue);
+    }
 }