You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/08/09 22:58:58 UTC
[iceberg] branch master updated: Core: Add predicate pushdown for files metadata table (#2926)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7a3bfed Core: Add predicate pushdown for files metadata table (#2926)
7a3bfed is described below
commit 7a3bfedf25cb74624ed898cec81e3bcadffb2373
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Mon Aug 9 15:58:51 2021 -0700
Core: Add predicate pushdown for files metadata table (#2926)
---
.../java/org/apache/iceberg/BaseMetadataTable.java | 20 ++
.../java/org/apache/iceberg/DataFilesTable.java | 25 +-
.../java/org/apache/iceberg/PartitionsTable.java | 21 +-
.../org/apache/iceberg/TestMetadataTableScans.java | 259 ++++++++++++++-------
4 files changed, 220 insertions(+), 105 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index 81ecaef..0a764f7 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
* deserialization.
*/
abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
+ protected static final String PARTITION_FIELD_PREFIX = "partition.";
private final PartitionSpec spec = PartitionSpec.unpartitioned();
private final SortOrder sortOrder = SortOrder.unsorted();
private final TableOperations ops;
@@ -47,6 +48,25 @@ abstract class BaseMetadataTable implements Table, HasTableOperations, Serializa
this.name = name;
}
+  /**
+   * Transforms the table's partition spec into a spec that is used to rewrite a user-provided filter expression
+   * against the given metadata table.
+   * <p>
+   * The resulting partition spec maps each metadata table field named {@code partitionPrefix + X} to partition
+   * field {@code X} using an identity transform. When this spec is used to project an expression for the metadata
+   * table, the projection removes predicates on non-partition fields (fields not in the spec) and strips
+   * {@code partitionPrefix} from the remaining field references.
+   *
+   * @param metadataTableSchema schema of the metadata table
+   * @param spec spec on which the metadata table schema is based
+   * @param partitionPrefix prefix, including its trailing separator (e.g. {@link #PARTITION_FIELD_PREFIX}),
+   *                        that is removed from each field in the partition spec
+   * @return a spec used to rewrite metadata table filters to partition filters using an inclusive projection
+   */
+  static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec, String partitionPrefix) {
+    PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema);
+    spec.fields().forEach(pf -> identitySpecBuilder.identity(partitionPrefix + pf.name(), pf.name()));
+    return identitySpecBuilder.build();
+  }
+
abstract MetadataTableType metadataTableType();
protected Table table() {
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 145663c..d6b80ee 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -21,9 +21,12 @@ package org.apache.iceberg;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.ManifestEvaluator;
+import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
@@ -109,7 +112,22 @@ public class DataFilesTable extends BaseMetadataTable {
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
- return CloseableIterable.transform(manifests, manifest ->
+ // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
+ Expression partitionFilter = Projections
+ .inclusive(
+ transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX),
+ caseSensitive)
+ .project(rowFilter);
+
+ ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
+ partitionFilter, table().spec(), caseSensitive);
+ CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests, manifestEval::eval);
+
+ // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
+ // This data task needs to use the table schema, which may not include a partition schema to avoid having an
+ // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
+ // all cases.
+ return CloseableIterable.transform(filtered, manifest ->
new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
}
}
@@ -138,5 +156,10 @@ public class DataFilesTable extends BaseMetadataTable {
public Iterable<FileScanTask> split(long splitSize) {
return ImmutableList.of(this); // don't split
}
+
+    /**
+     * Returns the manifest file this read task was created for; visible only so tests can check which
+     * manifests survived planning.
+     */
+    @VisibleForTesting
+    ManifestFile manifest() {
+      return manifest;
+    }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index 0215dfd..285cd0f 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -37,7 +37,6 @@ public class PartitionsTable extends BaseMetadataTable {
private final Schema schema;
static final boolean PLAN_SCANS_WITH_WORKER_POOL =
SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
- private static final String PARTITION_FIELD_PREFIX = "partition.";
PartitionsTable(TableOperations ops, Table table) {
this(ops, table, table.name() + ".partitions");
@@ -112,7 +111,7 @@ public class PartitionsTable extends BaseMetadataTable {
// use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
Expression partitionFilter = Projections
- .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
+ .inclusive(transformSpec(scan.schema(), table.spec(), PARTITION_FIELD_PREFIX), caseSensitive)
.project(scan.filter());
ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
@@ -132,24 +131,6 @@ public class PartitionsTable extends BaseMetadataTable {
return manifestGroup.planFiles();
}
- /**
- * This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
- * expression against the partitions table.
- * <p>
- * The resulting partition spec maps partition.X fields to partition X using an identity partition transform. When
- * this spec is used to project an expression for the partitions table, the projection will remove predicates for
- * non-partition fields (not in the spec) and will remove the "partition." prefix from fields.
- *
- * @param partitionTableSchema schema of the partition table
- * @param spec spec on which the partition table schema is based
- * @return a spec used to rewrite partition table filters to partition filters using an inclusive projection
- */
- private static PartitionSpec transformSpec(Schema partitionTableSchema, PartitionSpec spec) {
- PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(partitionTableSchema);
- spec.fields().forEach(pf -> identitySpecBuilder.identity(PARTITION_FIELD_PREFIX + pf.name(), pf.name()));
- return identitySpecBuilder.build();
- }
-
private class PartitionsScan extends StaticTableScan {
PartitionsScan(TableOperations ops, Table table) {
super(ops, table, PartitionsTable.this.schema(), PartitionsTable.this.metadataTableType().name(),
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index a6a411e..9e811f4 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -26,6 +26,7 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
+import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;
@@ -47,6 +48,21 @@ public class TestMetadataTableScans extends TableTestBase {
super(formatVersion);
}
+  /**
+   * Appends FILE_PARTITION_0 through FILE_PARTITION_3 to the test table, in that order, using one fast-append
+   * commit per file.
+   */
+  private void preparePartitionedTable() {
+    DataFile[] partitionFiles = { FILE_PARTITION_0, FILE_PARTITION_1, FILE_PARTITION_2, FILE_PARTITION_3 };
+    for (DataFile partitionFile : partitionFiles) {
+      table.newFastAppend()
+          .appendFile(partitionFile)
+          .commit();
+    }
+  }
+
@Test
public void testManifestsTableAlwaysIgnoresResiduals() throws IOException {
table.newFastAppend()
@@ -164,18 +180,7 @@ public class TestMetadataTableScans extends TableTestBase {
@Test
public void testPartitionsTableScanNoFilter() {
- table.newFastAppend()
- .appendFile(FILE_PARTITION_0)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_1)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_2)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_3)
- .commit();
+ preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
Types.StructType expected = new Schema(
@@ -194,18 +199,7 @@ public class TestMetadataTableScans extends TableTestBase {
@Test
public void testPartitionsTableScanAndFilter() {
- table.newFastAppend()
- .appendFile(FILE_PARTITION_0)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_1)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_2)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_3)
- .commit();
+ preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
@@ -220,18 +214,7 @@ public class TestMetadataTableScans extends TableTestBase {
@Test
public void testPartitionsTableScanLtFilter() {
- table.newFastAppend()
- .appendFile(FILE_PARTITION_0)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_1)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_2)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_3)
- .commit();
+ preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
@@ -247,18 +230,7 @@ public class TestMetadataTableScans extends TableTestBase {
@Test
public void testPartitionsTableScanOrFilter() {
- table.newFastAppend()
- .appendFile(FILE_PARTITION_0)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_1)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_2)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_3)
- .commit();
+ preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
@@ -274,20 +246,10 @@ public class TestMetadataTableScans extends TableTestBase {
validateIncludesPartitionScan(tasksOr, 3);
}
+
@Test
public void testPartitionsScanNotFilter() {
- table.newFastAppend()
- .appendFile(FILE_PARTITION_0)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_1)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_2)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_3)
- .commit();
+ preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
Expression not = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
@@ -300,18 +262,7 @@ public class TestMetadataTableScans extends TableTestBase {
@Test
public void testPartitionsTableScanInFilter() {
- table.newFastAppend()
- .appendFile(FILE_PARTITION_0)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_1)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_2)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_3)
- .commit();
+ preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
@@ -325,18 +276,7 @@ public class TestMetadataTableScans extends TableTestBase {
@Test
public void testPartitionsTableScanNotNullFilter() {
- table.newFastAppend()
- .appendFile(FILE_PARTITION_0)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_1)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_2)
- .commit();
- table.newFastAppend()
- .appendFile(FILE_PARTITION_3)
- .commit();
+ preparePartitionedTable();
Table partitionsTable = new PartitionsTable(table.ops(), table);
@@ -351,6 +291,137 @@ public class TestMetadataTableScans extends TableTestBase {
}
@Test
+  public void testFilesTableScanNoFilter() {
+    preparePartitionedTable();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+    Types.StructType expected = new Schema(
+        required(102, "partition", Types.StructType.of(
+            optional(1000, "data_bucket", Types.IntegerType.get())),
+            "Partition data tuple, schema based on the partition spec")).asStruct();
+
+    TableScan scanNoFilter = dataFilesTable.newScan().select("partition.data_bucket");
+    Assert.assertEquals(expected, scanNoFilter.schema().asStruct());
+    CloseableIterable<FileScanTask> tasks = scanNoFilter.planFiles();
+
+    // without a filter, no manifest is pruned: one task per appended file
+    Assert.assertEquals(4, Iterables.size(tasks));
+    validateFileScanTasks(tasks, 0);
+    validateFileScanTasks(tasks, 1);
+    validateFileScanTasks(tasks, 2);
+    validateFileScanTasks(tasks, 3);
+  }
+
+  @Test
+  public void testFilesTableScanAndFilter() {
+    preparePartitionedTable();
+
+    Table filesTable = new DataFilesTable(table.ops(), table);
+
+    // record_count is not a partition field, so only the data_bucket predicate takes part in manifest pruning
+    Expression bucketAndCount = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    CloseableIterable<FileScanTask> tasks = filesTable.newScan()
+        .filter(bucketAndCount)
+        .planFiles();
+
+    Assert.assertEquals(1, Iterables.size(tasks));
+    validateFileScanTasks(tasks, 0);
+  }
+
+  @Test
+  public void testFilesTableScanAndFilterWithPlanTasks() {
+    preparePartitionedTable();
+
+    Table filesTable = new DataFilesTable(table.ops(), table);
+
+    // the partition filter must also be honored when planning combined tasks through planTasks()
+    Expression bucketAndCount = Expressions.and(
+        Expressions.equal("partition.data_bucket", 0),
+        Expressions.greaterThan("record_count", 0));
+    CloseableIterable<CombinedScanTask> combinedTasks = filesTable.newScan()
+        .filter(bucketAndCount)
+        .planTasks();
+
+    Assert.assertEquals(1, Iterables.size(combinedTasks));
+    validateCombinedScanTasks(combinedTasks, 0);
+  }
+
+  @Test
+  public void testFilesTableScanLtFilter() {
+    preparePartitionedTable();
+
+    Table filesTable = new DataFilesTable(table.ops(), table);
+
+    // data_bucket < 2 is expected to keep only the manifests for buckets 0 and 1
+    Expression bucketBelowTwo = Expressions.lessThan("partition.data_bucket", 2);
+    CloseableIterable<FileScanTask> tasks = filesTable.newScan()
+        .filter(bucketBelowTwo)
+        .planFiles();
+
+    Assert.assertEquals(2, Iterables.size(tasks));
+    validateFileScanTasks(tasks, 0);
+    validateFileScanTasks(tasks, 1);
+  }
+
+  @Test
+  public void testFilesTableScanOrFilter() {
+    preparePartitionedTable();
+
+    Table filesTable = new DataFilesTable(table.ops(), table);
+
+    // record_count is not a partition field, so one side of the OR cannot be projected onto partitions;
+    // the inclusive projection therefore cannot prune anything and all four manifests are expected
+    Expression bucketOrCount = Expressions.or(
+        Expressions.equal("partition.data_bucket", 2),
+        Expressions.greaterThan("record_count", 0));
+    CloseableIterable<FileScanTask> tasks = filesTable.newScan()
+        .filter(bucketOrCount)
+        .planFiles();
+
+    Assert.assertEquals(4, Iterables.size(tasks));
+    for (int bucket = 0; bucket < 4; bucket++) {
+      validateFileScanTasks(tasks, bucket);
+    }
+  }
+
+  @Test
+  public void testFilesScanNotFilter() {
+    preparePartitionedTable();
+    Table filesTable = new DataFilesTable(table.ops(), table);
+
+    // NOT(data_bucket < 2) is expected to leave only the manifests for buckets 2 and 3
+    Expression notBelowTwo = Expressions.not(Expressions.lessThan("partition.data_bucket", 2));
+    CloseableIterable<FileScanTask> tasks = filesTable.newScan()
+        .filter(notBelowTwo)
+        .planFiles();
+
+    Assert.assertEquals(2, Iterables.size(tasks));
+    validateFileScanTasks(tasks, 2);
+    validateFileScanTasks(tasks, 3);
+  }
+
+  @Test
+  public void testFilesTableScanInFilter() {
+    preparePartitionedTable();
+
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+
+    // IN (2, 3) on the partition column is expected to prune the manifests for buckets 0 and 1
+    Expression set = Expressions.in("partition.data_bucket", 2, 3);
+    TableScan scan = dataFilesTable.newScan()
+        .filter(set);
+    CloseableIterable<FileScanTask> tasksIn = scan.planFiles();
+    Assert.assertEquals(2, Iterables.size(tasksIn));
+
+    validateFileScanTasks(tasksIn, 2);
+    validateFileScanTasks(tasksIn, 3);
+  }
+
+  @Test
+  public void testFilesTableScanNotNullFilter() {
+    preparePartitionedTable();
+
+    Table filesTable = new DataFilesTable(table.ops(), table);
+
+    // every appended file is expected to carry a non-null bucket, so nothing should be pruned
+    CloseableIterable<FileScanTask> tasks = filesTable.newScan()
+        .filter(Expressions.notNull("partition.data_bucket"))
+        .planFiles();
+
+    Assert.assertEquals(4, Iterables.size(tasks));
+    for (int bucket = 0; bucket < 4; bucket++) {
+      validateFileScanTasks(tasks, bucket);
+    }
+  }
+
+ @Test
public void testDataFilesTableSelection() throws IOException {
table.newFastAppend()
.appendFile(FILE_A)
@@ -391,4 +462,24 @@ public class TestMetadataTableScans extends TableTestBase {
StreamSupport.stream(tasks.spliterator(), false).anyMatch(
a -> a.file().partition().get(0, Object.class).equals(partValue)));
}
+
+  /**
+   * Asserts that at least one planned files-table task reads a manifest whose partition bounds cover partValue.
+   */
+  private void validateFileScanTasks(CloseableIterable<FileScanTask> fileScanTasks, int partValue) {
+    boolean covered = StreamSupport.stream(fileScanTasks.spliterator(), false)
+        .map(task -> ((DataFilesTable.ManifestReadTask) task).manifest())
+        .anyMatch(manifest -> manifestHasPartition(manifest, partValue));
+    Assert.assertTrue("File scan tasks do not include correct file", covered);
+  }
+
+  /**
+   * Asserts that at least one file task inside the planned combined tasks reads a manifest whose partition bounds
+   * cover partValue. The anyMatch result must be asserted; otherwise this check can never fail.
+   */
+  private void validateCombinedScanTasks(CloseableIterable<CombinedScanTask> tasks, int partValue) {
+    boolean covered = StreamSupport.stream(tasks.spliterator(), false)
+        .flatMap(c -> c.files().stream().map(t -> ((DataFilesTable.ManifestReadTask) t).manifest()))
+        .anyMatch(m -> manifestHasPartition(m, partValue));
+    Assert.assertTrue("Combined scan tasks do not include correct file", covered);
+  }
+
+  /**
+   * Returns whether partValue lies within the [lower, upper] bounds of the manifest's first partition field.
+   * Reads only the first partition field summary and decodes its bounds as ints.
+   */
+  private boolean manifestHasPartition(ManifestFile mf, int partValue) {
+    ManifestFile.PartitionFieldSummary firstField = mf.partitions().get(0);
+    int lowerBound = Conversions.fromByteBuffer(Types.IntegerType.get(), firstField.lowerBound());
+    int upperBound = Conversions.fromByteBuffer(Types.IntegerType.get(), firstField.upperBound());
+    return partValue >= lowerBound && partValue <= upperBound;
+  }
}