You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pr...@apache.org on 2016/02/17 06:16:34 UTC
falcon git commit: FALCON-1567 Test case for Lifecycle feature
Repository: falcon
Updated Branches:
refs/heads/master 10ee01a85 -> 237bab6ed
FALCON-1567 Test case for Lifecycle feature
Author: Pragya <mi...@gmail.com>
Reviewers: Paul Isaychuk <pi...@apache.org>
Closes #43 from pragya-mittal/FALCON-1567 and squashes the following commits:
9d46ea4 [Pragya] FALCON-1567 Test case for Lifecycle feature
cc84d5a [Pragya] Merge branch 'master' of https://github.com/apache/falcon
f037385 [Pragya] Merge branch 'master' of https://github.com/apache/falcon
4c19ec0 [Pragya] Merge branch 'master' of https://github.com/apache/falcon
3b7fd63 [Pragya] FALCON-1829 Add regression for submit and schedule process on native scheduler (time based)
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/237bab6e
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/237bab6e
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/237bab6e
Branch: refs/heads/master
Commit: 237bab6edcefa54b1c3818d8d4f81039b893d637
Parents: 10ee01a
Author: Pragya <mi...@gmail.com>
Authored: Wed Feb 17 10:45:39 2016 +0530
Committer: Pragya Mittal <mi...@gmail.com>
Committed: Wed Feb 17 10:45:39 2016 +0530
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 2 +
.../falcon/regression/core/util/AssertUtil.java | 12 +
.../falcon/regression/core/util/OozieUtil.java | 34 ++-
.../falcon/regression/prism/RetentionTest.java | 301 ++++++++++++++++++-
4 files changed, 332 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index 77772a0..566f7e1 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-1567 Test case for Lifecycle feature (Pragya Mittal)
+
FALCON-1784 Add regression test for for FALCON-1647 (Paul Isaychuk)
FALCON-1829 Add regression for submit and schedule process on native scheduler (time based) (Pragya Mittal)
http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
index 0af0c1e..cb79e9c 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
@@ -299,6 +299,18 @@ public final class AssertUtil {
}
/**
+     * Checks that ServiceResponse status is FAILED and that its message contains expectedMessage.
+     * @param response ServiceResponse
+     * @param expectedMessage text that the response message must contain
+     * @throws JAXBException
+     */
+    public static void assertFailedWithMessage(ServiceResponse response, String expectedMessage) throws JAXBException {
+        assertFailed(response);
+        Assert.assertTrue(String.valueOf(response.getMessage()).contains(expectedMessage),
+            "Incorrect message in response: expected '" + expectedMessage + "' in '" + response.getMessage() + "'");
+    }
+
+ /**
* Checks that Instance/Triage result status is FAILED.
*
* @param response APIResult response
http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index e73bc5d..4c609b3 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -30,6 +30,7 @@ import org.apache.oozie.client.Job;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
import org.joda.time.DateTime;
import org.apache.log4j.Logger;
import org.joda.time.DateTimeZone;
@@ -754,8 +755,8 @@ public final class OozieUtil {
* Returns configuration object of a given bundleID for a given instanceTime.
*
* @param oozieClient oozie client of cluster job is running on
- * @param bundleID name of process which job is being analyzed
- * @param time job status we are waiting for
+ * @param bundleID bundleID of given cluster
+ * @param time instanceTime
* @throws org.apache.oozie.client.OozieClientException
* @throws org.json.JSONException
*/
@@ -822,4 +823,33 @@ public final class OozieUtil {
}
return counter == propMap.size();
}
+
+    /**
+     * Returns the workflow configuration of the first action of the retention coordinator of a given bundleID.
+     *
+     * @param oozieClient oozie client of cluster job is running on
+     * @param bundleID bundleID of given cluster
+     * @return configuration built from the workflow conf, or null if the bundle has no
+     *         FALCON_FEED_RETENTION coordinator or that coordinator has no actions yet
+     * @throws OozieClientException
+     */
+    public static Configuration getRetentionConfiguration(OozieClient oozieClient, String bundleID)
+        throws OozieClientException {
+        waitForCoordinatorJobCreation(oozieClient, bundleID);
+        CoordinatorJob coord = null;
+        for (CoordinatorJob coordinatorJob : oozieClient.getBundleJobInfo(bundleID).getCoordinators()) {
+            if (coordinatorJob.getAppName().startsWith("FALCON_FEED_RETENTION")) {
+                coord = oozieClient.getCoordJobInfo(coordinatorJob.getId());
+            }
+        }
+        if (coord == null || coord.getActions().isEmpty()) {
+            return null;
+        }
+        // Use a fixed charset instead of the platform default so the result is the same on every host.
+        WorkflowJob wid = oozieClient.getJobInfo(coord.getActions().get(0).getExternalId());
+        Configuration configuration = new Configuration();
+        configuration.addResource(new ByteArrayInputStream(
+            wid.getConf().getBytes(java.nio.charset.StandardCharsets.UTF_8)));
+        return configuration;
+    }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/237bab6e/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
index 018b83a..b677433 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/RetentionTest.java
@@ -20,6 +20,12 @@ package org.apache.falcon.regression.prism;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.LateArrival;
+import org.apache.falcon.entity.v0.feed.Lifecycle;
+import org.apache.falcon.entity.v0.feed.Properties;
+import org.apache.falcon.entity.v0.feed.Property;
+import org.apache.falcon.entity.v0.feed.RetentionStage;
import org.apache.falcon.regression.Entities.FeedMerlin;
import org.apache.falcon.regression.core.bundle.Bundle;
import org.apache.falcon.regression.core.enumsAndConstants.FreqType;
@@ -36,14 +42,17 @@ import org.apache.falcon.regression.core.util.OozieUtil;
import org.apache.falcon.regression.core.util.TimeUtil;
import org.apache.falcon.regression.core.util.Util;
import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
+import org.json.JSONException;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -110,7 +119,7 @@ public class RetentionTest extends BaseTestClass {
*/
@Test(groups = {"0.1", "0.2", "prism"}, dataProvider = "betterDP", priority = -1)
public void testRetention(final int retentionPeriod, final RetentionUnit retentionUnit,
- final boolean gaps, final FreqType freqType, final boolean withData) throws Exception {
+ final boolean gaps, final FreqType freqType, final boolean withData) throws Exception {
bundles[0].setInputFeedDataPath(testHDFSDir + freqType.getPathValue());
final FeedMerlin feedObject = new FeedMerlin(bundles[0].getInputFeedFromBundle());
feedObject.setRetentionValue(retentionUnit.getValue() + "(" + retentionPeriod + ")");
@@ -122,7 +131,7 @@ public class RetentionTest extends BaseTestClass {
replenishData(freqType, gaps, withData);
commonDataRetentionWorkflow(feedObject.toString(), freqType, retentionUnit,
- retentionPeriod);
+ retentionPeriod);
} else {
AssertUtil.assertFailed(response);
}
@@ -143,7 +152,7 @@ public class RetentionTest extends BaseTestClass {
}
final DateTime today = new DateTime(DateTimeZone.UTC);
final List<DateTime> times = TimeUtil.getDatesOnEitherSide(
- freqType.addTime(today, -36), freqType.addTime(today, -1), skip, freqType);
+ freqType.addTime(today, -36), freqType.addTime(today, -1), skip, freqType);
final List<String> dataDates = TimeUtil.convertDatesToString(times, freqType.getFormatter());
LOGGER.info("dataDates = " + dataDates);
dataDates.add(HadoopUtil.SOMETHING_RANDOM);
@@ -169,7 +178,7 @@ public class RetentionTest extends BaseTestClass {
* @throws JMSException
*/
private void commonDataRetentionWorkflow(String feed, FreqType freqType,
- RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException,
+ RetentionUnit retentionUnit, int retentionPeriod) throws OozieClientException,
IOException, URISyntaxException, AuthenticationException, JMSException,
InterruptedException {
//get Data created in the cluster
@@ -187,7 +196,7 @@ public class RetentionTest extends BaseTestClass {
List<String> workflows = OozieUtil.waitForRetentionWorkflowToSucceed(bundleId, clusterOC);
//get current time minus duration of last status check - to get actual time when eviction has started
- final DateTime currentTime = new DateTime(new DateTime(DateTimeZone.UTC).toDate().getTime() - 10000);
+ final DateTime currentTime = new DateTime(DateTimeZone.UTC).minus(10000);
LOGGER.info("Current time is " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentTime.toDate()));
LOGGER.info("workflows: " + workflows);
messageConsumer.interrupt();
@@ -198,7 +207,7 @@ public class RetentionTest extends BaseTestClass {
//now see if retention value was matched to as expected
List<String> expectedOutput = filterDataOnRetention(initialData, currentTime, retentionUnit,
- retentionPeriod, freqType);
+ retentionPeriod, freqType);
LOGGER.info("initialData = " + initialData);
LOGGER.info("finalData = " + finalData);
LOGGER.info("expectedOutput = " + expectedOutput);
@@ -207,10 +216,10 @@ public class RetentionTest extends BaseTestClass {
missingData.removeAll(expectedOutput);
validateDataFromFeedQueue(feedName, messageConsumer.getReceivedMessages(), missingData);
Assert.assertEquals(finalData.size(), expectedOutput.size(),
- "Expected and actual sizes of retained data are different! Please check.");
+ "Expected and actual sizes of retained data are different! Please check.");
Assert.assertTrue(Arrays.deepEquals(finalData.toArray(new String[finalData.size()]),
- expectedOutput.toArray(new String[expectedOutput.size()])));
+ expectedOutput.toArray(new String[expectedOutput.size()])));
//check that root directory exists
Assert.assertTrue(clusterFS.exists(new Path(testHDFSDir)), "Base data directory should be present.");
@@ -227,7 +236,7 @@ public class RetentionTest extends BaseTestClass {
* @throws JMSException
*/
private void validateDataFromFeedQueue(String feedName, List<MapMessage> messages,
- List<String> missingData) throws OozieClientException, JMSException {
+ List<String> missingData) throws OozieClientException, JMSException {
//just verify that each element in queue is same as deleted data!
List<String> workflowIds = OozieUtil.getWorkflowJobs(clusterOC,
OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0));
@@ -248,10 +257,10 @@ public class RetentionTest extends BaseTestClass {
}
}
Assert.assertEquals(deletedFolders.size(), missingData.size(),
- "Output size is different than expected!");
+ "Output size is different than expected!");
Assert.assertTrue(Arrays.deepEquals(missingData.toArray(new String[missingData.size()]),
- deletedFolders.toArray(new String[deletedFolders.size()])),
- "The missing data and message for delete operation don't correspond");
+ deletedFolders.toArray(new String[deletedFolders.size()])),
+ "The missing data and message for delete operation don't correspond");
}
/**
@@ -265,7 +274,7 @@ public class RetentionTest extends BaseTestClass {
* @return list of data folders which are expected to be present on cluster
*/
private List<String> filterDataOnRetention(List<String> inputData, DateTime currentTime,
- RetentionUnit retentionUnit, int retentionPeriod, FreqType freqType) {
+ RetentionUnit retentionUnit, int retentionPeriod, FreqType freqType) {
final List<String> finalData = new ArrayList<>();
//end date is today's date
final String startLimit = freqType.getFormatter().print(
@@ -289,8 +298,8 @@ public class RetentionTest extends BaseTestClass {
// a negative value like -4 should be covered in validation scenarios.
Integer[] retentionPeriods = new Integer[]{0, 10080, 60, 8, 24};
RetentionUnit[] retentionUnits = new RetentionUnit[]{
- RetentionUnit.HOURS,
- RetentionUnit.DAYS,
+ RetentionUnit.HOURS,
+ RetentionUnit.DAYS,
}; // "minutes","hours", "days",
Boolean[] gaps = new Boolean[]{false, true};
FreqType[] freqTypes = new FreqType[]{FreqType.DAILY, FreqType.YEARLY, FreqType.MONTHLY};
@@ -298,4 +307,266 @@ public class RetentionTest extends BaseTestClass {
return MatrixUtil.crossProduct(retentionPeriods, retentionUnits, gaps, freqTypes, withData);
}
+
+    /**
+     * Submits a feed whose lifecycle retention stage has a frequency of 15 minutes.
+     * Submission must be rejected because retention can not be more frequent than hours(1).
+     */
+    @Test
+    public void testTooFrequentRetentionLifecycleStage() throws Exception {
+        final String validityStart = TimeUtil.getTimeWrtSystemTime(0);
+        final String validityEnd = TimeUtil.addMinsToTime(validityStart, 120);
+
+        final LateArrival cutOff = new LateArrival();
+        cutOff.setCutOff(new Frequency("1", Frequency.TimeUnit.minutes));
+
+        // No priority and no queue: empty strings make createLifecycle skip both setters.
+        final Lifecycle tooFrequentLifecycle = createLifecycle(
+            new Frequency("30", Frequency.TimeUnit.minutes),
+            new Frequency("15", Frequency.TimeUnit.minutes), "", "", true);
+
+        bundles[0].setInputFeedDataPath(testHDFSDir + FreqType.MINUTELY.getPathValue());
+        final FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        feed.setLateArrival(cutOff);
+        feed.setValidity(validityStart, validityEnd);
+        feed.setFrequency(new Frequency("minutes(10)"));
+        feed.setRetentionValue("minutes(10)");
+        feed.setLifecycle(tooFrequentLifecycle);
+
+        final String feedXml = feed.toString();
+        final ServiceResponse submitResponse = prism.getFeedHelper().submitEntity(feedXml);
+        AssertUtil.assertFailedWithMessage(submitResponse,
+            "Feed Retention can not be more frequent than hours(1)");
+    }
+
+
+    /**
+     * Submits a feed whose lifecycle tag (global and/or cluster level) has no retention stage.
+     * The submission is expected to fail because retention is a mandatory stage.
+     * Uses lifecycleDPFail dataProvider to handle possible scenarios.
+     *
+     * @param globalLevel : boolean (whether an empty lifecycle is set at global level)
+     * @param clusterLevel : boolean (whether an empty lifecycle is set at cluster level)
+     */
+    @Test(dataProvider = "lifecycleDPFail")
+    public void clusterGlobalNoRetentionStageTest(boolean globalLevel, boolean clusterLevel) throws Exception {
+        final String validityStart = TimeUtil.getTimeWrtSystemTime(0);
+        final String validityEnd = TimeUtil.addMinsToTime(validityStart, 120);
+
+        final LateArrival cutOff = new LateArrival();
+        cutOff.setCutOff(new Frequency("1", Frequency.TimeUnit.hours));
+
+        final FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        feed.setLateArrival(cutOff);
+        feed.setValidity(validityStart, validityEnd);
+
+        // A freshly constructed Lifecycle is used so that no retention stage is set on it here.
+        if (globalLevel) {
+            feed.setLifecycle(new Lifecycle());
+        }
+        if (clusterLevel) {
+            feed.getClusters().getClusters().get(0).setLifecycle(new Lifecycle());
+        }
+
+        final ServiceResponse submitResponse = prism.getFeedHelper().submitEntity(feed.toString());
+        AssertUtil.assertFailedWithMessage(submitResponse,
+            "Retention is a mandatory stage, didn't find it for cluster");
+    }
+
+    @DataProvider(name = "lifecycleDPFail")
+    public Object[][] getLifecycleFail() {
+        // Columns: {globalLevel, clusterLevel}. No row defines a retention stage, so every row should fail.
+        final Object[] bothLevels = {true, true};
+        final Object[] globalOnly = {true, false};
+        final Object[] clusterOnly = {false, true};
+        return new Object[][]{bothLevels, globalOnly, clusterOnly};
+    }
+
+    /**
+     * Submits a feed whose retention stage is defined in the global lifecycle tag only,
+     * then verifies eviction, retention frequency, priority and queue against the global values.
+     * Uses getLifecycleWithGlobalStage dataProvider to handle possible scenarios.
+     *
+     * @param globalLevel : boolean (whether lifecycle is enabled for global level or not)
+     * @param clusterLevel : boolean (whether an empty lifecycle is set for cluster level or not)
+     */
+    @Test(dataProvider = "getLifecycleWithGlobalStage")
+    public void retentionStageFromGlobalTest(boolean globalLevel, boolean clusterLevel) throws Exception {
+        final String validityStart = TimeUtil.getTimeWrtSystemTime(0);
+        final String validityEnd = TimeUtil.addMinsToTime(validityStart, 120);
+
+        final FreqType dataFreq = FreqType.HOURLY;
+        final String globalPriority = "HIGH";
+        final String globalQueue = "default";
+        final Frequency globalPeriod = new Frequency("2", Frequency.TimeUnit.hours);
+        final Frequency globalFrequency = new Frequency("1", Frequency.TimeUnit.hours);
+
+        final LateArrival cutOff = new LateArrival();
+        cutOff.setCutOff(new Frequency("1", Frequency.TimeUnit.hours));
+
+        bundles[0].setInputFeedDataPath(testHDFSDir + dataFreq.getPathValue());
+        final FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        feed.setLateArrival(cutOff);
+        feed.setValidity(validityStart, validityEnd);
+
+        if (globalLevel) {
+            final Lifecycle globalLifecycle =
+                createLifecycle(globalPeriod, globalFrequency, globalPriority, globalQueue, true);
+            feed.setLifecycle(globalLifecycle);
+        }
+        if (clusterLevel) {
+            // Cluster lifecycle without a retention stage: the global stage is expected to apply.
+            feed.getClusters().getClusters().get(0).setLifecycle(new Lifecycle());
+        }
+
+        replenishData(dataFreq, false, false);
+
+        final String feedXml = feed.toString();
+        final String feedName = feed.getName();
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
+        commonDataRetentionWorkflow(feedXml, dataFreq, RetentionUnit.HOURS,
+            globalPeriod.getFrequencyAsInt());
+
+        // Hours are multiplied by 60; presumably the coordinator frequency is reported in minutes.
+        validateFrequency(feedName, globalFrequency.getFrequencyAsInt() * 60);
+        validatePriorityAndQueue(feedName, globalPriority, globalQueue);
+    }
+
+    @DataProvider(name = "getLifecycleWithGlobalStage")
+    public Object[][] getLifecycleWithGlobalStage() {
+        // {globalLevel, clusterLevel}: global level lifecycle only. Should pass.
+        final Object[] globalOnly = {true, false};
+        // Cluster level lifecycle has no retention stage - (pick from global). Should pass.
+        final Object[] globalAndEmptyCluster = {true, true};
+        return new Object[][]{globalOnly, globalAndEmptyCluster};
+    }
+
+    /**
+     * Submits a feed whose retention stage is defined in the cluster level lifecycle tag,
+     * then verifies eviction, retention frequency, priority and queue against the cluster values.
+     * Uses getLifecycleWithClusterStage dataProvider to handle possible scenarios.
+     *
+     * @param globalLevel : boolean (whether lifecycle is enabled for global level or not)
+     * @param globalWithStage : boolean (whether global lifecycle has retention stage defined or not)
+     */
+    @Test(dataProvider = "getLifecycleWithClusterStage")
+    public void retentionStageFromClusterTest(boolean globalLevel, boolean globalWithStage) throws Exception {
+        final String validityStart = TimeUtil.getTimeWrtSystemTime(0);
+        final String validityEnd = TimeUtil.addMinsToTime(validityStart, 120);
+
+        final FreqType dataFreq = FreqType.HOURLY;
+        final String sharedQueue = "default";
+        // Global and cluster stages use different period, frequency and priority values.
+        final String globalPriority = "HIGH";
+        final Frequency globalPeriod = new Frequency("2", Frequency.TimeUnit.hours);
+        final Frequency globalFrequency = new Frequency("1", Frequency.TimeUnit.hours);
+        final String clusterPriority = "LOW";
+        final Frequency clusterPeriod = new Frequency("4", Frequency.TimeUnit.hours);
+        final Frequency clusterFrequency = new Frequency("3", Frequency.TimeUnit.hours);
+
+        final LateArrival cutOff = new LateArrival();
+        cutOff.setCutOff(new Frequency("1", Frequency.TimeUnit.hours));
+
+        bundles[0].setInputFeedDataPath(testHDFSDir + dataFreq.getPathValue());
+        final FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle());
+        feed.setLateArrival(cutOff);
+        feed.setValidity(validityStart, validityEnd);
+
+        if (globalLevel) {
+            final Lifecycle globalLifecycle =
+                createLifecycle(globalPeriod, globalFrequency, globalPriority, sharedQueue, globalWithStage);
+            feed.setLifecycle(globalLifecycle);
+        }
+        final Lifecycle clusterLifecycle =
+            createLifecycle(clusterPeriod, clusterFrequency, clusterPriority, sharedQueue, true);
+        feed.getClusters().getClusters().get(0).setLifecycle(clusterLifecycle);
+
+        replenishData(dataFreq, false, false);
+
+        final String feedXml = feed.toString();
+        final String feedName = feed.getName();
+        AssertUtil.assertSucceeded(prism.getFeedHelper().submitEntity(feedXml));
+        commonDataRetentionWorkflow(feedXml, dataFreq, RetentionUnit.HOURS,
+            clusterPeriod.getFrequencyAsInt());
+
+        // Hours are multiplied by 60; presumably the coordinator frequency is reported in minutes.
+        validateFrequency(feedName, clusterFrequency.getFrequencyAsInt() * 60);
+        validatePriorityAndQueue(feedName, clusterPriority, sharedQueue);
+    }
+
+    @DataProvider(name = "getLifecycleWithClusterStage")
+    public Object[][] getLifecycleWithClusterStage() {
+        // {globalLevel, globalWithStage}: global lifecycle with its own retention stage. Should pass.
+        final Object[] globalWithOwnStage = {true, true};
+        // No global level lifecycle, cluster level only. Should pass.
+        final Object[] noGlobal = {false, false};
+        // Empty global level lifecycle (no retention stage). Should pass.
+        final Object[] emptyGlobal = {true, false};
+        return new Object[][]{globalWithOwnStage, noGlobal, emptyGlobal};
+    }
+
+    /**
+     * Method to create lifecycle tag to be used by feed for lifecycle retention.
+     * @param retentionPeriod : lifecycle retention period (age limit), used only if stage is true.
+     * @param retentionFrequency : lifecycle retention frequency, used only if stage is true.
+     * @param priority : lifecycle retention priority, not set when null or empty.
+     * @param queue : lifecycle retention queue, not set when null or empty.
+     * @param stage : whether a retention stage is added; if false an empty lifecycle is returned.
+     * @return lifecycle with or without a retention stage, depending on stage.
+     */
+    private Lifecycle createLifecycle(Frequency retentionPeriod, Frequency retentionFrequency,
+                                      String priority, String queue, boolean stage) {
+        final Lifecycle lifecycle = new Lifecycle();
+        if (!stage) {
+            return lifecycle;
+        }
+        final Property limit = new Property();
+        limit.setName("retention.policy.agebaseddelete.limit");
+        limit.setValue(retentionPeriod.getTimeUnit() + "(" + retentionPeriod.getFrequencyAsInt() + ")");
+        final Properties properties = new Properties();
+        properties.getProperties().add(limit);
+        final RetentionStage retentionStage = new RetentionStage();
+        retentionStage.setFrequency(new Frequency(
+            retentionFrequency.getTimeUnit() + "(" + retentionFrequency.getFrequencyAsInt() + ")"));
+        if (priority != null && !priority.isEmpty()) {
+            retentionStage.setPriority(priority);
+        }
+        if (queue != null && !queue.isEmpty()) {
+            retentionStage.setQueue(queue);
+        }
+        retentionStage.setProperties(properties);
+        lifecycle.setRetentionStage(retentionStage);
+        return lifecycle;
+    }
+
+    /**
+     * Validates feed retention frequency with expected frequency.
+     * @param feedName : feed name; its first bundle and that bundle's first coordinator are inspected.
+     * @param frequency : expected frequency, compared as a string with the coordinator's frequency.
+     */
+    private void validateFrequency(String feedName, int frequency)
+        throws OozieClientException, JMSException, JSONException {
+        final String firstBundleId = OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0);
+        final String firstCoordId = OozieUtil.getBundleCoordinators(clusterOC, firstBundleId).get(0).getId();
+        Assert.assertEquals(clusterOC.getCoordJobInfo(firstCoordId).getFrequency(),
+            String.valueOf(frequency),
+            "Invalid retention frequency : " + frequency);
+    }
+
+    /**
+     * Validates feed retention queue and priority; fails if no retention configuration is found.
+     * @param feedName : feed name.
+     * @param expectedPriority : expected priority.
+     * @param expectedQueue : expected queue.
+     */
+    private void validatePriorityAndQueue(String feedName, String expectedPriority, String expectedQueue)
+        throws OozieClientException, JMSException, JSONException {
+        String bundleId = OozieUtil.getBundles(clusterOC, feedName, EntityType.FEED).get(0);
+        Configuration configuration = OozieUtil.getRetentionConfiguration(clusterOC, bundleId);
+        // getRetentionConfiguration returns null when the bundle has no retention coordinator.
+        Assert.assertNotNull(configuration, "No retention configuration found for feed : " + feedName);
+        Assert.assertEquals(configuration.get("jobPriority"), expectedPriority,
+            "Priority should be : " + expectedPriority);
+        Assert.assertEquals(configuration.get("queueName"), expectedQueue, "Queue should be : " + expectedQueue);
+    }
}