You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost when this copy was converted to plain text.)
Posted to commits@falcon.apache.org by ve...@apache.org on 2013/11/02 00:42:10 UTC
[09/13] git commit: FALCON-161 Feed evictor evicts instances not eligible when pattern has dash. Contributed by Venkatesh Seetharam
FALCON-161 Feed evictor evicts instances not eligible when pattern has dash. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/6812bb37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/6812bb37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/6812bb37
Branch: refs/heads/FALCON-85
Commit: 6812bb37ee78aaf8842d9030c249eaa00efabca9
Parents: eb8ca3d
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Fri Nov 1 11:59:33 2013 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Fri Nov 1 13:40:40 2013 -0700
----------------------------------------------------------------------
.../apache/falcon/catalog/CatalogPartition.java | 2 +-
.../falcon/entity/CatalogStorageTest.java | 26 +++-
.../apache/falcon/retention/FeedEvictor.java | 14 +-
.../lifecycle/TableStorageFeedEvictorIT.java | 135 ++++++++++++-------
4 files changed, 119 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6812bb37/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
index ab312e9..c5d4705 100644
--- a/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
+++ b/common/src/main/java/org/apache/falcon/catalog/CatalogPartition.java
@@ -171,7 +171,7 @@ public class CatalogPartition {
@Override
public String toString() {
- return "HCatPartition ["
+ return "CatalogPartition ["
+ (tableName != null ? "tableName=" + tableName + ", " : "tableName=null")
+ (databaseName != null ? "dbName=" + databaseName + ", " : "dbName=null")
+ (values != null ? "values=" + values + ", " : "values=null")
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6812bb37/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
index 37f3f3e..972066d 100644
--- a/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/CatalogStorageTest.java
@@ -85,18 +85,32 @@ public class CatalogStorageTest {
Assert.assertFalse(storage.hasPartition("unknown"));
}
+ @DataProvider(name = "invalidFeedURITemplates")
+ public Object[][] createInValidFeedUriTemplates() {
+ return new Object[][] {
+ {"thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}/${MONTH}/${DAY}"},
+ {"thrift://localhost:49083/clicksdb/clicks/region=us;ds=${YEAR}/${MONTH}-${DAY}"},
+ };
+ }
+
+ @Test(dataProvider = "invalidFeedURITemplates", expectedExceptions = URISyntaxException.class)
+ public void testParseInvalidFeedUriTemplate(String uriTemplate) throws URISyntaxException {
+ new CatalogStorage(uriTemplate);
+ Assert.fail("Exception must have been thrown");
+ }
+
@DataProvider(name = "invalidFeedURIs")
- public Object[][] createParseFeedUriInvalid() {
+ public Object[][] createFeedUriInvalid() {
return new Object[][] {
- {"catalog:default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us", ""},
- {"default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us", ""},
- {"catalog:default#ds=${YEAR}-${MONTH}-${DAY};region=us", ""},
- {"catalog://default/clicks#ds=${YEAR}-${MONTH}-${DAY}:region=us", ""},
+ {"catalog:default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us"},
+ {"default:clicks:ds=${YEAR}-${MONTH}-${DAY}#region=us"},
+ {"catalog:default#ds=${YEAR}-${MONTH}-${DAY};region=us"},
+ {"catalog://default/clicks#ds=${YEAR}-${MONTH}-${DAY}:region=us"},
};
}
@Test(dataProvider = "invalidFeedURIs", expectedExceptions = URISyntaxException.class)
- public void testParseFeedUriInvalid(String tableUri, String ignore) throws URISyntaxException {
+ public void testParseFeedUriInvalid(String tableUri) throws URISyntaxException {
new CatalogStorage(CatalogStorage.CATALOG_URL, tableUri);
Assert.fail("Exception must have been thrown");
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6812bb37/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 75ef78c..372e2d3 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -361,7 +361,13 @@ public class FeedEvictor extends Configured implements Tool {
LOG.info("Applying retention on " + storage.getTable()
+ ", Limit: " + retentionLimit + ", timezone: " + timeZone);
- String dateMask = getDateFormatInPath(storage.getUriTemplate());
+ String datedPartitionKey = storage.getDatedPartitionKey();
+ String datePattern = storage.getPartitionValue(datedPartitionKey);
+ String dateMask = datePattern.replaceAll(VARS.YEAR.regex(), "yyyy")
+ .replaceAll(VARS.MONTH.regex(), "MM")
+ .replaceAll(VARS.DAY.regex(), "dd")
+ .replaceAll(VARS.HOUR.regex(), "HH")
+ .replaceAll(VARS.MINUTE.regex(), "mm");
List<CatalogPartition> toBeDeleted = discoverPartitionsToDelete(
storage, retentionLimit, timeZone, dateMask);
@@ -389,7 +395,7 @@ public class FeedEvictor extends Configured implements Tool {
String timeZone, String dateMask) throws ELException {
Pair<Date, Date> range = getDateRange(retentionLimit);
- DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+ DateFormat dateFormat = new SimpleDateFormat(dateMask);
dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
String beforeDate = dateFormat.format(range.first);
@@ -398,9 +404,9 @@ public class FeedEvictor extends Configured implements Tool {
StringBuilder filterBuffer = new StringBuilder();
filterBuffer.append(datedPartitionKey)
.append(" < ")
- .append("\"")
+ .append("'")
.append(beforeDate)
- .append("\"");
+ .append("'");
return filterBuffer.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/6812bb37/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
index 4d8ced0..9b672f4 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hcatalog.api.HCatAddPartitionDesc;
import org.apache.hcatalog.api.HCatClient;
import org.apache.hcatalog.api.HCatPartition;
+import org.apache.hcatalog.common.HCatException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -45,6 +46,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
+import java.net.URISyntaxException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -69,8 +71,8 @@ public class TableStorageFeedEvictorIT {
private static final String DATABASE_NAME = "falcon_db";
private static final String TABLE_NAME = "clicks";
private static final String EXTERNAL_TABLE_NAME = "clicks_external";
- private static final String EXTERNAL_TABLE_LOCATION = "hdfs://localhost:41020/falcon/staging/clicks_external/";
- private static final String FORMAT = "yyyyMMddHHmm";
+ private static final String STORAGE_URL = "hdfs://localhost:41020";
+ private static final String EXTERNAL_TABLE_LOCATION = STORAGE_URL + "/falcon/staging/clicks_external/";
private final InMemoryWriter stream = new InMemoryWriter(System.out);
@@ -96,30 +98,46 @@ public class TableStorageFeedEvictorIT {
HiveTestUtils.dropDatabase(METASTORE_URL, DATABASE_NAME);
}
- @DataProvider (name = "retentionLimitDataProvider")
- private Object[][] createRetentionLimitData() {
+ @DataProvider (name = "evictorTestDataProvider")
+ private Object[][] createEvictorTestData() {
return new Object[][] {
- {"days(10)", 4},
- {"days(100)", 7},
+ {"days(10)", "", false},
+ {"days(10)", "", true},
+ {"days(10)", "-", false},
+ {"days(10)", "-", true},
+ {"days(15)", "", false},
+ {"days(15)", "", true},
+ {"days(15)", "-", false},
+ {"days(15)", "-", true},
+ {"days(100)", "", false},
+ {"days(100)", "", true},
+ {"days(100)", "-", false},
+ {"days(100)", "-", true},
};
}
- @Test (dataProvider = "retentionLimitDataProvider")
- public void testFeedEvictorForTable(String retentionLimit, int expectedSize) throws Exception {
-
+ @Test (dataProvider = "evictorTestDataProvider")
+ public void testFeedEvictorForTableStorage(String retentionLimit, String dateSeparator,
+ boolean isExternal) throws Exception {
+ final String tableName = isExternal ? EXTERNAL_TABLE_NAME : TABLE_NAME;
final String timeZone = "UTC";
- final String dateMask = "yyyyMMdd";
+ final String dateMask = "yyyy" + dateSeparator + "MM" + dateSeparator + "dd";
- Pair<Date, Date> range = getDateRange(retentionLimit);
List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
- addPartitions(TABLE_NAME, candidatePartitions, false);
+ addPartitions(tableName, candidatePartitions, isExternal);
+
+ List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, tableName);
+ Assert.assertEquals(partitions.size(), candidatePartitions.size());
+ Pair<Date, Date> range = getDateRange(retentionLimit);
+ List<HCatPartition> filteredPartitions = getFilteredPartitions(tableName, timeZone, dateMask, range);
try {
stream.clear();
- final String tableUri = DATABASE_NAME + "/" + TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
+ final String tableUri = DATABASE_NAME + "/" + tableName
+ + "/ds=${YEAR}" + dateSeparator + "${MONTH}" + dateSeparator + "${DAY};region=us";
String feedBasePath = METASTORE_URL + tableUri;
- String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
+ String logFile = STORAGE_URL + "/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
FeedEvictor.main(new String[]{
"-feedBasePath", feedBasePath,
@@ -131,35 +149,56 @@ public class TableStorageFeedEvictorIT {
"-falconFeedStorageType", Storage.TYPE.TABLE.name(),
});
- List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, TABLE_NAME);
- Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
+ StringBuilder expectedInstancePaths = new StringBuilder();
+ List<String> expectedInstancesEvicted = getExpectedEvictedInstances(
+ candidatePartitions, range.first, dateMask, timeZone, expectedInstancePaths);
+ int expectedSurvivorSize = candidatePartitions.size() - expectedInstancesEvicted.size();
+
+ List<HCatPartition> survivingPartitions = client.getPartitions(DATABASE_NAME, tableName);
+ Assert.assertEquals(survivingPartitions.size(), expectedSurvivorSize,
+ "Unexpected number of surviving partitions");
- Assert.assertEquals(readLogFile(new Path(logFile)),
- getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
+ Assert.assertEquals(expectedInstancesEvicted.size(), filteredPartitions.size(),
+ "Unexpected number of evicted partitions");
- } catch (Exception e) {
- Assert.fail("Unknown exception", e);
+ final String actualInstancesEvicted = readLogFile(new Path(logFile));
+ Assert.assertEquals(actualInstancesEvicted, expectedInstancePaths.toString(),
+ "Unexpected number of Logged partitions");
+
+ if (isExternal) {
+ verifyFSPartitionsAreDeleted(candidatePartitions, range.first, dateMask, timeZone);
+ }
} finally {
- dropPartitions(TABLE_NAME, candidatePartitions);
+ dropPartitions(tableName, candidatePartitions);
+ Assert.assertEquals(client.getPartitions(DATABASE_NAME, tableName).size(), 0);
}
}
- @Test (dataProvider = "retentionLimitDataProvider")
- public void testFeedEvictorForExternalTable(String retentionLimit, int expectedSize) throws Exception {
+ @DataProvider (name = "evictorTestInvalidDataProvider")
+ private Object[][] createEvictorTestDataInvalid() {
+ return new Object[][] {
+ {"days(10)", "/", false},
+ {"days(10)", "/", true},
+ };
+ }
+ @Test (dataProvider = "evictorTestInvalidDataProvider", expectedExceptions = URISyntaxException.class)
+ public void testFeedEvictorForInvalidTableStorage(String retentionLimit, String dateSeparator,
+ boolean isExternal) throws Exception {
+ final String tableName = isExternal ? EXTERNAL_TABLE_NAME : TABLE_NAME;
final String timeZone = "UTC";
- final String dateMask = "yyyyMMdd";
+ final String dateMask = "yyyy" + dateSeparator + "MM" + dateSeparator + "dd";
- Pair<Date, Date> range = getDateRange(retentionLimit);
List<String> candidatePartitions = getCandidatePartitions("days(10)", dateMask, timeZone, 3);
- addPartitions(EXTERNAL_TABLE_NAME, candidatePartitions, true);
+ addPartitions(tableName, candidatePartitions, isExternal);
try {
stream.clear();
- final String tableUri = DATABASE_NAME + "/" + EXTERNAL_TABLE_NAME + "/ds=${YEAR}${MONTH}${DAY};region=us";
+ final String tableUri = DATABASE_NAME + "/" + tableName
+ + "/ds=${YEAR}" + dateSeparator + "${MONTH}" + dateSeparator + "${DAY};region=us";
String feedBasePath = METASTORE_URL + tableUri;
- String logFile = "hdfs://localhost:41020/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
+ String logFile = STORAGE_URL + "/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
FeedEvictor.main(new String[]{
"-feedBasePath", feedBasePath,
@@ -171,18 +210,9 @@ public class TableStorageFeedEvictorIT {
"-falconFeedStorageType", Storage.TYPE.TABLE.name(),
});
- List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, EXTERNAL_TABLE_NAME);
- Assert.assertEquals(partitions.size(), expectedSize, "Unexpected number of partitions");
-
- Assert.assertEquals(readLogFile(new Path(logFile)),
- getExpectedInstancePaths(candidatePartitions, range.first, dateMask, timeZone));
-
- verifyFSPartitionsAreDeleted(candidatePartitions, range.first, dateMask, timeZone);
-
- } catch (Exception e) {
- Assert.fail("Unknown exception", e);
+ Assert.fail("Exception must have been thrown");
} finally {
- dropPartitions(EXTERNAL_TABLE_NAME, candidatePartitions);
+ dropPartitions(tableName, candidatePartitions);
}
}
@@ -192,7 +222,7 @@ public class TableStorageFeedEvictorIT {
Pair<Date, Date> range = getDateRange(retentionLimit);
- DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+ DateFormat dateFormat = new SimpleDateFormat(dateMask);
dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
String startDate = dateFormat.format(range.first);
@@ -255,24 +285,35 @@ public class TableStorageFeedEvictorIT {
}
}
- public String getExpectedInstancePaths(List<String> candidatePartitions, Date date,
- String dateMask, String timeZone) {
+ private List<HCatPartition> getFilteredPartitions(String tableName, String timeZone, String dateMask,
+ Pair<Date, Date> range) throws HCatException {
+ DateFormat dateFormat = new SimpleDateFormat(dateMask);
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+
+ String filter = "ds < '" + dateFormat.format(range.first) + "'";
+ return client.listPartitionsByFilter(DATABASE_NAME, tableName, filter);
+ }
+
+ public List<String> getExpectedEvictedInstances(List<String> candidatePartitions, Date date, String dateMask,
+ String timeZone, StringBuilder instancePaths) {
Collections.sort(candidatePartitions);
- StringBuilder instances = new StringBuilder("instancePaths=");
+ instancePaths.append("instancePaths=");
- DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+ DateFormat dateFormat = new SimpleDateFormat(dateMask);
dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
String startDate = dateFormat.format(date);
+ List<String> expectedInstances = new ArrayList<String>();
for (String candidatePartition : candidatePartitions) {
if (candidatePartition.compareTo(startDate) < 0) {
- instances.append("[")
+ expectedInstances.add(candidatePartition);
+ instancePaths.append("[")
.append(candidatePartition)
.append(", in],");
}
}
- return instances.toString();
+ return expectedInstances;
}
private void verifyFSPartitionsAreDeleted(List<String> candidatePartitions, Date date,
@@ -280,7 +321,7 @@ public class TableStorageFeedEvictorIT {
FileSystem fs = new Path(EXTERNAL_TABLE_LOCATION).getFileSystem(new Configuration());
- DateFormat dateFormat = new SimpleDateFormat(FORMAT.substring(0, dateMask.length()));
+ DateFormat dateFormat = new SimpleDateFormat(dateMask);
dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
String startDate = dateFormat.format(date);