You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/04/24 12:07:22 UTC
git commit: FALCON-284 HCatalog-based feed retention doesn't work
when the partition filter spans multiple partition keys. Contributed by
Satish Mittal
Repository: incubator-falcon
Updated Branches:
refs/heads/master 7fa6ca57e -> 173ebec19
FALCON-284 HCatalog-based feed retention doesn't work when the partition filter spans multiple partition keys. Contributed by Satish Mittal
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/173ebec1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/173ebec1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/173ebec1
Branch: refs/heads/master
Commit: 173ebec197358e1881d3e58732881e27149d05a6
Parents: 7fa6ca5
Author: Shwetha GS <sh...@gmail.com>
Authored: Thu Apr 24 15:37:15 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Thu Apr 24 15:37:15 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../falcon/catalog/AbstractCatalogService.java | 10 +
.../falcon/catalog/HiveCatalogService.java | 20 ++
.../falcon/entity/common/FeedDataPath.java | 9 +
.../apache/falcon/retention/FeedEvictor.java | 194 ++++++++++++----
.../falcon/catalog/HiveCatalogServiceIT.java | 19 ++
.../lifecycle/TableStorageFeedEvictorIT.java | 228 ++++++++++++++++++-
7 files changed, 433 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27c2056..153aebe 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -113,6 +113,9 @@ Trunk (Unreleased)
FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
BUG FIXES
+ FALCON-284 Hcatalog based feed retention doesn't work when partition filter spans across
+ multiple partition keys. (Satish Mittal via Shwetha GS)
+
FALCON-409 Not able to create a package. (Raju Bairishetti via Shwetha GS)
FALCON-396 minor logging typo in FalconTopicSubscriber. (Raghav Kumar Gautam via Shwetha GS)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
index fc9c3b1..df55b88 100644
--- a/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/AbstractCatalogService.java
@@ -107,4 +107,14 @@ public abstract class AbstractCatalogService {
*/
public abstract CatalogPartition getPartition(String catalogUrl, String database, String tableName,
Map<String, String> partitionSpec) throws FalconException;
+
+ /**
+ * Returns the names of the partition columns defined on the given catalog table.
+ * NOTE(review): FeedEvictor pairs this list positionally with CatalogPartition#getValues(),
+ * so implementations are presumably expected to return columns in table-definition order.
+ *
+ * @param catalogUrl url for the catalog service
+ * @param database database the table belongs to
+ * @param tableName table name
+ * @return list of partition column names of the table
+ * @throws FalconException if the partition columns cannot be fetched from the catalog
+ */
+ public abstract List<String> getTablePartitionCols(String catalogUrl, String database,
+ String tableName) throws FalconException;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
index 3c3660e..30736f3 100644
--- a/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
+++ b/common/src/main/java/org/apache/falcon/catalog/HiveCatalogService.java
@@ -173,6 +173,7 @@ public class HiveCatalogService extends AbstractCatalogService {
HCatClient client = get(catalogUrl);
List<HCatPartition> hCatPartitions = client.listPartitionsByFilter(database, tableName, filter);
for (HCatPartition hCatPartition : hCatPartitions) {
+ LOG.info("Partition: " + hCatPartition.getValues());
CatalogPartition partition = createCatalogPartition(hCatPartition);
catalogPartitionList.add(partition);
}
@@ -233,4 +234,23 @@ public class HiveCatalogService extends AbstractCatalogService {
throw new FalconException("Exception fetching partition:" + e.getMessage(), e);
}
}
+
+ /**
+ * Looks the table up through the HCatalog client for catalogUrl and returns the names of its
+ * partition columns, in the order reported by the table schema.
+ */
+ @Override
+ public List<String> getTablePartitionCols(String catalogUrl, String database,
+ String tableName) throws FalconException {
+ LOG.info("Fetching partition columns of table: " + tableName);
+
+ final List<HCatFieldSchema> partitionSchema;
+ try {
+ partitionSchema = get(catalogUrl).getTable(database, tableName).getPartCols();
+ } catch (HCatException e) {
+ throw new FalconException("Exception fetching partition columns: " + e.getMessage(), e);
+ }
+
+ List<String> columnNames = new ArrayList<String>(partitionSchema.size());
+ for (HCatFieldSchema column : partitionSchema) {
+ columnNames.add(column.getName());
+ }
+ return columnNames;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
index 4031e14..39e636b 100644
--- a/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
+++ b/common/src/main/java/org/apache/falcon/entity/common/FeedDataPath.java
@@ -52,6 +52,15 @@ public final class FeedDataPath {
}
return null;
}
+
+ /**
+ * Returns the first constant, in enum declaration order, whose datePattern occurs somewhere
+ * in {@code str}, or null when none does.
+ */
+ public static VARS presentIn(String str) {
+ VARS match = null;
+ for (VARS candidate : values()) {
+ if (str.contains(candidate.datePattern)) {
+ match = candidate;
+ break;
+ }
+ }
+ return match;
+ }
}
public static final Pattern PATTERN = Pattern.compile(VARS.YEAR.regex()
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index a8db52e..138a769 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -33,6 +33,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -84,6 +85,15 @@ public class FeedEvictor extends Configured implements Tool {
private static final String FORMAT = "yyyyMMddHHmm";
+ // Tokens used by createFilter() to assemble the HCatalog partition filter expression,
+ // e.g. "(year = '2014' and month < '02')". The surrounding spaces in the operator tokens
+ // are intentional so the pieces can be concatenated directly.
+ private static final String FILTER_ST_BRACKET = "(";
+ private static final String FILTER_END_BRACKET = ")";
+ private static final String FILTER_QUOTE = "'";
+ private static final String FILTER_AND = " and ";
+ private static final String FILTER_OR = " or ";
+ private static final String FILTER_LESS_THAN = " < ";
+ private static final String FILTER_EQUALS = " = ";
+
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Path confPath = new Path("file:///" + System.getProperty("oozie.action.conf.xml"));
@@ -375,16 +385,13 @@ public class FeedEvictor extends Configured implements Tool {
LOG.info("Applying retention on " + storage.getTable()
+ ", Limit: " + retentionLimit + ", timezone: " + timeZone);
- String datedPartitionKey = storage.getDatedPartitionKey();
- String datePattern = storage.getPartitionValue(datedPartitionKey);
- String dateMask = datePattern.replaceAll(VARS.YEAR.regex(), "yyyy")
- .replaceAll(VARS.MONTH.regex(), "MM")
- .replaceAll(VARS.DAY.regex(), "dd")
- .replaceAll(VARS.HOUR.regex(), "HH")
- .replaceAll(VARS.MINUTE.regex(), "mm");
+ // get sorted date partition keys and values
+ List<String> datedPartKeys = new ArrayList<String>();
+ List<String> datedPartValues = new ArrayList<String>();
+ fillSortedDatedPartitionKVs(storage, datedPartKeys, datedPartValues, retentionLimit, timeZone);
List<CatalogPartition> toBeDeleted = discoverPartitionsToDelete(
- storage, retentionLimit, timeZone, dateMask);
+ storage, datedPartKeys, datedPartValues);
if (toBeDeleted.isEmpty()) {
LOG.info("No partitions to delete.");
return;
@@ -393,69 +400,164 @@ public class FeedEvictor extends Configured implements Tool {
final boolean isTableExternal = CatalogServiceFactory.getCatalogService().isTableExternal(
storage.getCatalogUrl(), storage.getDatabase(), storage.getTable());
- dropPartitions(storage, toBeDeleted, isTableExternal);
+ dropPartitions(storage, toBeDeleted, datedPartKeys, isTableExternal);
}
- private List<CatalogPartition> discoverPartitionsToDelete(CatalogStorage storage, String retentionLimit,
- String timeZone, String dateMask)
- throws FalconException, ELException {
+ /**
+ * Lists the partitions of the storage's table that match the retention filter built by
+ * createFilter from the dated partition keys and their cutoff values.
+ *
+ * @param storage table storage whose catalog is queried
+ * @param datedPartKeys dated partition keys, aligned with datedPartValues
+ * @param datedPartValues cutoff value for each dated key
+ * @return partitions selected for eviction (possibly empty)
+ */
+ private List<CatalogPartition> discoverPartitionsToDelete(CatalogStorage storage,
+ List<String> datedPartKeys, List<String> datedPartValues) throws FalconException, ELException {
- final String filter = createFilter(storage, retentionLimit, timeZone, dateMask);
+ final String filter = createFilter(datedPartKeys, datedPartValues);
return CatalogServiceFactory.getCatalogService().listPartitionsByFilter(
storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), filter);
}
- private String createFilter(CatalogStorage storage, String retentionLimit,
- String timeZone, String dateMask) throws ELException {
-
+ /**
+ * Fills the two output lists with the feed's dated partition keys and, for each key, the
+ * partition value at the eviction cutoff (range.first of getDateRange(retentionLimit)),
+ * formatted with that key's date mask in the given time zone.
+ * Entries are ordered by the VARS constant detected in each value pattern (TreeMap over an
+ * enum, i.e. declaration order - presumably YEAR, MONTH, DAY, HOUR, MINUTE; confirm).
+ * Partition values without any date variable are skipped, and only the first key seen
+ * (in storage.getPartitions() iteration order) is kept for a given VARS constant.
+ *
+ * @param storage table storage whose partition spec is inspected
+ * @param sortedPartKeys output: dated partition keys in VARS order
+ * @param sortedPartValues output: cutoff values aligned index-by-index with sortedPartKeys
+ * @param retentionLimit retention EL expression, e.g. days(10)
+ * @param timeZone time zone id used to format the cutoff values
+ * @throws ELException if the retention limit cannot be evaluated
+ */
+ private void fillSortedDatedPartitionKVs(CatalogStorage storage, List<String> sortedPartKeys,
+ List<String> sortedPartValues, String retentionLimit, String timeZone) throws ELException {
Pair<Date, Date> range = getDateRange(retentionLimit);
- DateFormat dateFormat = new SimpleDateFormat(dateMask);
- dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
- String beforeDate = dateFormat.format(range.first);
- String datedPartitionKey = storage.getDatedPartitionKey();
+ // sort partition keys and values by the date pattern present in value
+ Map<VARS, String> sortedPartKeyMap = new TreeMap<VARS, String>();
+ Map<VARS, String> sortedPartValueMap = new TreeMap<VARS, String>();
+ for (Entry<String, String> entry : storage.getPartitions().entrySet()) {
+ String datePattern = entry.getValue();
+ String mask = datePattern.replaceAll(VARS.YEAR.regex(), "yyyy")
+ .replaceAll(VARS.MONTH.regex(), "MM")
+ .replaceAll(VARS.DAY.regex(), "dd")
+ .replaceAll(VARS.HOUR.regex(), "HH")
+ .replaceAll(VARS.MINUTE.regex(), "mm");
+ // pick the first VARS constant, in enum declaration order (not position in the mask),
+ // whose date pattern appears in the mask
+ VARS vars = VARS.presentIn(mask);
+ // skip this partition if date mask doesn't contain any date format
+ if (vars == null) {
+ continue;
+ }
+
+ // construct dated partition value as per format
+ DateFormat dateFormat = new SimpleDateFormat(mask);
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+ String partitionValue = dateFormat.format(range.first);
+
+ // add partition key and value in their sorted maps; both maps are updated under the same
+ // condition, so they stay aligned key-for-key
+ if (!sortedPartKeyMap.containsKey(vars)) {
+ sortedPartKeyMap.put(vars, entry.getKey());
+ }
+
+ if (!sortedPartValueMap.containsKey(vars)) {
+ sortedPartValueMap.put(vars, partitionValue);
+ }
+ }
+
+ // add map entries to lists of partition keys and values
+ sortedPartKeys.addAll(sortedPartKeyMap.values());
+ sortedPartValues.addAll(sortedPartValueMap.values());
+ }
+
+ /**
+ * Builds an HCatalog partition filter that selects partitions whose dated values sort strictly
+ * before datedPartValues, comparing the keys in list order (presumably most significant
+ * first, e.g. year, month, day).
+ *
+ * @param datedPartKeys dated partition keys in comparison order
+ * @param datedPartValues cutoff values aligned index-by-index with datedPartKeys
+ * @return filter expression for listPartitionsByFilter
+ * @throws IllegalArgumentException if no dated keys are given or the two lists differ in size
+ */
+ private String createFilter(List<String> datedPartKeys, List<String> datedPartValues)
+ throws ELException {
+
+ int numPartitions = datedPartKeys.size();
+ // An empty filter string presumably matches every partition in the metastore
+ // (NOTE(review): confirm), which would evict the whole table; refuse instead.
+ if (numPartitions == 0) {
+ throw new IllegalArgumentException("No dated partition keys found; refusing to build an"
+ + " empty retention filter");
+ }
+ if (datedPartValues.size() != numPartitions) {
+ throw new IllegalArgumentException("Dated partition keys " + datedPartKeys
+ + " do not match dated partition values " + datedPartValues);
+ }
+
+ /* Construct filter query string. As an example, suppose the dated partition keys
+ * are: [year, month, day, hour] and dated partition values are [2014, 02, 24, 10].
+ * Then the filter query generated is of the format:
+ * "(year < '2014') or (year = '2014' and month < '02') or
+ * (year = '2014' and month = '02' and day < '24') or
+ * (year = '2014' and month = '02' and day = '24' and hour < '10')"
+ * Values are quoted and compared as strings, so the cutoff ordering is only correct for
+ * fixed-width, zero-padded values.
+ */
StringBuilder filterBuffer = new StringBuilder();
- filterBuffer.append(datedPartitionKey)
- .append(" < ")
- .append("'")
- .append(beforeDate)
- .append("'");
+ for (int curr = 0; curr < numPartitions; curr++) {
+ if (curr > 0) {
+ filterBuffer.append(FILTER_OR);
+ }
+ filterBuffer.append(FILTER_ST_BRACKET);
+ for (int prev = 0; prev < curr; prev++) {
+ filterBuffer.append(datedPartKeys.get(prev))
+ .append(FILTER_EQUALS)
+ .append(FILTER_QUOTE)
+ .append(datedPartValues.get(prev))
+ .append(FILTER_QUOTE)
+ .append(FILTER_AND);
+ }
+ filterBuffer.append(datedPartKeys.get(curr))
+ .append(FILTER_LESS_THAN)
+ .append(FILTER_QUOTE)
+ .append(datedPartValues.get(curr))
+ .append(FILTER_QUOTE)
+ .append(FILTER_END_BRACKET);
+ }
return filterBuffer.toString();
}
+ /**
+ * Drops the given catalog partitions, issuing one catalog drop per distinct set of dated
+ * partition values (dropPartitionInstances does the per-group work).
+ *
+ * @param storage table storage the partitions belong to
+ * @param partitionsToDelete partitions selected by the retention filter
+ * @param datedPartKeys dated partition keys used to group the partitions
+ * @param isTableExternal whether partition directories must also be removed
+ */
private void dropPartitions(CatalogStorage storage, List<CatalogPartition> partitionsToDelete,
- boolean isTableExternal) throws FalconException, IOException {
-
+ List<String> datedPartKeys, boolean isTableExternal) throws FalconException, IOException {
+
+ // get table partition columns
+ List<String> partColumns = CatalogServiceFactory.getCatalogService().getTablePartitionCols(
+ storage.getCatalogUrl(), storage.getDatabase(), storage.getTable());
+
+ /* In case partition columns are a super-set of dated partitions, there can be multiple
+ * partitions that share the same set of date-partition values. All such partitions can
+ * be deleted by issuing a single HCatalog dropPartition call per distinct set of
+ * date-partition values. Arrange the partitions grouped by each set of date-partition values.
+ * Grouping is consistent with the retention filter, which constrains only the dated keys,
+ * so every partition sharing a group's dated values was already selected for deletion.
+ */
+ Map<Map<String, String>, List<CatalogPartition>> dateToPartitionsMap = new HashMap<
+ Map<String, String>, List<CatalogPartition>>();
for (CatalogPartition partitionToDrop : partitionsToDelete) {
- if (dropPartition(storage, partitionToDrop, isTableExternal)) {
- LOG.info("Deleted partition: " + partitionToDrop.getValues());
- buffer.append(partitionToDrop.getValues().get(0)).append(',');
- instancePaths.append(partitionToDrop.getValues()).append(",");
+ // create a map of name-values of all columns of this partition
+ // NOTE(review): pairs column i with value i, assuming getValues() follows the table's
+ // partition-column order - confirm
+ Map<String, String> partitions = new HashMap<String, String>();
+ for (int i = 0; i < partColumns.size(); i++) {
+ partitions.put(partColumns.get(i), partitionToDrop.getValues().get(i));
+ }
+
+ // create a map of name-values of dated sub-set of this partition
+ Map<String, String> datedPartitions = new HashMap<String, String>();
+ for (String datedPart : datedPartKeys) {
+ datedPartitions.put(datedPart, partitions.get(datedPart));
}
- }
- }
- private boolean dropPartition(CatalogStorage storage, CatalogPartition partitionToDrop,
- boolean isTableExternal) throws FalconException, IOException {
+ // add a map entry of this catalog partition corresponding to its date-partition values
+ List<CatalogPartition> catalogPartitions;
+ if (dateToPartitionsMap.containsKey(datedPartitions)) {
+ catalogPartitions = dateToPartitionsMap.get(datedPartitions);
+ } else {
+ catalogPartitions = new ArrayList<CatalogPartition>();
+ }
+ catalogPartitions.add(partitionToDrop);
+ dateToPartitionsMap.put(datedPartitions, catalogPartitions);
+ }
- String datedPartitionKey = storage.getDatedPartitionKey();
+ // delete each entry within dateToPartitions Map
+ for (Entry<Map<String, String>, List<CatalogPartition>> entry : dateToPartitionsMap.entrySet()) {
+ dropPartitionInstances(storage, entry.getValue(), entry.getKey(), isTableExternal);
+ }
+ }
- Map<String, String> partitions = new HashMap<String, String>();
- partitions.put(datedPartitionKey, partitionToDrop.getValues().get(0));
+ /**
+ * Drops every partition matching partSpec with one catalog call. If that call reports
+ * success, it then handles each partition in partitionsToDrop: for external tables it deletes
+ * the partition location, and it appends the partition to buffer and instancePaths.
+ * Nothing is recorded when the catalog drop returns false.
+ *
+ * @param storage table storage the partitions belong to
+ * @param partitionsToDrop partitions covered by partSpec, used for locations and logging
+ * @param partSpec dated partition key/value spec passed to the catalog drop
+ * @param isTableExternal whether partition locations must be deleted from the file system
+ */
+ private void dropPartitionInstances(CatalogStorage storage, List<CatalogPartition> partitionsToDrop,
+ Map<String, String> partSpec, boolean isTableExternal) throws FalconException, IOException {
- boolean dropped = CatalogServiceFactory.getCatalogService().dropPartitions(
- storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), partitions);
+ boolean deleted = CatalogServiceFactory.getCatalogService().dropPartitions(
+ storage.getCatalogUrl(), storage.getDatabase(), storage.getTable(), partSpec);
- boolean deleted = true;
- if (isTableExternal) { // nuke the dirs if an external table
- final String location = partitionToDrop.getLocation();
- final Path path = new Path(location);
- deleted = path.getFileSystem(new Configuration()).delete(path, true);
+ if (!deleted) {
+ return;
}
- return dropped && deleted;
+ for (CatalogPartition partitionToDrop : partitionsToDrop) {
+ if (isTableExternal) { // nuke the dirs if an external table
+ final String location = partitionToDrop.getLocation();
+ final Path path = new Path(location);
+ deleted = path.getFileSystem(new Configuration()).delete(path, true);
+ }
+ // a partition is recorded only if its metadata drop succeeded and, for external
+ // tables, its directory delete succeeded too
+ if (!isTableExternal || deleted) {
+ // replace ',' with ';' since message producer splits instancePaths string by ','
+ String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
+ LOG.info("Deleted partition: " + partitionInfo);
+ // NOTE(review): partSpec is a HashMap, so this appends its toString() ("{k=v, k=v}",
+ // unordered and containing ',') once per partition in the group; if consumers split
+ // buffer on ',' these entries will be mis-split - confirm the expected format.
+ buffer.append(partSpec).append(',');
+ instancePaths.append(partitionInfo).append(",");
+ }
+ }
}
private void deleteParentIfEmpty(FileSystem fs, Path parent, Path feedBasePath) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
index fd004a1..6966a8d 100644
--- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
+++ b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
@@ -37,6 +37,7 @@ import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -334,4 +335,22 @@ public class HiveCatalogServiceIT {
Assert.assertTrue(reInstatedCreateTime > originalCreateTime);
}
+
+ @DataProvider (name = "tableName")
+ public Object[][] createTableName() {
+ // one row per table under test
+ Object[][] rows = new Object[2][];
+ rows[0] = new Object[] {TABLE_NAME};
+ rows[1] = new Object[] {EXTERNAL_TABLE_NAME};
+ return rows;
+ }
+
+ @Test (dataProvider = "tableName")
+ public void testGetTablePartitionCols(String tableName) throws Exception {
+ List<String> partCols = CatalogServiceFactory.getCatalogService()
+ .getTablePartitionCols(METASTORE_URL, DATABASE_NAME, tableName);
+ Assert.assertEquals(partCols.size(), 2);
+
+ // sort so the checks do not depend on the column order reported by the catalog
+ Collections.sort(partCols);
+ String[] expectedCols = {"ds", "region"};
+ for (int i = 0; i < expectedCols.length; i++) {
+ Assert.assertEquals(partCols.get(i), expectedCols[i]);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/173ebec1/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
index 770780e..894a194 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -58,6 +58,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
+import java.util.TreeMap;
/**
* Test for FeedEvictor for table.
@@ -71,8 +72,12 @@ public class TableStorageFeedEvictorIT {
private static final String DATABASE_NAME = "falcon_db";
private static final String TABLE_NAME = "clicks";
private static final String EXTERNAL_TABLE_NAME = "clicks_external";
+ private static final String MULTI_COL_DATED_TABLE_NAME = "downloads";
+ private static final String MULTI_COL_DATED_EXTERNAL_TABLE_NAME = "downloads_external";
private static final String STORAGE_URL = "jail://global:00";
private static final String EXTERNAL_TABLE_LOCATION = STORAGE_URL + "/falcon/staging/clicks_external/";
+ private static final String MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION = STORAGE_URL
+ + "/falcon/staging/downloads_external/";
private final InMemoryWriter stream = new InMemoryWriter(System.out);
@@ -89,12 +94,19 @@ public class TableStorageFeedEvictorIT {
HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME, partitionKeys);
HiveTestUtils.createExternalTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME,
partitionKeys, EXTERNAL_TABLE_LOCATION);
+
+ final List<String> multiColDatedPartitionKeys = Arrays.asList("year", "month", "day", "region");
+ HiveTestUtils.createTable(METASTORE_URL, DATABASE_NAME, MULTI_COL_DATED_TABLE_NAME, multiColDatedPartitionKeys);
+ HiveTestUtils.createExternalTable(METASTORE_URL, DATABASE_NAME, MULTI_COL_DATED_EXTERNAL_TABLE_NAME,
+ multiColDatedPartitionKeys, MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION);
}
@AfterClass
public void close() throws Exception {
HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, EXTERNAL_TABLE_NAME);
HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, TABLE_NAME);
+ HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, MULTI_COL_DATED_EXTERNAL_TABLE_NAME);
+ HiveTestUtils.dropTable(METASTORE_URL, DATABASE_NAME, MULTI_COL_DATED_TABLE_NAME);
HiveTestUtils.dropDatabase(METASTORE_URL, DATABASE_NAME);
}
@@ -162,8 +174,7 @@ public class TableStorageFeedEvictorIT {
"Unexpected number of evicted partitions");
final String actualInstancesEvicted = readLogFile(new Path(logFile));
- Assert.assertEquals(actualInstancesEvicted, expectedInstancePaths.toString(),
- "Unexpected number of Logged partitions");
+ validateInstancePaths(actualInstancesEvicted, expectedInstancePaths.toString());
if (isExternal) {
verifyFSPartitionsAreDeleted(candidatePartitions, range.first, dateMask, timeZone);
@@ -216,6 +227,74 @@ public class TableStorageFeedEvictorIT {
}
}
+ @DataProvider (name = "multiColDatedEvictorTestDataProvider")
+ private Object[][] createMultiColDatedEvictorTestData() {
+ // each retention limit is exercised against the managed and the external table
+ String[] retentionLimits = {"days(10)", "days(15)", "days(100)"};
+ Object[][] data = new Object[retentionLimits.length * 2][];
+ int row = 0;
+ for (String limit : retentionLimits) {
+ data[row++] = new Object[] {limit, false};
+ data[row++] = new Object[] {limit, true};
+ }
+ return data;
+ }
+
+ /**
+ * Runs FeedEvictor against a table partitioned by year/month/day/region and checks the
+ * surviving partitions, the logged instancePaths and, for the external table, that the
+ * evicted partitions' data is gone.
+ */
+ @Test (dataProvider = "multiColDatedEvictorTestDataProvider")
+ public void testFeedEvictorForMultiColDatedTableStorage(String retentionLimit, boolean isExternal)
+ throws Exception {
+ final String tableName = isExternal ? MULTI_COL_DATED_EXTERNAL_TABLE_NAME : MULTI_COL_DATED_TABLE_NAME;
+ final String timeZone = "UTC";
+
+ // the candidate set is always derived from a days(10) range, independent of retentionLimit
+ List<Map<String, String>> candidatePartitions = getMultiColDatedCandidatePartitions("days(10)", timeZone, 3);
+ addMultiColDatedPartitions(tableName, candidatePartitions, isExternal);
+
+ List<HCatPartition> partitions = client.getPartitions(DATABASE_NAME, tableName);
+ Assert.assertEquals(partitions.size(), candidatePartitions.size());
+ Pair<Date, Date> range = getDateRange(retentionLimit);
+ // capture the expected victims with the reference filter before the evictor runs
+ List<HCatPartition> filteredPartitions = getMultiColDatedFilteredPartitions(tableName, timeZone, range);
+
+ try {
+ stream.clear();
+
+ // region=us carries no date variable, so the evictor leaves it out of the filter
+ // (the candidates are registered with region=in)
+ final String tableUri = DATABASE_NAME + "/" + tableName
+ + "/year=${YEAR};month=${MONTH};day=${DAY};region=us";
+ String feedBasePath = METASTORE_URL + tableUri;
+ String logFile = STORAGE_URL + "/falcon/staging/feed/instancePaths-2013-09-13-01-00.csv";
+
+ FeedEvictor.main(new String[]{
+ "-feedBasePath", feedBasePath,
+ "-retentionType", "instance",
+ "-retentionLimit", retentionLimit,
+ "-timeZone", timeZone,
+ "-frequency", "daily",
+ "-logFile", logFile,
+ "-falconFeedStorageType", Storage.TYPE.TABLE.name(),
+ });
+
+ StringBuilder expectedInstancePaths = new StringBuilder();
+ List<Map<String, String>> expectedInstancesEvicted = getMultiColDatedExpectedEvictedInstances(
+ candidatePartitions, range.first, timeZone, expectedInstancePaths);
+ int expectedSurvivorSize = candidatePartitions.size() - expectedInstancesEvicted.size();
+
+ List<HCatPartition> survivingPartitions = client.getPartitions(DATABASE_NAME, tableName);
+ Assert.assertEquals(survivingPartitions.size(), expectedSurvivorSize,
+ "Unexpected number of surviving partitions");
+
+ // NOTE(review): arguments are (expected, actual), the reverse of TestNG's (actual, expected)
+ // order; only the failure message wording is affected
+ Assert.assertEquals(expectedInstancesEvicted.size(), filteredPartitions.size(),
+ "Unexpected number of evicted partitions");
+
+ final String actualInstancesEvicted = readLogFile(new Path(logFile));
+ validateInstancePaths(actualInstancesEvicted, expectedInstancePaths.toString());
+
+ if (isExternal) {
+ verifyMultiColDatedFSPartitionsAreDeleted(candidatePartitions, range.first, timeZone);
+ }
+ } finally {
+ // clean up so the next data-provider row starts from an empty table
+ dropMultiColDatedPartitions(tableName, candidatePartitions);
+ Assert.assertEquals(client.getPartitions(DATABASE_NAME, tableName).size(), 0);
+ }
+ }
+
public List<String> getCandidatePartitions(String retentionLimit, String dateMask,
String timeZone, int limit) throws Exception {
List<String> partitions = new ArrayList<String>();
@@ -244,6 +323,41 @@ public class TableStorageFeedEvictorIT {
return partitions;
}
+ /**
+ * Builds year/month/day partition maps for {@code limit} dates stepping back from each end of
+ * the range computed for retentionLimit (first from range.first, then from range.second).
+ */
+ public List<Map<String, String>> getMultiColDatedCandidatePartitions(String retentionLimit,
+ String timeZone, int limit) throws Exception {
+ Pair<Date, Date> range = getDateRange(retentionLimit);
+
+ DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+
+ List<Map<String, String>> partitions = new ArrayList<Map<String, String>>();
+ appendDatedPartitionsBefore(range.first, dateFormat, limit, partitions);
+ appendDatedPartitionsBefore(range.second, dateFormat, limit, partitions);
+ return partitions;
+ }
+
+ /**
+ * Appends {@code limit} year/month/day maps for dates walked backwards from {@code start};
+ * step i moves a further i + 1 days back, so offsets accumulate (2, 5, 9, ... days).
+ */
+ private void appendDatedPartitionsBefore(Date start, DateFormat dateFormat, int limit,
+ List<Map<String, String>> partitions) {
+ Calendar calendar = Calendar.getInstance();
+ calendar.setTime(start);
+ for (int step = 1; step <= limit; step++) {
+ calendar.add(Calendar.DAY_OF_MONTH, -(step + 1));
+ String[] ymd = dateFormat.format(calendar.getTime()).split("-");
+ Map<String, String> dateParts = new TreeMap<String, String>();
+ dateParts.put("year", ymd[0]);
+ dateParts.put("month", ymd[1]);
+ dateParts.put("day", ymd[2]);
+ partitions.add(dateParts);
+ }
+ }
+
private Pair<Date, Date> getDateRange(String period) throws ELException {
Long duration = (Long) EVALUATOR.evaluate("${" + period + "}",
Long.class, RESOLVER, RESOLVER);
@@ -271,6 +385,28 @@ public class TableStorageFeedEvictorIT {
}
}
+ /**
+ * Registers each candidate date as a partition (with region=in) on the given table. For the
+ * external table a marker file is first created under MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION.
+ * Side effect: every candidate map is mutated to include region=in.
+ * NOTE(review): the marker path follows the map's own iteration order; for the TreeMaps from
+ * getMultiColDatedCandidatePartitions that is day=/month=/year=/region=in, which presumably
+ * differs from the partition's default location (table column order year/month/day/region) -
+ * confirm the evictor is really expected to remove these files.
+ */
+ private void addMultiColDatedPartitions(String tableName, List<Map<String, String>> candidatePartitions,
+ boolean isTableExternal) throws Exception {
+ FileSystem fs = new Path(MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION).getFileSystem(new Configuration());
+
+ for (Map<String, String> candidatePartition : candidatePartitions) {
+ if (isTableExternal) {
+ StringBuilder markerPath = new StringBuilder(MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION);
+ for (String key : candidatePartition.keySet()) {
+ markerPath.append(key).append("=").append(candidatePartition.get(key)).append("/");
+ }
+ touch(fs, markerPath.append("region=in").toString());
+ }
+
+ candidatePartition.put("region", "in");
+ client.addPartition(HCatAddPartitionDesc.create(
+ DATABASE_NAME, tableName, null, candidatePartition).build());
+ }
+ }
+
private void touch(FileSystem fs, String path) throws Exception {
fs.create(new Path(path)).close();
}
@@ -285,6 +421,14 @@ public class TableStorageFeedEvictorIT {
}
}
+ /** Drops each candidate partition spec from the given table, one catalog call per spec. */
+ private void dropMultiColDatedPartitions(String tableName, List<Map<String, String>> candidatePartitions)
+ throws Exception {
+ for (int i = 0; i < candidatePartitions.size(); i++) {
+ client.dropPartitions(DATABASE_NAME, tableName, candidatePartitions.get(i), true);
+ }
+ }
+
private List<HCatPartition> getFilteredPartitions(String tableName, String timeZone, String dateMask,
Pair<Date, Date> range) throws HCatException {
DateFormat dateFormat = new SimpleDateFormat(dateMask);
@@ -294,6 +438,24 @@ public class TableStorageFeedEvictorIT {
return client.listPartitionsByFilter(DATABASE_NAME, tableName, filter);
}
+ /**
+ * Lists the partitions whose year/month/day sorts strictly before range.first, using an
+ * independently written or-of-ands filter as the test's reference result.
+ */
+ private List<HCatPartition> getMultiColDatedFilteredPartitions(String tableName, String timeZone,
+ Pair<Date, Date> range) throws HCatException {
+ DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+ String[] cutoff = dateFormat.format(range.first).split("-");
+ String year = cutoff[0];
+ String month = cutoff[1];
+ String day = cutoff[2];
+
+ // e.g. "(year < '2014') or (year = '2014' and month < '02') or
+ // (year = '2014' and month = '02' and day < '24')"
+ StringBuilder filter = new StringBuilder();
+ filter.append("(year < '").append(year).append("')");
+ filter.append(" or (year = '").append(year).append("' and month < '").append(month).append("')");
+ filter.append(" or (year = '").append(year).append("' and month = '").append(month)
+ .append("' and day < '").append(day).append("')");
+ return client.listPartitionsByFilter(DATABASE_NAME, tableName, filter.toString());
+ }
+
public List<String> getExpectedEvictedInstances(List<String> candidatePartitions, Date date, String dateMask,
String timeZone, StringBuilder instancePaths) {
Collections.sort(candidatePartitions);
@@ -307,9 +469,29 @@ public class TableStorageFeedEvictorIT {
for (String candidatePartition : candidatePartitions) {
if (candidatePartition.compareTo(startDate) < 0) {
expectedInstances.add(candidatePartition);
+ instancePaths.append("[").append(candidatePartition).append("; in],");
+ }
+ }
+
+ return expectedInstances;
+ }
+
+ public List<Map<String, String>> getMultiColDatedExpectedEvictedInstances(List<Map<String, String>>
+ candidatePartitions, Date date, String timeZone, StringBuilder instancePaths) throws Exception {
+ instancePaths.append("instancePaths=");
+
+ DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+ String startDate = dateFormat.format(date);
+
+ List<Map<String, String>> expectedInstances = new ArrayList<Map<String, String>>();
+ for (Map<String, String> partition : candidatePartitions) {
+ String partDate = partition.get("year") + partition.get("month") + partition.get("day");
+ if (partDate.compareTo(startDate) < 0) {
+ expectedInstances.add(partition);
instancePaths.append("[")
- .append(candidatePartition)
- .append(", in],");
+ .append(partition.values().toString().replace("," , ";"))
+ .append("; in],");
}
}
@@ -335,6 +517,25 @@ public class TableStorageFeedEvictorIT {
}
}
+ /**
+ * Fails if any candidate dated strictly before {@code date} still has its probed path on disk.
+ * NOTE(review): the probed path (location + year/month/day/region=in, without key= prefixes)
+ * does not match the marker path built in addMultiColDatedPartitions (key=value segments), so
+ * this check looks like it can never fail - confirm the intended layout.
+ */
+ private void verifyMultiColDatedFSPartitionsAreDeleted(List<Map<String, String>> candidatePartitions,
+ Date date, String timeZone) throws Exception {
+
+ FileSystem fs = new Path(MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION).getFileSystem(new Configuration());
+
+ DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
+ dateFormat.setTimeZone(TimeZone.getTimeZone(timeZone));
+ String startDate = dateFormat.format(date);
+
+ for (Map<String, String> partition : candidatePartitions) {
+ String year = partition.get("year");
+ String month = partition.get("month");
+ String day = partition.get("day");
+ boolean beforeCutoff = (year + month + day).compareTo(startDate) < 0;
+ if (!beforeCutoff) {
+ continue;
+ }
+ final String path = MULTI_COL_DATED_EXTERNAL_TABLE_LOCATION
+ + year + "/" + month + "/" + day + "/region=in";
+ if (fs.exists(new Path(path))) {
+ Assert.fail("Expecting " + path + " to be deleted");
+ }
+ }
+ }
+
private String readLogFile(Path logFile) throws IOException {
ByteArrayOutputStream writer = new ByteArrayOutputStream();
InputStream date = logFile.getFileSystem(new Configuration()).open(logFile);
@@ -342,6 +543,25 @@ public class TableStorageFeedEvictorIT {
return writer.toString();
}
+ // Instance paths may be logged in any order, so compare the sorted lists of evicted paths.
+ // Both strings have the form "instancePaths=<p1>,<p2>,..."; nothing after '=' means no evictions.
+ private void validateInstancePaths(String actualInstancesEvicted, String expectedInstancePaths) {
+ String[] actualEvictedPathStr = actualInstancesEvicted.split("=");
+ String[] expectedEvictedPathStr = expectedInstancePaths.split("=");
+ if (actualEvictedPathStr.length == 1) {
+ Assert.assertEquals(expectedEvictedPathStr.length, 1);
+ } else {
+ Assert.assertEquals(actualEvictedPathStr.length, 2);
+ Assert.assertEquals(expectedEvictedPathStr.length, 2);
+
+ String[] actualEvictedPaths = actualEvictedPathStr[1].split(",");
+ // split the EXPECTED string here; splitting the actual one again would compare it to itself
+ String[] expectedEvictedPaths = expectedEvictedPathStr[1].split(",");
+ Arrays.sort(actualEvictedPaths);
+ Arrays.sort(expectedEvictedPaths);
+ Assert.assertEquals(actualEvictedPaths, expectedEvictedPaths,
+ "Unexpected number of Logged partitions");
+ }
+ }
+
private static class InMemoryWriter extends PrintStream {
private final StringBuffer buffer = new StringBuffer();