You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2013/11/02 00:42:10 UTC

[09/13] git commit: FALCON-161 Feed evictor evicts instances not eligible when pattern has dash. Contributed by Venkatesh Seetharam

FALCON-161 Feed evictor evicts instances not eligible when pattern has dash. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/6812bb37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/6812bb37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/6812bb37

Branch: refs/heads/FALCON-85
Commit: 6812bb37ee78aaf8842d9030c249eaa00efabca9
Parents: eb8ca3d
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Nov 1 11:59:33 2013 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Nov 1 13:40:40 2013 -0700

----------------------------------------------------------------------
 .../apache/falcon/catalog/CatalogPartition.java |   2 +-
 .../falcon/entity/CatalogStorageTest.java       |  26 +++-
 .../apache/falcon/retention/FeedEvictor.java    |  14 +-
 .../lifecycle/TableStorageFeedEvictorIT.java    | 135 ++++++++++++-------
 4 files changed, 119 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6812bb37/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
index ab312e9..c5d4705 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
@@ -171,7 +171,7 @@ public class CatalogPartition {
 
     @Override
     public String toString() {
-        return "HCatPartition ["
+        return "CatalogPartition ["
             + (tableName != null ? "tableName=" + tableName + ", " : "tableName=null")
             + (databaseName != null ? "dbName=" + databaseName + ", " : "dbName=null")
             + (values != null ? "values=" + values + ", " : "values=null")

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6812bb37/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
index 37f3f3e..972066d 100644
--- a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
@@ -85,18 +85,32 @@ public class CatalogStorageTest {
         Assert.assertFalse(storage.hasPartition("unknown"));
     }
 
+    @DataProvider(name = "invalidFeedURITemplates")
+    public Object[][] createInValidFeedUriTemplates() {
+        return new Object[][] {
+            {"thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}/${MONTH}/${DAY}"},
+            {"thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}/${MONTH}-${DAY}"},
+        };
+    }
+
+    @Test(dataProvider = "invalidFeedURITemplates", expectedExceptions = URISyntaxException.class)
+    public void testParseInvalidFeedUriTemplate(String uriTemplate) throws URISyntaxException {
+        new CatalogStorage(uriTemplate);
+        Assert.fail("Exception must have been thrown");
+    }
+
     @DataProvider(name = "invalidFeedURIs")
-    public Object[][] createParseFeedUriInvalid() {
+    public Object[][] createFeedUriInvalid() {
         return new Object[][] {
-            {"catalog:default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us", ""},
-            {"default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us", ""},
-            {"catalog:default#ds=${YEAR}-${MONTH}-${DAY};region=us", ""},
-            {"catalog://default/clicks#ds=${YEAR}-${MONTH}-${DAY}:region=us", ""},
+            {"catalog:default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us"},
+            {"default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us"},
+            {"catalog:default#ds=${YEAR}-${MONTH}-${DAY};region=us"},
+            {"catalog://default/clicks#ds=${YEAR}-${MONTH}-${DAY}:region=us"},
         };
     }
 
     @Test(dataProvider = "invalidFeedURIs", expectedExceptions = URISyntaxException.class)
-    public void testParseFeedUriInvalid(String tableUri, String ignore) throws URISyntaxException {
+    public void testParseFeedUriInvalid(String tableUri) throws URISyntaxException {
         new CatalogStorage(CatalogStorage.CATALOG_URL, tableUri);
         Assert.fail("Exception must have been thrown");
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6812bb37/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 75ef78c..372e2d3 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -361,7 +361,13 @@ public class FeedEvictor extends Configured implements Tool {
         LOG.info("Applying retention on " + storage.getTable()
                 + ", Limit: " + retentionLimit + ", timezone: " + timeZone);
 
-        String dateMask = getDateFormatInPath(storage.getUriTemplate());
+        String datedPartitionKey = storage.getDatedPartitionKey();
+        String datePattern = storage.getPartitionValue(datedPartitionKey);
+        String dateMask = datePattern.replaceAll(VARS.YEAR.regex(), "yyyy")
+                .replaceAll(VARS.MONTH.regex(), "MM")
+                .replaceAll(VARS.DAY.regex(), "dd")
+                .replaceAll(VARS.HOUR.regex(), "HH")
+                .replaceAll(VARS.MINUTE.regex(), "mm");
 
         List<CatalogPartition> toBeDeleted = discoverPartitionsToDelete(
                 storage, retentionLimit, timeZone, dateMask);
@@ -389,7 +395,7 @@ public class FeedEvictor extends Configured implements Tool {
                                 String timeZone, String dateMask) throws ELException {
 
         Pair<Date, Date> range = getDateRange(retentionLimit);
-        DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+        DateFormat dateFormat = new SimpleDateFormat(dateMask);
         dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
         String beforeDate = dateFormat.format(range.first);
 
@@ -398,9 +404,9 @@ public class FeedEvictor extends Configured implements Tool {
         StringBuilder filterBuffer = new StringBuilder();
         filterBuffer.append(datedPartitionKey)
                 .append(" < ")
-                .append("\"")
+                .append("'")
                 .append(beforeDate)
-                .append("\"");
+                .append("'");
 
         return filterBuffer.toString();
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6812bb37/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
index 4d8ced0..9b672f4 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hcatalog.api.HCatAddPartitionDesc;
 import org.apache.hcatalog.api.HCatClient;
 import org.apache.hcatalog.api.HCatPartition;
+import org.apache.hcatalog.common.HCatException;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -45,6 +46,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -69,8 +71,8 @@ public class TableStorageFeedEvictorIT {
     private static final String DATABASE_NAME = "falcon_db";
     private static final String TABLE_NAME = "clicks";
     private static final String EXTERNAL_TABLE_NAME = "clicks_external";
-    private static final String EXTERNAL_TABLE_LOCATION = "hdfs://localhost:41020/falcon/staging/clicks_external/";
-    private static final String FORMAT = "yyyyMMddHHmm";
+    private static final String STORAGE_URL = "hdfs://localhost:41020";
+    private static final String EXTERNAL_TABLE_LOCATION = STORAGE_URL + "/falcon/staging/clicks_external/";
 
     private final InMemoryWriter stream = new InMemoryWriter(System.out);
 
@@ -96,30 +98,46 @@ public class TableStorageFeedEvictorIT {
         HiveTestUtils.dropDatabase(METASTORE_URL, DATABASE_NAME);
     }
 
-    @DataProvider (name = "retentionLimitDataProvider")
-    private Object[][] createRetentionLimitData() {
+    @DataProvider (name = "evictorTestDataProvider")
+    private Object[][] createEvictorTestData() {
         return new Object[][] {
-            {"days(10)", 4},
-            {"days(100)", 7},
+            {"days(10)", "", false},
+            {"days(10)", "", true},
+            {"days(10)", "-", false},
+            {"days(10)", "-", true},
+            {"days(15)", "", false},
+            {"days(15)", "", true},
+            {"days(15)", "-", false},
+            {"days(15)", "-", true},
+            {"days(100)", "", false},
+            {"days(100)", "", true},
+            {"days(100)", "-", false},
+            {"days(100)", "-", true},
         };
     }
 
-    @Test (dataProvider = "retentionLimitDataProvider")
-    public void testFeedEvictorForTable(String retentionLimit, int expectedSize) throws Exception {
-
+    @Test (dataProvider = "evictorTestDataProvider")
+    public void testFeedEvictorForTableStorage(String retentionLimit, String dateSeparator,
+                                               boolean isExternal) throws Exception {
+        final String tableName = isExternal ? EXTERNAL_TABLE_NAME : TABLE_NAME;
         final String timeZone = "UTC";
-        final String dateMask = "yyyyMMdd";
+        final String dateMask = "yyyy" + dateSeparator + "MM" + dateSeparator + "dd";
 
-        Pair<Date, Date> range = getDateRange(retentionLimit);
         List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
-        addPartitions(TABLE_NAME, candidatePartitions, false);
+        addPartitions(tableName, candidatePartitions, isExternal);
+
+        List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, tableName);
+        Assert.assertEquals(partitions.size(), candidatePartitions.size());
+        Pair<Date, Date> range = getDateRange(retentionLimit);
+        List<HCatPartition> filteredPartitions = getFilteredPartitions(tableName, timeZone, dateMask, range);
 
         try {
             stream.clear();
 
-            final String tableUri = DATABASE_NAME + "/" + TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
+            final String tableUri = DATABASE_NAME + "/" + tableName
+                    + "/ds=${YEAR}" + dateSeparator + "${MONTH}" + dateSeparator + "${DAY};region=us";
             String feedBasePath = METASTORE_URL + tableUri;
-            String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
+            String logFile = STORAGE_URL + "/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
 
             FeedEvictor.main(new String[]{
                 "-feedBasePath", feedBasePath,
@@ -131,35 +149,56 @@ public class TableStorageFeedEvictorIT {
                 "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
             });
 
-            List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
-            Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
+            StringBuilder expectedInstancePaths = new StringBuilder();
+            List<String> expectedInstancesEvicted = getExpectedEvictedInstances(
+                    candidatePartitions, range.first, dateMask, timeZone, expectedInstancePaths);
+            int expectedSurvivorSize = candidatePartitions.size() - expectedInstancesEvicted.size();
+
+            List<HCatPartition> survivingPartitions = client.getPartitions(DATABASE_NAME, tableName);
+            Assert.assertEquals(survivingPartitions.size(), expectedSurvivorSize,
+                    "Unexpected number of surviving partitions");
 
-            Assert.assertEquals(readLogFile(new Path(logFile)),
-                    getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
+            Assert.assertEquals(expectedInstancesEvicted.size(), filteredPartitions.size(),
+                    "Unexpected number of evicted partitions");
 
-        } catch (Exception e) {
-            Assert.fail("Unknown exception", e);
+            final String actualInstancesEvicted = readLogFile(new Path(logFile));
+            Assert.assertEquals(actualInstancesEvicted, expectedInstancePaths.toString(),
+                    "Unexpected number of Logged partitions");
+
+            if (isExternal) {
+                verifyFSPartitionsAreDeleted(candidatePartitions, range.first, dateMask, timeZone);
+            }
         } finally {
-            dropPartitions(TABLE_NAME, candidatePartitions);
+            dropPartitions(tableName, candidatePartitions);
+            Assert.assertEquals(client.getPartitions(DATABASE_NAME, tableName).size(), 0);
         }
     }
 
-    @Test (dataProvider = "retentionLimitDataProvider")
-    public void testFeedEvictorForExternalTable(String retentionLimit, int expectedSize) throws Exception {
+    @DataProvider (name = "evictorTestInvalidDataProvider")
+    private Object[][] createEvictorTestDataInvalid() {
+        return new Object[][] {
+            {"days(10)", "/", false},
+            {"days(10)", "/", true},
+        };
+    }
 
+    @Test (dataProvider = "evictorTestInvalidDataProvider", expectedExceptions = URISyntaxException.class)
+    public void testFeedEvictorForInvalidTableStorage(String retentionLimit, String dateSeparator,
+                                                      boolean isExternal) throws Exception {
+        final String tableName = isExternal ? EXTERNAL_TABLE_NAME : TABLE_NAME;
         final String timeZone = "UTC";
-        final String dateMask = "yyyyMMdd";
+        final String dateMask = "yyyy" + dateSeparator + "MM" + dateSeparator + "dd";
 
-        Pair<Date, Date> range = getDateRange(retentionLimit);
         List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
-        addPartitions(EXTERNAL_TABLE_NAME, candidatePartitions, true);
+        addPartitions(tableName, candidatePartitions, isExternal);
 
         try {
             stream.clear();
 
-            final String tableUri = DATABASE_NAME + "/" + EXTERNAL_TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
+            final String tableUri = DATABASE_NAME + "/" + tableName
+                    + "/ds=${YEAR}" + dateSeparator + "${MONTH}" + dateSeparator + "${DAY};region=us";
             String feedBasePath = METASTORE_URL + tableUri;
-            String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
+            String logFile = STORAGE_URL + "/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
 
             FeedEvictor.main(new String[]{
                 "-feedBasePath", feedBasePath,
@@ -171,18 +210,9 @@ public class TableStorageFeedEvictorIT {
                 "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
             });
 
-            List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, EXTERNAL_TABLE_NAME);
-            Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
-
-            Assert.assertEquals(readLogFile(new Path(logFile)),
-                    getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
-
-            verifyFSPartitionsAreDeleted(candidatePartitions, range.first, dateMask, timeZone);
-
-        } catch (Exception e) {
-            Assert.fail("Unknown exception", e);
+            Assert.fail("Exception must have been thrown");
         } finally {
-            dropPartitions(EXTERNAL_TABLE_NAME, candidatePartitions);
+            dropPartitions(tableName, candidatePartitions);
         }
     }
 
@@ -192,7 +222,7 @@ public class TableStorageFeedEvictorIT {
 
         Pair<Date, Date> range = getDateRange(retentionLimit);
 
-        DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+        DateFormat dateFormat = new SimpleDateFormat(dateMask);
         dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
 
         String startDate = dateFormat.format(range.first);
@@ -255,24 +285,35 @@ public class TableStorageFeedEvictorIT {
         }
     }
 
-    public String getExpectedInstancePaths(List<String> candidatePartitions, Date date,
-                                           String dateMask, String timeZone) {
+    private List<HCatPartition> getFilteredPartitions(String tableName, String timeZone, String dateMask,
+                                                      Pair<Date, Date> range) throws HCatException {
+        DateFormat dateFormat = new SimpleDateFormat(dateMask);
+        dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+
+        String filter = "ds < '" + dateFormat.format(range.first) + "'";
+        return client.listPartitionsByFilter(DATABASE_NAME, tableName, filter);
+    }
+
+    public List<String> getExpectedEvictedInstances(List<String> candidatePartitions, Date date, String dateMask,
+                                                    String timeZone, StringBuilder instancePaths) {
         Collections.sort(candidatePartitions);
-        StringBuilder instances = new StringBuilder("instancePaths=");
+        instancePaths.append("instancePaths=");
 
-        DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+        DateFormat dateFormat = new SimpleDateFormat(dateMask);
         dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
         String startDate = dateFormat.format(date);
 
+        List<String> expectedInstances = new ArrayList<String>();
         for (String candidatePartition : candidatePartitions) {
             if (candidatePartition.compareTo(startDate) < 0) {
-                instances.append("[")
+                expectedInstances.add(candidatePartition);
+                instancePaths.append("[")
                         .append(candidatePartition)
                         .append(", in],");
             }
         }
 
-        return instances.toString();
+        return expectedInstances;
     }
 
     private void verifyFSPartitionsAreDeleted(List<String> candidatePartitions, Date date,
@@ -280,7 +321,7 @@ public class TableStorageFeedEvictorIT {
 
         FileSystem fs = new Path(EXTERNAL_TABLE_LOCATION).getFileSystem(new Configuration());
 
-        DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+        DateFormat dateFormat = new SimpleDateFormat(dateMask);
         dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
         String startDate = dateFormat.format(date);