You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/10/13 10:36:39 UTC
falcon git commit: FALCON-1524 Improve Lifecycle Retention validation
checks. Contributed by Ajay Yadava.
Repository: falcon
Updated Branches:
refs/heads/master 666a2a26b -> 4a64dec0e
FALCON-1524 Improve Lifecycle Retention validation checks. Contributed by Ajay Yadava.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4a64dec0
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4a64dec0
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4a64dec0
Branch: refs/heads/master
Commit: 4a64dec0ee4d5cf685bde953197e84526cf32d7e
Parents: 666a2a2
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Oct 13 13:32:58 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Oct 13 13:36:36 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../falcon/entity/parser/FeedEntityParser.java | 11 +++++-
.../lifecycle/retention/AgeBasedDelete.java | 16 ++++++++
.../java/org/apache/falcon/util/DateUtil.java | 31 ++++++++++++++++
.../entity/parser/FeedEntityParserTest.java | 39 ++++++++++++++++++++
5 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/4a64dec0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e8486f..2dc7e3c 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,8 @@ Trunk (Unreleased)
FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
IMPROVEMENTS
+ FALCON-1524 Improve Lifecycle Retention validation checks(Ajay Yadava)
+
FALCON-1516 Feed Retention support in Falcon Unit(Pavan Kolamuri via Pallavi Rao)
FALCON-1527 Release Falcon Unit test jar(Pavan Kumar Kolamuri via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/4a64dec0/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 6be2495..c5cfdd2 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -47,8 +47,8 @@ import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.group.FeedGroup;
import org.apache.falcon.group.FeedGroupMap;
-import org.apache.falcon.util.DateUtil;
import org.apache.falcon.service.LifecyclePolicyMap;
+import org.apache.falcon.util.DateUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.slf4j.Logger;
@@ -133,6 +133,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: "
+ cluster.getName());
}
+ validateRetentionFrequency(feed, cluster.getName());
for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
map.get(policyName).validate(feed, cluster.getName());
}
@@ -140,6 +141,14 @@ public class FeedEntityParser extends EntityParser<Feed> {
}
}
+ /**
+ * Rejects a feed whose retention stage on the given cluster is configured to run more often than
+ * the feed itself is produced. Both frequencies are compared as approximate millisecond durations.
+ *
+ * @param feed feed being validated
+ * @param clusterName name of the cluster whose retention frequency is checked
+ * @throws ValidationException if the retention frequency is shorter than the feed frequency
+ */
+ private void validateRetentionFrequency(Feed feed, String clusterName) throws FalconException {
+ Long retentionMillis = DateUtil.getFrequencyInMillis(FeedHelper.getRetentionFrequency(feed, clusterName));
+ Long feedMillis = DateUtil.getFrequencyInMillis(feed.getFrequency());
+ if (retentionMillis < feedMillis) {
+ throw new ValidationException("Retention can not be more frequent than data availability.");
+ }
+ }
+
private Set<Process> findProcesses(Set<Entity> referenced) {
Set<Process> processes = new HashSet<Process>();
for (Entity entity : referenced) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/4a64dec0/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
index a4ae780..ccb0290 100644
--- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
+++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.feed.Property;
import org.apache.falcon.entity.v0.feed.RetentionStage;
import org.apache.falcon.entity.v0.feed.Sla;
import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.util.StartupProperties;
import java.util.Date;
@@ -48,6 +49,21 @@ public class AgeBasedDelete extends RetentionPolicy {
if (cluster != null) {
validateLimitWithSla(feed, cluster, retentionLimit.toString());
validateLimitWithLateData(feed, cluster, retentionLimit.toString());
+ String lifecycleEngine = StartupProperties.get().getProperty("lifecycle.engine.impl",
+ "org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory");
+ if ("org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory".equals(lifecycleEngine)) {
+ validateRetentionFrequencyForOozie(feed, clusterName);
+ }
+ }
+ }
+
+
+ /**
+ * Enforces the lower bound on retention frequency for Oozie builders: a retention frequency
+ * expressed in minutes must be at least 60.
+ *
+ * @param feed feed being validated
+ * @param clusterName name of the cluster whose retention frequency is checked
+ * @throws ValidationException if the retention frequency is in minutes and below 60
+ */
+ private void validateRetentionFrequencyForOozie(Feed feed, String clusterName) throws FalconException {
+ Frequency frequency = FeedHelper.getRetentionFrequency(feed, clusterName);
+ // Only the minutes unit can express an interval shorter than hours(1).
+ boolean inMinutes = Frequency.TimeUnit.minutes == frequency.getTimeUnit();
+ if (inMinutes && frequency.getFrequencyAsInt() < 60) {
+ throw new ValidationException("Feed Retention can not be more frequent than hours(1)");
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4a64dec0/common/src/main/java/org/apache/falcon/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java
index 82163cc..b70fa20 100644
--- a/common/src/main/java/org/apache/falcon/util/DateUtil.java
+++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java
@@ -18,6 +18,7 @@
package org.apache.falcon.util;
import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.Frequency;
import java.util.Calendar;
import java.util.Date;
@@ -28,6 +29,11 @@ import java.util.TimeZone;
*/
public final class DateUtil {
+ private static final long MINUTE_IN_MS = 60 * 1000L;
+ private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
+ private static final long DAY_IN_MS = 24 * HOUR_IN_MS;
+ private static final long MONTH_IN_MS = 31 * DAY_IN_MS;
+
//Friday, April 16, 9999 7:12:55 AM UTC corresponding date
public static final Date NEVER = new Date(Long.parseLong("253379862775000"));
@@ -45,6 +51,31 @@ public final class DateUtil {
public static String getDateFormatFromTime(long milliSeconds) {
return SchemaHelper.getDateFormat().format((new Date(milliSeconds)));
+ }
+
+ /**
+ * Converts a frequency into an approximate duration in milliseconds.
+ *
+ * <p>A month is approximated as 31 days, so the result is only suitable for coarse comparisons such as
+ * validation checks. This function should not be used for scheduling related functions as it may cause
+ * correctness issues in those scenarios.
+ *
+ * @param frequency the frequency to convert; its time unit must be months, days, hours or minutes
+ * @return the approximate length of the frequency in milliseconds, never null
+ * @throws IllegalArgumentException if the time unit of the frequency is not supported
+ */
+ public static Long getFrequencyInMillis(Frequency frequency){
+ switch (frequency.getTimeUnit()) {
+
+ case months:
+ return MONTH_IN_MS * frequency.getFrequencyAsInt();
+
+ case days:
+ return DAY_IN_MS * frequency.getFrequencyAsInt();
+
+ case hours:
+ return HOUR_IN_MS * frequency.getFrequencyAsInt();
+
+ case minutes:
+ return MINUTE_IN_MS * frequency.getFrequencyAsInt();
+ default:
+ // Fail loudly: returning null here would surface as a NullPointerException when callers unbox it.
+ throw new IllegalArgumentException("Unsupported time unit: " + frequency.getTimeUnit());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/4a64dec0/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 1c43800..905be68 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -183,6 +183,45 @@ public class FeedEntityParserTest extends AbstractTestBase {
parser.validate(feed);
}
+ @Test
+ public void testValidRetentionFrequency() throws Exception {
+ Feed feed = parser.parseAndValidate(getClass().getResourceAsStream(FEED3_XML));
+ feed.setFrequency(Frequency.fromString("minutes(30)"));
+
+ // Both spellings of a one hour retention must pass without a validation exception.
+ for (String retention : new String[] {"minutes(60)", "hours(1)"}) {
+ feed.getLifecycle().getRetentionStage().setFrequency(Frequency.fromString(retention));
+ parser.validate(feed);
+ }
+ }
+
+ @Test(expectedExceptions = ValidationException.class,
+ expectedExceptionsMessageRegExp = ".*Retention can not be more frequent than data availability.*")
+ public void testRetentionFrequentThanFeed() throws Exception {
+ Feed feed = parser.parseAndValidate(getClass().getResourceAsStream(FEED3_XML));
+
+ // Retention every 60 minutes against a feed produced every 2 hours must be rejected.
+ feed.setFrequency(Frequency.fromString("hours(2)"));
+ feed.getLifecycle().getRetentionStage().setFrequency(Frequency.fromString("minutes(60)"));
+ parser.validate(feed);
+ }
+
+ @Test(expectedExceptions = ValidationException.class,
+ expectedExceptionsMessageRegExp = ".*Feed Retention can not be more frequent than.*")
+ public void testRetentionFrequency() throws Exception {
+ Feed feed = parser.parseAndValidate(getClass().getResourceAsStream(FEED3_XML));
+
+ // 59 minutes is slower than the 30 minute feed but still below the hours(1) minimum.
+ feed.setFrequency(Frequency.fromString("minutes(30)"));
+ feed.getLifecycle().getRetentionStage().setFrequency(Frequency.fromString("minutes(59)"));
+ parser.validate(feed);
+ }
+
@Test(expectedExceptions = ValidationException.class)
public void applyValidationInvalidFeed() throws Exception {
Feed feed = parser.parseAndValidate(ProcessEntityParserTest.class