You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/10/13 10:36:39 UTC

falcon git commit: FALCON-1524 Improve Lifecycle Retention validation checks. Contributed by Ajay Yadava.

Repository: falcon
Updated Branches:
  refs/heads/master 666a2a26b -> 4a64dec0e


FALCON-1524 Improve Lifecycle Retention validation checks. Contributed by Ajay Yadava.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/4a64dec0
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/4a64dec0
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/4a64dec0

Branch: refs/heads/master
Commit: 4a64dec0ee4d5cf685bde953197e84526cf32d7e
Parents: 666a2a2
Author: Ajay Yadava <aj...@gmail.com>
Authored: Tue Oct 13 13:32:58 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Tue Oct 13 13:36:36 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../falcon/entity/parser/FeedEntityParser.java  | 11 +++++-
 .../lifecycle/retention/AgeBasedDelete.java     | 16 ++++++++
 .../java/org/apache/falcon/util/DateUtil.java   | 31 ++++++++++++++++
 .../entity/parser/FeedEntityParserTest.java     | 39 ++++++++++++++++++++
 5 files changed, 98 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/4a64dec0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e8486f..2dc7e3c 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,8 @@ Trunk (Unreleased)
     FALCON-1027 Falcon proxy user support(Sowmya Ramesh)
 
   IMPROVEMENTS
+    FALCON-1524 Improve Lifecycle Retention validation checks(Ajay Yadava)
+
     FALCON-1516 Feed Retention support in Falcon Unit(Pavan Kolamuri via Pallavi Rao) 
 
     FALCON-1527 Release Falcon Unit test jar(Pavan Kumar Kolamuri via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/4a64dec0/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
index 6be2495..c5cfdd2 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/FeedEntityParser.java
@@ -47,8 +47,8 @@ import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.expression.ExpressionHelper;
 import org.apache.falcon.group.FeedGroup;
 import org.apache.falcon.group.FeedGroupMap;
-import org.apache.falcon.util.DateUtil;
 import org.apache.falcon.service.LifecyclePolicyMap;
+import org.apache.falcon.util.DateUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.slf4j.Logger;
@@ -133,6 +133,7 @@ public class FeedEntityParser extends EntityParser<Feed> {
                     throw new ValidationException("Retention is a mandatory stage, didn't find it for cluster: "
                             + cluster.getName());
                 }
+                validateRetentionFrequency(feed, cluster.getName());
                 for (String policyName : FeedHelper.getPolicies(feed, cluster.getName())) {
                     map.get(policyName).validate(feed, cluster.getName());
                 }
@@ -140,6 +141,14 @@ public class FeedEntityParser extends EntityParser<Feed> {
         }
     }
 
