You are viewing a plain text version of this content. (The canonical link to the original message was not preserved in this plain-text copy.)
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/08/09 22:58:58 UTC

[iceberg] branch master updated: Core: Add predicate pushdown for files metadata table (#2926)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a3bfed  Core: Add predicate pushdown for files metadata table (#2926)
7a3bfed is described below

commit 7a3bfedf25cb74624ed898cec81e3bcadffb2373
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Mon Aug 9 15:58:51 2021 -0700

    Core: Add predicate pushdown for files metadata table (#2926)
---
 .../java/org/apache/iceberg/BaseMetadataTable.java |  20 ++
 .../java/org/apache/iceberg/DataFilesTable.java    |  25 +-
 .../java/org/apache/iceberg/PartitionsTable.java   |  21 +-
 .../org/apache/iceberg/TestMetadataTableScans.java | 259 ++++++++++++++-------
 4 files changed, 220 insertions(+), 105 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 81ecaef..0a764f7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
  * deserialization.
  */
 abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
+  protected static final String PARTITION_FIELD_PREFIX = "partition.";
   private final PartitionSpec spec = PartitionSpec.unpartitioned();
   private final SortOrder sortOrder = SortOrder.unsorted();
   private final TableOperations ops;
@@ -47,6 +48,25 @@ abstract class BaseMetadataTable implements Table, HasTableOperations, Serializa
     this.name = name;
   }
 
+  /**
+   * This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
+   * expression against the given metadata table.
+   * <p>
+   * The resulting partition spec maps $partitionPrefix.X fields to partition X using an identity partition transform.
+   * When this spec is used to project an expression for the given metadata table, the projection will remove
+   * predicates for non-partition fields (not in the spec) and will remove the "$partitionPrefix." prefix from fields.
+   *
+   * @param metadataTableSchema schema of the metadata table
+   * @param spec spec on which the metadata table schema is based
+   * @param partitionPrefix prefix of the partition columns in the metadata table schema, e.g. "partition."
+   * @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
+   */
+  static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec, String partitionPrefix) {
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema);
+    spec.fields().forEach(pf -> identitySpecBuilder.identity(partitionPrefix + pf.name(), pf.name()));
+    return identitySpecBuilder.build();
+  }
+
   abstract MetadataTableType metadataTableType();
 
   protected Table table() {
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 145663c..d6b80ee 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -21,9 +21,12 @@ package org.apache.iceberg;
 
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
@@ -109,7 +112,22 @@ public class DataFilesTable extends BaseMetadataTable {
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
-      return CloseableIterable.transform(manifests, manifest ->
+      // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
+      Expression partitionFilter = Projections
+          .inclusive(transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX), caseSensitive)
+          .project(rowFilter);
+
+      // the filter targets the current spec; never prune manifests written with a different (evolved) spec
+      ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
+          partitionFilter, table().spec(), caseSensitive);
+      CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests,
+          manifest -> manifest.partitionSpecId() != table().spec().specId() || manifestEval.eval(manifest));
+
+      // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
+      // This data task needs to use the table schema, which may not include a partition schema to avoid having an
+      // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
+      // all cases.
+      return CloseableIterable.transform(filtered, manifest ->
           new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
     }
   }
