Posted to commits@drill.apache.org by am...@apache.org on 2016/07/28 00:23:58 UTC

drill git commit: DRILL-4786: Read the metadata cache file from the least common ancestor directory when multiple partitions are selected.

Repository: drill
Updated Branches:
  refs/heads/master 4b362f089 -> 69a44ed79


DRILL-4786: Read the metadata cache file from the least common ancestor directory when multiple partitions are selected.

close apache/drill#553

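To illustrate the intent of the fix, here is a minimal standalone sketch (hypothetical code, not part of this commit; the class, method, and paths below are made up): when a filter selects multiple partitions, the metadata cache file should be read from the deepest directory that is an ancestor of all of them.

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical illustration of the least-common-ancestor idea.
    public class LcaSketch {
      static String leastCommonAncestor(List<String> partitionDirs) {
        String[] prefix = partitionDirs.get(0).split("/");
        int matchLen = prefix.length;
        for (String dir : partitionDirs) {
          String[] parts = dir.split("/");
          int i = 0;
          while (i < matchLen && i < parts.length && prefix[i].equals(parts[i])) {
            i++;
          }
          matchLen = i;  // shrink the common prefix to what still matches
        }
        return String.join("/", Arrays.copyOfRange(prefix, 0, matchLen));
      }

      public static void main(String[] args) {
        // dir0=1995 AND dir1 IN ('Q1', 'Q2') selects two sibling partitions;
        // their least common ancestor is the 1995 directory.
        System.out.println(leastCommonAncestor(
            Arrays.asList("/tmp/orders/1995/Q1", "/tmp/orders/1995/Q2")));
        // prints: /tmp/orders/1995
      }
    }
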

Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/69a44ed7
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/69a44ed7
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/69a44ed7

Branch: refs/heads/master
Commit: 69a44ed79e4a9ea242df2d5ce7a9b3f042120676
Parents: 4b362f0
Author: Aman Sinha <as...@maprtech.com>
Authored: Fri Jul 22 16:42:09 2016 -0700
Committer: Aman Sinha <as...@maprtech.com>
Committed: Wed Jul 27 17:21:05 2016 -0700

----------------------------------------------------------------------
 .../planner/AbstractPartitionDescriptor.java    |  2 +-
 .../planner/FileSystemPartitionDescriptor.java  |  9 +-
 .../drill/exec/planner/PartitionDescriptor.java |  2 +-
 .../logical/partition/PruneScanRule.java        | 91 ++++++++++++--------
 .../drill/exec/store/dfs/FileSelection.java     | 11 ++-
 .../exec/store/parquet/ParquetFormatPlugin.java |  3 +-
 .../exec/store/parquet/ParquetGroupScan.java    |  7 +-
 .../store/parquet/TestParquetMetadataCache.java | 60 +++++++++++--
 8 files changed, 134 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
index 9879492..d8872dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
@@ -58,7 +58,7 @@ public abstract class AbstractPartitionDescriptor implements PartitionDescriptor
   }
 
   @Override
-  public boolean supportsSinglePartOptimization() {
+  public boolean supportsMetadataCachePruning() {
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index ba18bbe..370fb6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -267,8 +267,13 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
   }
 
   @Override
-  public boolean supportsSinglePartOptimization() {
-    return true;
+  public boolean supportsMetadataCachePruning() {
+    final Object selection = this.table.getSelection();
+    if (selection instanceof FormatSelection
+        && ((FormatSelection)selection).getSelection().getCacheFileRoot() != null) {
+      return true;
+    }
+    return false;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index 4d1bfdd..83cb146 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -96,7 +96,7 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
   public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
       boolean wasAllPartitionsPruned) throws Exception;
 
-  public boolean supportsSinglePartOptimization();
+  public boolean supportsMetadataCachePruning();
 
   public String getBaseTableLocation();
 

http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 82e3bb7..d3e38fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -48,7 +48,6 @@ import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionDescriptor;
 import org.apache.drill.exec.planner.PartitionLocation;
-import org.apache.drill.exec.planner.SimplePartitionLocation;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
@@ -216,10 +215,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     int batchIndex = 0;
     PartitionLocation firstLocation = null;
     LogicalExpression materializedExpr = null;
-    boolean checkForSingle = descriptor.supportsSinglePartOptimization();
-    boolean isSinglePartition = true;
     String[] spInfo = null;
     int maxIndex = -1;
+    BitSet matchBitSet = new BitSet();
 
     // Outer loop: iterate over a list of batches of PartitionLocations
     for (List<PartitionLocation> partitions : descriptor) {
@@ -279,41 +277,34 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
         int recordCount = 0;
         int qualifiedCount = 0;
 
-        if (checkForSingle &&
+        if (descriptor.supportsMetadataCachePruning() &&
             partitions.get(0).isCompositePartition() /* apply single partition check only for composite partitions */) {
           // Inner loop: within each batch iterate over the PartitionLocations
           for (PartitionLocation part : partitions) {
             assert part.isCompositePartition();
             if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
               newPartitions.add(part);
-              if (isSinglePartition) { // only need to do this if we are already single partition
-                // compose the array of partition values for the directories that are referenced by filter:
-                // e.g suppose the dir hierarchy is year/quarter/month and the query is:
-                //     SELECT * FROM T WHERE dir0=2015 AND dir1 = 'Q1',
-                // then for 2015/Q1/Feb, this will have ['2015', 'Q1', null]
-                // Note that we are not using the PartitionLocation here but composing a different list because
-                // we are only interested in the directory columns that are referenced in the filter condition. not
-                // the SELECT list or other parts of the query.
-                Pair<String[], Integer> p = composePartition(referencedDirsBitSet, partitionMap, vectors, recordCount);
-                String[] parts = p.getLeft();
-                int tmpIndex = p.getRight();
-                if (spInfo == null) {
-                  for (int j = 0; j <= tmpIndex; j++) {
-                    if (parts[j] == null) { // prefixes should be non-null
-                      isSinglePartition = false;
-                      break;
-                    }
+              // Rather than using the PartitionLocation, compose the array of partition values for the
+              // directories that are referenced by the filter, since directory references in other parts
+              // of the query are not of interest here.
+              Pair<String[], Integer> p = composePartition(referencedDirsBitSet, partitionMap, vectors, recordCount);
+              String[] parts = p.getLeft();
+              int tmpIndex = p.getRight();
+              maxIndex = Math.max(maxIndex, tmpIndex);
+              if (spInfo == null) { // initialization
+                spInfo = parts;
+                for (int j = 0; j <= tmpIndex; j++) {
+                  if (parts[j] != null) {
+                    matchBitSet.set(j);
                   }
-                  spInfo = parts;
-                  maxIndex = tmpIndex;
-                } else if (maxIndex != tmpIndex) {
-                  isSinglePartition = false;
-                } else {
-                  // we only want to compare until the maxIndex inclusive since subsequent values would be null
-                  for (int j = 0; j <= maxIndex; j++) {
-                    if (!spInfo[j].equals(parts[j])) {
-                      isSinglePartition = false;
-                      break;
+                }
+              } else {
+                // compare the new partition with existing partition
+                for (int j=0; j <= tmpIndex; j++) {
+                  if (parts[j] == null || spInfo[j] == null) { // nulls don't match
+                    matchBitSet.clear(j);
+                  } else {
+                    if (!parts[j].equals(spInfo[j])) {
+                      matchBitSet.clear(j);
                     }
                   }
                 }
@@ -387,16 +378,35 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
       condition = condition.accept(reverseVisitor);
       pruneCondition = pruneCondition.accept(reverseVisitor);
 
-      if (checkForSingle && isSinglePartition && !wasAllPartitionsPruned) {
+      if (descriptor.supportsMetadataCachePruning() && !wasAllPartitionsPruned) {
         // if metadata cache file could potentially be used, then assign a proper cacheFileRoot
-        String path = "";
-        for (int j = 0; j <= maxIndex; j++) {
-          path += "/" + spInfo[j];
+        int index = -1;
+        if (!matchBitSet.isEmpty()) {
+          String path = "";
+          index = matchBitSet.length() - 1;
+
+          for (int j = 0; j < matchBitSet.length(); j++) {
+            if (!matchBitSet.get(j)) {
+              // stop at the first index with no match and use the immediate
+              // previous index
+              index = j-1;
+              break;
+            }
+          }
+          for (int j=0; j <= index; j++) {
+            path += "/" + spInfo[j];
+          }
+          cacheFileRoot = descriptor.getBaseTableLocation() + path;
+        }
+        if (index != maxIndex) {
+          // if multiple partitions are being selected, we should not drop the filter
+          // since we are reading the cache file at a parent/ancestor level
+          canDropFilter = false;
         }
-        cacheFileRoot = descriptor.getBaseTableLocation() + path;
+
       }
 
-      RelNode inputRel = descriptor.supportsSinglePartOptimization() ?
+      RelNode inputRel = descriptor.supportsMetadataCachePruning() ?
           descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned) :
             descriptor.createTableScan(newPartitions, wasAllPartitionsPruned);
 
@@ -418,6 +428,13 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
     }
   }
 
+  /** Compose the array of partition values for the directories that are referenced by the filter:
+   *  e.g. suppose the dir hierarchy is year/quarter/month and the query is:
+   *     SELECT * FROM T WHERE dir0=2015 AND dir1 = 'Q1',
+   *  then for 2015/Q1/Feb this will have ['2015', 'Q1', null].
+   *  If the query filter condition is WHERE dir1 = 'Q2' (i.e. no dir0 condition), then the array will
+   *  have [null, 'Q2', null].
+   */
   private Pair<String[], Integer> composePartition(BitSet referencedDirsBitSet,
       Map<Integer, Integer> partitionMap,
       ValueVector[] vectors,

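The matchBitSet bookkeeping in the hunk above can be summarized with a small standalone sketch (hypothetical code, not the rule itself): each qualifying partition contributes an array of directory values for the filter-referenced columns, e.g. ['1995', 'Q1', null], and a BitSet records which positions still agree across all partitions seen so far; the cacheFileRoot then extends only through the leading run of matching positions.

    import java.util.BitSet;
    import java.util.List;

    // Hypothetical illustration of the prefix-matching logic; assumes all
    // value arrays have the same length (one slot per partition level).
    public class MatchBitSetSketch {
      static int commonPrefixLength(List<String[]> partitionValues) {
        BitSet matchBitSet = new BitSet();
        String[] spInfo = null;
        for (String[] parts : partitionValues) {
          if (spInfo == null) {   // first partition seeds the bit set
            spInfo = parts;
            for (int j = 0; j < parts.length; j++) {
              if (parts[j] != null) {
                matchBitSet.set(j);
              }
            }
          } else {                // later partitions clear any mismatches
            for (int j = 0; j < parts.length; j++) {
              if (parts[j] == null || spInfo[j] == null
                  || !parts[j].equals(spInfo[j])) {
                matchBitSet.clear(j);
              }
            }
          }
        }
        // stop at the first position with no match
        for (int j = 0; j < matchBitSet.length(); j++) {
          if (!matchBitSet.get(j)) {
            return j;
          }
        }
        return matchBitSet.length();
      }
    }

For dir0=1995 AND dir1 IN ('Q1', 'Q2') the arrays are ['1995', 'Q1'] and ['1995', 'Q2'], so only position 0 matches and the cache file is read from <base>/1995; because the match does not cover every referenced directory level, the Filter is kept in the plan (canDropFilter = false above).
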
http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index d357c39..8654749 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -326,7 +326,8 @@ public class FileSelection {
     return FileSelection.create(statuses, files, root, null, false);
   }
 
-  public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection) {
+  public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection,
+      final String cacheFileRoot) {
     final String root = selection.getSelectionRoot();
     if (Strings.isNullOrEmpty(root)) {
       throw new DrillRuntimeException("Selection root is null or empty" + root);
@@ -351,7 +352,9 @@ public class FileSelection {
     // final URI uri = dirPaths.get(0).toUri();
     final URI uri = selection.getFileStatuses().get(0).getPath().toUri();
     final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
-    return new FileSelection(null, dirs, path.toString());
+    FileSelection fileSel = new FileSelection(null, dirs, path.toString(), cacheFileRoot, false);
+    fileSel.setHadWildcard(selection.hadWildcard());
+    return fileSel;
   }
 
   private static Path handleWildCard(final String root) {
@@ -395,4 +398,8 @@ public class FileSelection {
     return this.hadWildcard;
   }
 
+  public String getCacheFileRoot() {
+    return cacheFileRoot;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 20c7312..5174893 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -216,7 +216,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
         if (fs.exists(dirMetaPath)) {
           ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath.toString());
           if (mDirs.getDirectories().size() > 0) {
-            FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection);
+            FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection,
+                selection.getSelectionRoot() /* cacheFileRoot initially points to selectionRoot */);
             dirSelection.setExpandedPartial();
             return new DynamicDrillTable(fsPlugin, storageEngineName, userName,
                 new FormatSelection(plugin.getConfig(), dirSelection));

http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index b838472..f666439 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -584,12 +584,17 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
         fileSet.add(file.getPath());
       }
 
-    } else if (selection.isExpandedPartial() && cacheFileRoot != null) {
+    } else if (selection.isExpandedPartial() && !selection.hadWildcard() &&
+        cacheFileRoot != null) {
       this.parquetTableMetadata = Metadata.readBlockMeta(fs, metaFilePath.toString());
       if (selection.wasAllPartitionsPruned()) {
         // if all partitions were previously pruned, we only need to read 1 file (for the schema)
         fileSet.add(this.parquetTableMetadata.getFiles().get(0).getPath());
       } else {
+        // we are here if the selection is in the expanded_partial state (i.e. it has directories). We get the
+        // list of files from the metadata cache file present in the cacheFileRoot directory and populate
+        // the fileSet. However, this is *not* the final list of files that will be scanned in execution, since
+        // the second phase of partition pruning will be applied to these files and modify the file selection appropriately.
         for (Metadata.ParquetFileMetadata file : this.parquetTableMetadata.getFiles()) {
           fileSet.add(file.getPath());
         }

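The guard added above combines three conditions; restated as a hypothetical helper (not the scan's actual code), it reads:

    public class CacheRootGuardSketch {
      // Hypothetical restatement of the guard in the hunk above: the cache
      // at cacheFileRoot is only read when the selection was expanded from
      // directory metadata and did not originate from a wildcard path. The
      // resulting file set is a superset that the second pruning phase
      // narrows down further.
      static boolean canReadCacheAtRoot(boolean isExpandedPartial,
                                        boolean hadWildcard,
                                        String cacheFileRoot) {
        return isExpandedPartial && !hadWildcard && cacheFileRoot != null;
      }
    }
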
http://git-wip-us.apache.org/repos/asf/drill/blob/69a44ed7/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index a5c0730..342ee91 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -53,7 +53,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
         dataDir2);
   }
 
-  @Test // also a negative test case for DRILL-4530
+  @Test
   public void testPartitionPruningWithMetadataCache_1() throws Exception {
     test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName1));
     checkForMetadataFile(tableName1);
@@ -67,11 +67,9 @@ public class TestParquetMetadataCache extends PlanTestBase {
     assertEquals(expectedRowCount, actualRowCount);
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=true";
-    // since there are 2 or more sub-partitions the single partition cache file optimization does not apply
-    // and cacheFileRoot should point to the top level selectionRoot
-    String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", getDfsTestTmpSchemaLocation(), tableName1);
+    String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1994", getDfsTestTmpSchemaLocation(), tableName1);
     PlanTestBase.testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {"Filter"});
+        new String[] {});
   }
 
   @Test // DRILL-3917, positive test case for DRILL-4530
@@ -280,7 +278,7 @@ public class TestParquetMetadataCache extends PlanTestBase {
     String usedMetaPattern = "usedMetadataFile=true";
     String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", getDfsTestTmpSchemaLocation(), tableName2);
     PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
-        new String[] {"Filter"});
+        new String[] {});
   }
 
   @Test // DRILL-4530  // non-existent partition (1 subdirectory's cache file will still be read for schema)
@@ -325,6 +323,56 @@ public class TestParquetMetadataCache extends PlanTestBase {
         new String[] {});
   }
 
+  @Test // DRILL-4786
+  public void testDrill4786_1() throws Exception {
+    // create metadata cache
+    test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName2));
+    checkForMetadataFile(tableName2);
+
+    // run query and check correctness
+    String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/%s` " +
+            " where dir0=1995 and dir1 in ('Q1', 'Q2')",
+        getDfsTestTmpSchemaLocation(), tableName2);
+
+    int expectedRowCount = 40;
+    int expectedNumFiles = 4;
+
+    int actualRowCount = testSql(query1);
+    assertEquals(expectedRowCount, actualRowCount);
+    String numFilesPattern = "numFiles=" + expectedNumFiles;
+    String usedMetaPattern = "usedMetadataFile=true";
+    String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s/1995", getDfsTestTmpSchemaLocation(), tableName2);
+    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+        new String[] {});
+
+  }
+
+  @Test // DRILL-4786
+  public void testDrill4786_2() throws Exception {
+    // create metadata cache
+    test(String.format("refresh table metadata dfs_test.`%s/%s`", getDfsTestTmpSchemaLocation(), tableName2));
+    checkForMetadataFile(tableName2);
+
+    // run query and check correctness
+    String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/%s` " +
+            " where dir0 in (1994, 1995) and dir1 = 'Q3'",
+        getDfsTestTmpSchemaLocation(), tableName2);
+
+    int expectedRowCount = 40;
+    int expectedNumFiles = 4;
+
+    int actualRowCount = testSql(query1);
+    assertEquals(expectedRowCount, actualRowCount);
+    String numFilesPattern = "numFiles=" + expectedNumFiles;
+    String usedMetaPattern = "usedMetadataFile=true";
+    String cacheFileRootPattern = String.format("cacheFileRoot=%s/%s", getDfsTestTmpSchemaLocation(), tableName2);
+    PlanTestBase.testPlanMatchingPatterns(query1, new String[]{numFilesPattern, usedMetaPattern, cacheFileRootPattern},
+        new String[] {});
+
+  }
+
+
+
   private void checkForMetadataFile(String table) throws Exception {
     String tmpDir = getDfsTestTmpSchemaLocation();
     String metaFile = Joiner.on("/").join(tmpDir, table, Metadata.METADATA_FILENAME);