Posted to commits@drill.apache.org by am...@apache.org on 2016/07/28 00:23:58 UTC
drill git commit: DRILL-4786: Read the metadata cache file from the least common ancestor directory when multiple partitions are selected.
Repository: drill
Updated Branches:
refs/heads/master 4b362f089 -> 69a44ed79
DRILL-4786: Read the metadata cache file from the least common ancestor directory when multiple partitions are selected.
close apache/drill#553
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69a44ed7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69a44ed7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69a44ed7
Branch: refs/heads/master
Commit: 69a44ed79e4a9ea242df2d5ce7a9b3f042120676
Parents: 4b362f0
Author: Aman Sinha <as...@maprtech.com>
Authored: Fri Jul 22 16:42:09 2016 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Wed Jul 27 17:21:05 2016 -0700
----------------------------------------------------------------------
.../planner/AbstractPartitionDescriptor.java | 2 +-
.../planner/FileSystemPartitionDescriptor.java | 9 +-
.../drill/exec/planner/PartitionDescriptor.java | 2 +-
.../logical/partition/PruneScanRule.java | 91 ++++++++++++--------
.../drill/exec/store/dfs/FileSelection.java | 11 ++-
.../exec/store/parquet/ParquetFormatPlugin.java | 3 +-
.../exec/store/parquet/ParquetGroupScan.java | 7 +-
.../store/parquet/TestParquetMetadataCache.java | 60 +++++++++++--
8 files changed, 134 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
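For context, the core idea of the change is: when the filter selects multiple partitions, compute the deepest directory that is common to all of them and read the metadata cache file from that directory instead of from a single partition. Below is a minimal standalone sketch of that computation, not the committed code; the helper name commonCacheRoot is hypothetical, and the actual commit does this inside PruneScanRule using spInfo/matchBitSet while iterating over the qualifying partitions (see the diff that follows).
----------------------------------------------------------------------
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;

public class CommonCacheRootSketch {
  // Each String[] holds the directory values referenced by the filter for one
  // qualifying partition (same length for all partitions), with null for levels
  // the filter does not reference, e.g. ["1995", "Q1", null].
  static String commonCacheRoot(String baseTableLocation, List<String[]> partitionValues) {
    if (partitionValues.isEmpty()) {
      return baseTableLocation;
    }
    String[] first = partitionValues.get(0);
    BitSet match = new BitSet();
    for (int j = 0; j < first.length; j++) {
      if (first[j] != null) {
        match.set(j);              // start with the levels the first partition defines
      }
    }
    for (String[] parts : partitionValues) {
      for (int j = 0; j < first.length; j++) {
        if (parts[j] == null || first[j] == null || !parts[j].equals(first[j])) {
          match.clear(j);          // nulls or differing values do not match
        }
      }
    }
    // Only the contiguous prefix of matching levels can be appended to the base location.
    StringBuilder path = new StringBuilder(baseTableLocation);
    for (int j = 0; j < first.length && match.get(j); j++) {
      path.append('/').append(first[j]);
    }
    return path.toString();
  }

  public static void main(String[] args) {
    // dir0=1995 AND dir1 IN ('Q1', 'Q2') -> least common ancestor is <base>/1995
    System.out.println(commonCacheRoot("/tmp/orders", Arrays.asList(
        new String[]{"1995", "Q1", null},
        new String[]{"1995", "Q2", null})));
  }
}
----------------------------------------------------------------------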
http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
index 9879492..d8872dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
@@ -58,7 +58,7 @@ public abstract class AbstractPartitionDescriptor implements PartitionDescriptor
}
@Override
- public boolean supportsSinglePartOptimization() {
+ public boolean supportsMetadataCachePruning() {
return false;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index ba18bbe..370fb6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -267,8 +267,13 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
}
@Override
- public boolean supportsSinglePartOptimization() {
- return true;
+ public boolean supportsMetadataCachePruning() {
+ final Object selection = this.table.getSelection();
+ if (selection instanceof FormatSelection
+ && ((FormatSelection)selection).getSelection().getCacheFileRoot() != null) {
+ return true;
+ }
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index 4d1bfdd..83cb146 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -96,7 +96,7 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
boolean wasAllPartitionsPruned) throws Exception;
- public boolean supportsSinglePartOptimization();
+ public boolean supportsMetadataCachePruning();
public String getBaseTableLocation();
http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 82e3bb7..d3e38fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -48,7 +48,6 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
import org.apache.drill.exec.planner.PartitionDescriptor;
import org.apache.drill.exec.planner.PartitionLocation;
-import org.apache.drill.exec.planner.SimplePartitionLocation;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillScanRel;
@@ -216,10 +215,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
int batchIndex = 0;
PartitionLocation firstLocation = null;
LogicalExpression materializedExpr = null;
- boolean checkForSingle = descriptor.supportsSinglePartOptimization();
- boolean isSinglePartition = true;
String[] spInfo = null;
int maxIndex = -1;
+ BitSet matchBitSet = new BitSet();
// Outer loop: iterate over a list of batches of PartitionLocations
for (List<PartitionLocation> partitions : descriptor) {
@@ -279,41 +277,34 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
int recordCount = 0;
int qualifiedCount = 0;
- if (checkForSingle &&
+ if (descriptor.supportsMetadataCachePruning() &&
partitions.get(0).isCompositePartition() /* apply single partition check only for composite partitions */) {
// Inner loop: within each batch iterate over the PartitionLocations
for (PartitionLocation part : partitions) {
assert part.isCompositePartition();
if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
newPartitions.add(part);
- if (isSinglePartition) { // only need to do this if we are already single partition
- // compose the array of partition values for the directories that are referenced by filter:
- // e.g suppose the dir hierarchy is year/quarter/month and the query is:
- // SELECT * FROM T WHERE dir0=2015 AND dir1 = 'Q1',
- // then for 2015/Q1/Feb, this will have ['2015', 'Q1', null]
- // Note that we are not using the PartitionLocation here but composing a different list because
- // we are only interested in the directory columns that are referenced in the filter condition. not
- // the SELECT list or other parts of the query.
- Pair<String[], Integer> p = composePartition(referencedDirsBitSet, partitionMap, vectors, recordCount);
- String[] parts = p.getLeft();
- int tmpIndex = p.getRight();
- if (spInfo == null) {
- for (int j = 0; j <= tmpIndex; j++) {
- if (parts[j] == null) { // prefixes should be non-null
- isSinglePartition = false;
- break;
- }
+ // Rather than using the PartitionLocation, get the array of partition values for the directories that are
+ // referenced by the filter since we are not interested in directory references in other parts of the query.
+ Pair<String[], Integer> p = composePartition(referencedDirsBitSet, partitionMap, vectors, recordCount);
+ String[] parts = p.getLeft();
+ int tmpIndex = p.getRight();
+ maxIndex = Math.max(maxIndex, tmpIndex);
+ if (spInfo == null) { // initialization
+ spInfo = parts;
+ for (int j = 0; j <= tmpIndex; j++) {
+ if (parts[j] != null) {
+ matchBitSet.set(j);
}
- spInfo = parts;
- maxIndex = tmpIndex;
- } else if (maxIndex != tmpIndex) {
- isSinglePartition = false;
- } else {
- // we only want to compare until the maxIndex inclusive since subsequent values would be null
- for (int j = 0; j <= maxIndex; j++) {
- if (!spInfo[j].equals(parts[j])) {
- isSinglePartition = false;
- break;
+ }
+ } else {
+ // compare the new partition with existing partition
+ for (int j=0; j <= tmpIndex; j++) {
+ if (parts[j] == null || spInfo[j] == null) { // nulls don't match
+ matchBitSet.clear(j);
+ } else {
+ if (!parts[j].equals(spInfo[j])) {
+ matchBitSet.clear(j);
}
}
}
@@ -387,16 +378,35 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
condition = condition.accept(reverseVisitor);
pruneCondition = pruneCondition.accept(reverseVisitor);
- if (checkForSingle && isSinglePartition && !wasAllPartitionsPruned) {
+ if (descriptor.supportsMetadataCachePruning() && !wasAllPartitionsPruned) {
// if metadata cache file could potentially be used, then assign a proper cacheFileRoot
- String path = "";
- for (int j = 0; j <= maxIndex; j++) {
- path += "/" + spInfo[j];
+ int index = -1;
+ if (!matchBitSet.isEmpty()) {
+ String path = "";
+ index = matchBitSet.length() - 1;
+
+ for (int j = 0; j < matchBitSet.length(); j++) {
+ if (!matchBitSet.get(j)) {
+ // stop at the first index with no match and use the immediate
+ // previous index
+ index = j-1;
+ break;
+ }
+ }
+ for (int j=0; j <= index; j++) {
+ path += "/" + spInfo[j];
+ }
+ cacheFileRoot = descriptor.getBaseTableLocation() + path;
+ }
+ if (index != maxIndex) {
+ // if multiple partitions are being selected, we should not drop the filter
+ // since we are reading the cache file at a parent/ancestor level
+ canDropFilter = false;
}
- cacheFileRoot = descriptor.getBaseTableLocation() + path;
+
}
- RelNode inputRel = descriptor.supportsSinglePartOptimization() ?
+ RelNode inputRel = descriptor.supportsMetadataCachePruning() ?
descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned) :
descriptor.createTableScan(newPartitions, wasAllPartitionsPruned);
@@ -418,6 +428,13 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
}
+ /** Compose the array of partition values for the directories that are referenced by filter:
+ * e.g suppose the dir hierarchy is year/quarter/month and the query is:
+ * SELECT * FROM T WHERE dir0=2015 AND dir1 = 'Q1',
+ * then for 2015/Q1/Feb, this will have ['2015', 'Q1', null]
+ * If the query filter condition is WHERE dir1 = 'Q2' (i.e no dir0 condition) then the array will
+ * have [null, 'Q2', null]
+ */
private Pair<String[], Integer> composePartition(BitSet referencedDirsBitSet,
Map<Integer, Integer> partitionMap,
ValueVector[] vectors,
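A note on the Filter handling in the hunks above: the Filter below the scan can only be dropped when the matched directory prefix covers every level referenced by the filter (index == maxIndex); otherwise the cache file is read at an ancestor level and the remaining pruning must still be applied to the files it lists. The sketch below paraphrases that check; it is illustrative only, and the method name canDropFilter is hypothetical.
----------------------------------------------------------------------
import java.util.BitSet;

final class DropFilterSketch {
  // matchBitSet marks the directory levels whose value is identical across all
  // qualifying partitions; maxIndex is the deepest level referenced by the filter.
  static boolean canDropFilter(BitSet matchBitSet, int maxIndex) {
    if (matchBitSet.isEmpty()) {
      return false;                                 // no common prefix at all
    }
    int index = matchBitSet.nextClearBit(0) - 1;    // last level of the contiguous matching prefix
    return index == maxIndex;
  }

  public static void main(String[] args) {
    BitSet single = new BitSet();                   // dir0=1995 AND dir1='Q1': both levels match
    single.set(0);
    single.set(1);
    System.out.println(canDropFilter(single, 1));   // true  -> filter can be dropped

    BitSet multi = new BitSet();                    // dir0=1995 AND dir1 IN ('Q1','Q2'): only dir0 matches
    multi.set(0);
    System.out.println(canDropFilter(multi, 1));    // false -> keep the filter
  }
}
----------------------------------------------------------------------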
http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index d357c39..8654749 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -326,7 +326,8 @@ public class FileSelection {
return FileSelection.create(statuses, files, root, null, false);
}
- public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection) {
+ public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection,
+ final String cacheFileRoot) {
final String root = selection.getSelectionRoot();
if (Strings.isNullOrEmpty(root)) {
throw new DrillRuntimeException("Selection root is null or empty" + root);
@@ -351,7 +352,9 @@ public class FileSelection {
// final URI uri = dirPaths.get(0).toUri();
final URI uri = selection.getFileStatuses().get(0).getPath().toUri();
final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
- return new FileSelection(null, dirs, path.toString());
+ FileSelection fileSel = new FileSelection(null, dirs, path.toString(), cacheFileRoot, false);
+ fileSel.setHadWildcard(selection.hadWildcard());
+ return fileSel;
}
private static Path handleWildCard(final String root) {
@@ -395,4 +398,8 @@ public class FileSelection {
return this.hadWildcard;
}
+ public String getCacheFileRoot() {
+ return cacheFileRoot;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 20c7312..5174893 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -216,7 +216,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
if (fs.exists(dirMetaPath)) {
ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString());
if (mDirs.getDirectories().size() > 0) {
- FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection);
+ FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection,
+ selection.getSelectionRoot() /* cacheFileRoot initially points to selectionRoot */);
dirSelection.setExpandedPartial();
return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
new FormatSelection(plugin.getConfig(), dirSelection));
http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index b838472..f666439 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -584,12 +584,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
fileSet.add(file.getPath());
}
- } else if (selection.isExpandedPartial() && cacheFileRoot != null) {
+ } else if (selection.isExpandedPartial() && !selection.hadWildcard() &&
+ cacheFileRoot != null) {
this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString());
if (selection.wasAllPartitionsPruned()) {
// if all partitions were previously pruned, we only need to read 1 file (for the schema)
fileSet.add(this.parquetTableMetadata.getFiles().get(0).getPath());
} else {
+ // we are here if the selection is in the expanded_partial state (i.e it has directories). We get the
+ // list of files from the metadata cache file that is present in the cacheFileRoot directory and populate
+ // the fileSet. However, this is *not* the final list of files that will be scanned in execution since the
+ // second phase of partition pruning will apply on the files and modify the file selection appropriately.
for (Metadata.ParquetFileMetadata file : this.parquetTableMetadata.getFiles()) {
fileSet.add(file.getPath());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index a5c0730..342ee91 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -53,7 +53,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
dataDir2);
}
- @Test // also a negative test case for DRILL-4530
+ @Test
public void testPartitionPruningWithMetadataCache_1() throws Exception {
test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName1));
checkForMetadataFile(tableName1);
@@ -67,11 +67,9 @@ public class TestParquetMetadataCache extends PlanTestBase {
assertEquals(expectedRowCount, actualRowCount);
String numFilesPattern = "numFiles=" + expectedNumFiles;
String usedMetaPattern = "usedMetadataFile=true";
- // since there are 2 or more sub-partitions the single partition cache file optimization does not apply
- // and cacheFileRoot should point to the top level selectionRoot
- String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", getDfsTestTmpSchemaLocation(), tableName1);
+ String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1994", getDfsTestTmpSchemaLocation(), tableName1);
PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
- new String[] {"Filter"});
+ new String[] {});
}
@Test // DRILL-3917, positive test case for DRILL-4530
@@ -280,7 +278,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
String usedMetaPattern = "usedMetadataFile=true";
String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", getDfsTestTmpSchemaLocation(), tableName2);
PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
- new String[] {"Filter"});
+ new String[] {});
}
@Test // DRILL-4530 // non-existent partition (1 subdirectory's cache file will still be read for schema)
@@ -325,6 +323,56 @@ public class TestParquetMetadataCache extends PlanTestBase {
new String[] {});
}
+ @Test // DRILL-4786
+ public void testDrill4786_1() throws Exception {
+ // create metadata cache
+ test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName2));
+ checkForMetadataFile(tableName2);
+
+ // run query and check correctness
+ String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/%s` " +
+ " where dir0=1995 and dir1 in ('Q1', 'Q2')",
+ getDfsTestTmpSchemaLocation(), tableName2);
+
+ int expectedRowCount = 40;
+ int expectedNumFiles = 4;
+
+ int actualRowCount = testSql(query1);
+ assertEquals(expectedRowCount, actualRowCount);
+ String numFilesPattern = "numFiles=" + expectedNumFiles;
+ String usedMetaPattern = "usedMetadataFile=true";
+ String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1995", getDfsTestTmpSchemaLocation(), tableName2);
+ PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+ new String[] {});
+
+ }
+
+ @Test // DRILL-4786
+ public void testDrill4786_2() throws Exception {
+ // create metadata cache
+ test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName2));
+ checkForMetadataFile(tableName2);
+
+ // run query and check correctness
+ String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/%s` " +
+ " where dir0 in (1994, 1995) and dir1 = 'Q3'",
+ getDfsTestTmpSchemaLocation(), tableName2);
+
+ int expectedRowCount = 40;
+ int expectedNumFiles = 4;
+
+ int actualRowCount = testSql(query1);
+ assertEquals(expectedRowCount, actualRowCount);
+ String numFilesPattern = "numFiles=" + expectedNumFiles;
+ String usedMetaPattern = "usedMetadataFile=true";
+ String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", getDfsTestTmpSchemaLocation(), tableName2);
+ PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+ new String[] {});
+
+ }
+
+
+
private void checkForMetadataFile(String table) throws Exception {
String tmpDir = getDfsTestTmpSchemaLocation();
String metaFile = Joiner.on("/").join(tmpDir, table, Metadata.METADATA_FILENAME);