@@ -138,5 +156,10 @@ public class DataFilesTable extends BaseMetadataTable {
     public Iterable<FileScanTask> split(long splitSize) {
       return ImmutableList.of(this); // don't split
     }
+
+    @VisibleForTesting
+    ManifestFile manifest() {
+      return manifest;
+    }
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 0215dfd..285cd0f 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -37,7 +37,6 @@ public class PartitionsTable extends BaseMetadataTable {
   private final Schema schema;
   static final boolean PLAN_SCANS_WITH_WORKER_POOL =
       SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
-  private static final String PARTITION_FIELD_PREFIX = "partition.";
 
   PartitionsTable(TableOperations ops, Table table) {
     this(ops, table, table.name() + ".partitions");
@@ -112,7 +111,7 @@ public class PartitionsTable extends BaseMetadataTable {
 
     // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
     Expression partitionFilter = Projections
-        .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
+        .inclusive(transformSpec(scan.schema(), table.spec(), PARTITION_FIELD_PREFIX), caseSensitive)
         .project(scan.filter());
 
     ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
@@ -132,24 +131,6 @@ public class PartitionsTable extends BaseMetadataTable {
     return manifestGroup.planFiles();
   }
 
-  /**
-   * This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
-   * expression against the partitions table.
-   * <p>
-   * The resulting partition spec maps partition.X fields to partition X using an identity partition transform. When
-   * this spec is used to project an expression for the partitions table, the projection will remove predicates for
-   * non-partition fields (not in the spec) and will remove the "partition." prefix from fields.
-   *
-   * @param partitionTableSchema schema of the partition table
-   * @param spec spec on which the partition table schema is based
-   * @return a spec used to rewrite partition table filters to partition filters using an inclusive projection
-   */
-  private static PartitionSpec transformSpec(Schema partitionTableSchema, PartitionSpec spec) {
-    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(partitionTableSchema);
-    spec.fields().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name(), pf.name()));
-    return identitySpecBuilder.build();
-  }
-
   private class PartitionsScan extends StaticTableScan {
     PartitionsScan(TableOperations ops, Table table) {
       super(ops, table, PartitionsTable.this.schema(), PartitionsTable.this.metadataTableType().name(),
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index a6a411e..9e811f4 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Test;
@@ -47,6 +48,21 @@ public class TestMetadataTableScans extends TableTestBase {
     super(formatVersion);
   }
 
+  private void preparePartitionedTable() {
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_0)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_1)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_2)
+        .commit();
+    table.newFastAppend()
+        .appendFile(FILE_PARTITION_3)
+        .commit();
+  }
+
   @Test
   public void testManifestsTableAlwaysIgnoresResiduals() throws IOException {
     table.newFastAppend()
@@ -164,18 +180,7 @@ public class TestMetadataTableScans extends TableTestBase {
 
   @Test
   public void testPartitionsTableScanNoFilter() {
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_0)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_1)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_2)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_3)
-        .commit();
+    preparePartitionedTable();
 
     Table partitionsTable = new PartitionsTable(table.ops(), table);
     Types.StructType expected = new Schema(
@@ -194,18 +199,7 @@ public class TestMetadataTableScans extends TableTestBase {
 
   @Test
   public void testPartitionsTableScanAndFilter() {
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_0)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_1)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_2)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_3)
-        .commit();
+    preparePartitionedTable();
 
     Table partitionsTable = new PartitionsTable(table.ops(), table);
 
@@ -220,18 +214,7 @@ public class TestMetadataTableScans extends TableTestBase {
 
   @Test
   public void testPartitionsTableScanLtFilter() {
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_0)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_1)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_2)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_3)
-        .commit();
+    preparePartitionedTable();
 
     Table partitionsTable = new PartitionsTable(table.ops(), table);
 
@@ -247,18 +230,7 @@ public class TestMetadataTableScans extends TableTestBase {
 
   @Test
   public void testPartitionsTableScanOrFilter() {
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_0)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_1)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_2)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_3)
-        .commit();
+    preparePartitionedTable();
 
     Table partitionsTable = new PartitionsTable(table.ops(), table);
 
@@ -274,20 +246,10 @@ public class TestMetadataTableScans extends TableTestBase {
     validateIncludesPartitionScan(tasksOr, 3);
   }
 
+
   @Test
   public void testPartitionsScanNotFilter() {
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_0)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_1)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_2)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_3)
-        .commit();
+    preparePartitionedTable();
     Table partitionsTable = new PartitionsTable(table.ops(), table);
 
     Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
@@ -300,18 +262,7 @@ public class TestMetadataTableScans extends TableTestBase {
 
   @Test
   public void testPartitionsTableScanInFilter() {
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_0)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_1)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_2)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_3)
-        .commit();
+    preparePartitionedTable();
 
     Table partitionsTable = new PartitionsTable(table.ops(), table);
 
@@ -325,18 +276,7 @@ public class TestMetadataTableScans extends TableTestBase {
 
   @Test
   public void testPartitionsTableScanNotNullFilter() {
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_0)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_1)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_2)
-        .commit();
-    table.newFastAppend()
-        .appendFile(FILE_PARTITION_3)
-        .commit();
+    preparePartitionedTable();
 
     Table partitionsTable = new PartitionsTable(table.ops(), table);
 
@@ -351,6 +291,137 @@ public class TestMetadataTableScans extends TableTestBase {
   }
 
   @Test