+    private void validateRetentionFrequency(Feed feed, String clusterName) throws FalconException { // retention vs. feed
+        Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, clusterName); // for this cluster
+        Frequency feedFrequency = feed.getFrequency(); // treated as the data availability interval
+        if (DateUtil.getFrequencyInMillis(retentionFrequency) < DateUtil.getFrequencyInMillis(feedFrequency)) {
+            throw new ValidationException("Retention can not be more frequent than data availability.");
+        } // strict '<': a retention frequency equal to the feed frequency is accepted
+    }
+
     private Set<Process> findProcesses(Set<Entity> referenced) {
         Set<Process> processes = new HashSet<Process>();
         for (Entity entity : referenced) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/4a64dec0/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
index a4ae780..ccb0290 100644
--- a/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
+++ b/common/src/main/java/org/apache/falcon/lifecycle/retention/AgeBasedDelete.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.feed.Property;
 import org.apache.falcon.entity.v0.feed.RetentionStage;
 import org.apache.falcon.entity.v0.feed.Sla;
 import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.falcon.util.StartupProperties;
 
 import java.util.Date;
 
@@ -48,6 +49,21 @@ public class AgeBasedDelete extends RetentionPolicy {
         if (cluster != null) {
             validateLimitWithSla(feed, cluster, retentionLimit.toString());
             validateLimitWithLateData(feed, cluster, retentionLimit.toString());
+            String lifecycleEngine = StartupProperties.get().getProperty("lifecycle.engine.impl",
+                    "org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory");
+            if ("org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory".equals(lifecycleEngine)) {
+                validateRetentionFrequencyForOozie(feed, clusterName);
+            }
+        }
+    }
+
+
+    private void validateRetentionFrequencyForOozie(Feed feed, String clusterName) throws FalconException {
+        // For Oozie builders retention must not run more often than hours(1): minutes(n) with n < 60 is rejected.
+        Frequency retentionFrequency = FeedHelper.getRetentionFrequency(feed, clusterName);
+        if (retentionFrequency.getTimeUnit() == Frequency.TimeUnit.minutes // other time units are never rejected here
+                && retentionFrequency.getFrequencyAsInt() < 60) {
+            throw new ValidationException("Feed Retention can not be more frequent than hours(1)");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/4a64dec0/common/src/main/java/org/apache/falcon/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java
index 82163cc..b70fa20 100644
--- a/common/src/main/java/org/apache/falcon/util/DateUtil.java
+++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java
@@ -18,6 +18,7 @@
 package org.apache.falcon.util;
 
 import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.Frequency;
 
 import java.util.Calendar;
 import java.util.Date;
@@ -28,6 +29,11 @@ import java.util.TimeZone;
  */
 public final class DateUtil {
 
+    private static final long MINUTE_IN_MS = 60 * 1000L;
+    private static final long HOUR_IN_MS = 60 * MINUTE_IN_MS;
+    private static final long DAY_IN_MS = 24 * HOUR_IN_MS;
+    private static final long MONTH_IN_MS = 31 * DAY_IN_MS;
+
     //Friday, April 16, 9999 7:12:55 AM UTC corresponding date
     public static final Date NEVER = new Date(Long.parseLong("253379862775000"));
 
@@ -45,6 +51,31 @@ public final class DateUtil {
 
     public static String getDateFormatFromTime(long milliSeconds) {
         return SchemaHelper.getDateFormat().format((new Date(milliSeconds)));
+    }
+
+    /**
+     * Approximates a frequency in milliseconds (a month counts as 31 days); do not use it for scheduling.
+     * @param frequency frequency to convert; its time unit must be months, days, hours or minutes
+     * @return the millisecond length of one time unit multiplied by frequency.getFrequencyAsInt()
+     * @throws IllegalArgumentException if the frequency has any other time unit
+     */
+    public static Long getFrequencyInMillis(Frequency frequency) {
+        switch (frequency.getTimeUnit()) {
+
+        case months:
+            return MONTH_IN_MS * frequency.getFrequencyAsInt();
+
+        case days:
+            return DAY_IN_MS * frequency.getFrequencyAsInt();
+
+        case hours:
+            return HOUR_IN_MS * frequency.getFrequencyAsInt();
+
+        case minutes:
+            return MINUTE_IN_MS * frequency.getFrequencyAsInt();
 
+        default: // fail loudly: a null result would NPE when callers unbox it for comparison
+            throw new IllegalArgumentException("Unsupported time unit: " + frequency.getTimeUnit());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/4a64dec0/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
index 1c43800..905be68 100644
--- a/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/parser/FeedEntityParserTest.java
@@ -183,6 +183,45 @@ public class FeedEntityParserTest extends AbstractTestBase {
         parser.validate(feed);
     }
 
+    @Test
+    public void testValidRetentionFrequency() throws Exception { // retention no faster than the feed must validate
+        Feed feed = parser.parseAndValidate(this.getClass()
+                .getResourceAsStream(FEED3_XML));
+
+        feed.setFrequency(Frequency.fromString("minutes(30)")); // feed frequency: every 30 minutes
+        Frequency frequency = Frequency.fromString("minutes(60)"); // retention every 60 minutes, given in minutes
+        feed.getLifecycle().getRetentionStage().setFrequency(frequency);
+        parser.validate(feed); // no validation exception should be thrown
+
+        frequency = Frequency.fromString("hours(1)"); // the same interval, given in hours
+        feed.getLifecycle().getRetentionStage().setFrequency(frequency);
+        parser.validate(feed); // no validation exception should be thrown
+    }
+
+    @Test(expectedExceptions = ValidationException.class,
+        expectedExceptionsMessageRegExp = ".*Retention can not be more frequent than data availability.*")
+    public void testRetentionFrequentThanFeed() throws Exception { // retention faster than the feed must fail
+        Feed feed = parser.parseAndValidate(this.getClass()
+                .getResourceAsStream(FEED3_XML));
+
+        feed.setFrequency(Frequency.fromString("hours(2)")); // feed frequency: every 2 hours
+        Frequency frequency = Frequency.fromString("minutes(60)"); // retention every 60 minutes, more often than feed
+        feed.getLifecycle().getRetentionStage().setFrequency(frequency);
+        parser.validate(feed); // expected to throw the ValidationException declared on the annotation
+    }
+
+    @Test(expectedExceptions = ValidationException.class,
+        expectedExceptionsMessageRegExp = ".*Feed Retention can not be more frequent than.*")
+    public void testRetentionFrequency() throws Exception { // sub-hourly retention must fail
+        Feed feed = parser.parseAndValidate(this.getClass()
+                .getResourceAsStream(FEED3_XML));
+
+        feed.setFrequency(Frequency.fromString("minutes(30)")); // feed every 30 minutes: retention is not faster
+        Frequency frequency = Frequency.fromString("minutes(59)"); // just under one hour
+        feed.getLifecycle().getRetentionStage().setFrequency(frequency);
+        parser.validate(feed); // NOTE(review): presumably relies on the Oozie lifecycle engine being active - confirm
+    }
+
     @Test(expectedExceptions = ValidationException.class)
     public void applyValidationInvalidFeed() throws Exception {
         Feed feed = parser.parseAndValidate(ProcessEntityParserTest.class