+  public void testFilesTableScanNoFilter() {
+    preparePartitionedTable();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(102, "partition", Types.StructType.of(
+            optional(1000, "data_bucket", Types.IntegerType.get())),
+            "Partition data tuple, schema based on the partition spec")).asStruct();
+
+    TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
+    Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+    CloseableIterable<FileScanTask> tasksNoFilter = scanNoFilter.planFiles();
+
+    Assert.assertEquals(4, Iterables.size(tasksNoFilter));
+    validateFileScanTasks(tasksNoFilter, 0);
+    validateFileScanTasks(tasksNoFilter, 1);
+    validateFileScanTasks(tasksNoFilter, 2);
+    validateFileScanTasks(tasksNoFilter, 3);
+  }
+
+  @Test
+  public void testFilesTableScanAndFilter() {
+    preparePartitionedTable();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression andEquals = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanAndEq = dataFilesTable.newScan().filter(andEquals);
+    CloseableIterable<FileScanTask> tasksAndEq = scanAndEq.planFiles();
+    Assert.assertEquals(1, Iterables.size(tasksAndEq));
+    validateFileScanTasks(tasksAndEq, 0);
+  }
+
+  @Test
+  public void testFilesTableScanAndFilterWithPlanTasks() {
+    preparePartitionedTable();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression andEquals = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scanAndEq = dataFilesTable.newScan().filter(andEquals);
+    CloseableIterable<CombinedScanTask> tasksAndEq = scanAndEq.planTasks();
+    Assert.assertEquals(1, Iterables.size(tasksAndEq));
+    validateCombinedScanTasks(tasksAndEq, 0);
+  }
+
+  @Test
+  public void testFilesTableScanLtFilter() {
+    preparePartitionedTable();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression lt = Expressions.lessThan("partition.data_bucket", 2);
+    TableScan scan = dataFilesTable.newScan().filter(lt);
+    CloseableIterable<FileScanTask> tasksLt = scan.planFiles();
+    Assert.assertEquals(2, Iterables.size(tasksLt));
+    validateFileScanTasks(tasksLt, 0);
+    validateFileScanTasks(tasksLt, 1);
+  }
+
+  @Test
+  public void testFilesTableScanOrFilter() {
+    preparePartitionedTable();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression or = Expressions.or(
+        Expressions.equal("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    TableScan scan = dataFilesTable.newScan()
+        .filter(or);
+    CloseableIterable<FileScanTask> tasksOr = scan.planFiles();
+    Assert.assertEquals(4, Iterables.size(tasksOr));
+    validateFileScanTasks(tasksOr, 0);
+    validateFileScanTasks(tasksOr, 1);
+    validateFileScanTasks(tasksOr, 2);
+    validateFileScanTasks(tasksOr, 3);
+  }
+
+  @Test
+  public void testFilesScanNotFilter() {
+    preparePartitionedTable();
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
+    TableScan scan = dataFilesTable.newScan()
+        .filter(not);
+    CloseableIterable<FileScanTask> tasksNot = scan.planFiles();
+    Assert.assertEquals(2, Iterables.size(tasksNot));
+    validateFileScanTasks(tasksNot, 2);
+    validateFileScanTasks(tasksNot, 3);
+  }
+
+  @Test
+  public void testFilesTableScanInFilter() {
+    preparePartitionedTable();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    Expression set = Expressions.in("partition.data_bucket", 2, 3);
+    TableScan scan = dataFilesTable.newScan()
+        .filter(set);
+    CloseableIterable<FileScanTask> tasksIn = scan.planFiles();
+    Assert.assertEquals(2, Iterables.size(tasksIn));
+
+    validateFileScanTasks(tasksIn, 2);
+    validateFileScanTasks(tasksIn, 3);
+  }
+
+  @Test
+  public void testFilesTableScanNotNullFilter() {
+    preparePartitionedTable();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+    Expression unary = Expressions.notNull("partition.data_bucket");
+    TableScan scan = dataFilesTable.newScan()
+        .filter(unary);
+    CloseableIterable<FileScanTask> tasksUnary = scan.planFiles();
+    Assert.assertEquals(4, Iterables.size(tasksUnary));
+
+    validateFileScanTasks(tasksUnary, 0);
+    validateFileScanTasks(tasksUnary, 1);
+    validateFileScanTasks(tasksUnary, 2);
+    validateFileScanTasks(tasksUnary, 3);
+  }
+
+  @Test
   public void testDataFilesTableSelection() throws IOException {
     table.newFastAppend()
         .appendFile(FILE_A)
@@ -391,4 +462,24 @@ public class TestMetadataTableScans extends TableTestBase {
         StreamSupport.stream(tasks.spliterator(), false).anyMatch(
             a -> a.file().partition().get(0, Object.class).equals(partValue)));
   }
+
+  private void validateFileScanTasks(CloseableIterable<FileScanTask> fileScanTasks, int partValue) {
+    Assert.assertTrue("File scan tasks do not include correct file",
+        StreamSupport.stream(fileScanTasks.spliterator(), false).anyMatch(t -> {
+          ManifestFile mf = ((DataFilesTable.ManifestReadTask) t).manifest();
+          return manifestHasPartition(mf, partValue);
+        }));
+  }
+
+  private void validateCombinedScanTasks(CloseableIterable<CombinedScanTask> tasks, int partValue) {
+    Assert.assertTrue("Scan tasks do not include correct file", StreamSupport.stream(tasks.spliterator(), false)
+        .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).manifest()))
+        .anyMatch(m -> manifestHasPartition(m, partValue)));
+  }
+
+  private boolean manifestHasPartition(ManifestFile mf, int partValue) {
+    int lower = Conversions.fromByteBuffer(Types.IntegerType.get(), mf.partitions().get(0).lowerBound());
+    int upper = Conversions.fromByteBuffer(Types.IntegerType.get(), mf.partitions().get(0).upperBound());
+    return (lower <= partValue) && (upper >= partValue);
+  }
 }