Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/05/24 14:58:45 UTC
[iceberg] branch 0.13.x updated: Core: Backport filter pushdown fix for metadata tables with evolved specs to 0.13 (#4520) (#4569)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch 0.13.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/0.13.x by this push:
new 5b3e462324 Core: Backport filter pushdown fix for metadata tables with evolved specs to 0.13 (#4520) (#4569)
5b3e462324 is described below
commit 5b3e46232446ef9c1fab719d4c331347fe44fb68
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Tue May 24 07:58:39 2022 -0700
Core: Backport filter pushdown fix for metadata tables with evolved specs to 0.13 (#4520) (#4569)
---
.../java/org/apache/iceberg/BaseMetadataTable.java | 10 +-
.../java/org/apache/iceberg/DataFilesTable.java | 22 ++-
.../java/org/apache/iceberg/PartitionsTable.java | 2 +-
.../org/apache/iceberg/TestMetadataTableScans.java | 204 +++++++++++++++++++++
.../TestMetadataTablesWithPartitionEvolution.java | 148 +++++++++++++--
5 files changed, 358 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
index d9b2f797a4..f4966352ec 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetadataTable.java
@@ -35,7 +35,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
* deserialization.
*/
abstract class BaseMetadataTable implements Table, HasTableOperations, Serializable {
- protected static final String PARTITION_FIELD_PREFIX = "partition.";
private final PartitionSpec spec = PartitionSpec.unpartitioned();
private final SortOrder sortOrder = SortOrder.unsorted();
private final TableOperations ops;
@@ -52,18 +51,17 @@ abstract class BaseMetadataTable implements Table, HasTableOperations, Serializa
* This method transforms the table's partition spec to a spec that is used to rewrite the user-provided filter
* expression against the given metadata table.
* <p>
- * The resulting partition spec maps $partitionPrefix.X fields to partition X using an identity partition transform.
+ * The resulting partition spec maps partition.X fields to partition X using an identity partition transform.
* When this spec is used to project an expression for the given metadata table, the projection will remove
- * predicates for non-partition fields (not in the spec) and will remove the "$partitionPrefix." prefix from fields.
+ * predicates for non-partition fields (not in the spec) and will remove the "partition." prefix from fields.
*
* @param metadataTableSchema schema of the metadata table
* @param spec spec on which the metadata table schema is based
- * @param partitionPrefix prefix to remove from each field in the partition spec
* @return a spec used to rewrite the metadata table filters to partition filters using an inclusive projection
*/
- static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec, String partitionPrefix) {
+ static PartitionSpec transformSpec(Schema metadataTableSchema, PartitionSpec spec) {
PartitionSpec.Builder identitySpecBuilder = PartitionSpec.builderFor(metadataTableSchema).checkConflicts(false);
- spec.fields().forEach(pf -> identitySpecBuilder.identity(partitionPrefix + pf.name(), pf.name()));
+ spec.fields().forEach(pf -> identitySpecBuilder.add(pf.fieldId(), pf.name(), "identity"));
return identitySpecBuilder.build();
}
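For context, the transformed spec exists so that an inclusive projection can rewrite metadata-table predicates into partition predicates. A minimal sketch of that flow, assuming package-private access to transformSpec and illustrative names (fileSchema stands in for the metadata table schema, spec for the data table's partition spec):

    // Uses org.apache.iceberg.expressions.{Expression, Expressions, Projections}.
    Expression rowFilter = Expressions.and(
        Expressions.equal("partition.data_bucket", 0),   // partition column of the metadata table
        Expressions.greaterThan("record_count", 0));     // regular metadata column

    // Maps the metadata column "partition.data_bucket" to the partition field
    // "data_bucket" using an identity transform.
    PartitionSpec transformedSpec = transformSpec(fileSchema, spec);

    // The inclusive projection drops the record_count predicate (not in the
    // spec) and strips the "partition." prefix, leaving: data_bucket = 0
    Expression partitionFilter = Projections
        .inclusive(transformedSpec, true /* caseSensitive */)
        .project(rowFilter);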
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 31c0bea319..c20332b770 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -19,10 +19,12 @@
package org.apache.iceberg;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import java.util.Map;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
-import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
@@ -108,16 +110,16 @@ public class DataFilesTable extends BaseMetadataTable {
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
- // use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
- Expression partitionFilter = Projections
- .inclusive(
- transformSpec(fileSchema, table().spec(), PARTITION_FIELD_PREFIX),
- caseSensitive)
- .project(rowFilter);
+ Map<Integer, PartitionSpec> specsById = table().specs();
- ManifestEvaluator manifestEval = ManifestEvaluator.forPartitionFilter(
- partitionFilter, table().spec(), caseSensitive);
- CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests, manifestEval::eval);
+ LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
+ PartitionSpec spec = specsById.get(specId);
+ PartitionSpec transformedSpec = transformSpec(fileSchema, spec);
+ return ManifestEvaluator.forRowFilter(rowFilter, transformedSpec, caseSensitive);
+ });
+
+ CloseableIterable<ManifestFile> filtered = CloseableIterable.filter(manifests,
+ manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));
// Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
// This data task needs to use the table schema, which may not include a partition schema to avoid having an
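For readers unfamiliar with Caffeine: build(loader) returns a LoadingCache whose get(key) runs the loader once per distinct key and memoizes the result, so at most one ManifestEvaluator is constructed per partition spec ID no matter how many manifests share that spec. The pattern in isolation (a stand-alone sketch, not the committed code):

    // Uses com.github.benmanes.caffeine.cache.{Caffeine, LoadingCache}.
    LoadingCache<Integer, String> cache = Caffeine.newBuilder()
        .build(specId -> "evaluator-for-spec-" + specId);  // stand-in for building a ManifestEvaluator

    String first = cache.get(1);   // loader invoked, result cached
    String second = cache.get(1);  // cache hit, loader not invoked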
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index cc9f716a4d..dbf1d39d11 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -111,7 +111,7 @@ public class PartitionsTable extends BaseMetadataTable {
// use an inclusive projection to remove the partition name prefix and filter out any non-partition expressions
Expression partitionFilter = Projections
- .inclusive(transformSpec(scan.schema(), table.spec(), PARTITION_FIELD_PREFIX), caseSensitive)
+ .inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
.project(scan.filter());
ManifestGroup manifestGroup = new ManifestGroup(table.io(), snapshot.dataManifests(), snapshot.deleteManifests())
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index af99014df6..3bc4c43860 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -34,6 +34,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -713,6 +714,209 @@ public class TestMetadataTableScans extends TableTestBase {
validateIncludesPartitionScan(tasksAndEq, 0);
}
+ @Test
+ public void testFilesTablePartitionFieldRemovalV1() {
+ Assume.assumeTrue(formatVersion == 1);
+ preparePartitionedTable();
+
+ // Change spec and add two data files
+ table.updateSpec()
+ .removeField(Expressions.bucket("data", 16))
+ .addField("id")
+ .commit();
+ PartitionSpec newSpec = table.spec();
+
+ // Add two data files with new spec
+ PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+ data10Key.set(1, 10);
+ DataFile data10 = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-10.parquet")
+ .withRecordCount(10)
+ .withFileSizeInBytes(10)
+ .withPartition(data10Key)
+ .build();
+ PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+ data11Key.set(1, 11);
+ DataFile data11 = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-11.parquet")
+ .withRecordCount(10)
+ .withFileSizeInBytes(10)
+ .withPartition(data11Key)
+ .build();
+
+ table.newFastAppend().appendFile(data10).commit();
+ table.newFastAppend().appendFile(data11).commit();
+
+ Table metadataTable = new DataFilesTable(table.ops(), table);
+ Expression filter = Expressions.and(
+ Expressions.equal("partition.id", 10),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scan = metadataTable.newScan().filter(filter);
+ CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+ // All 4 original data files written by old spec, plus one data file written by new spec
+ Assert.assertEquals(5, Iterables.size(tasks));
+
+ filter = Expressions.and(
+ Expressions.equal("partition.data_bucket", 0),
+ Expressions.greaterThan("record_count", 0));
+ scan = metadataTable.newScan().filter(filter);
+ tasks = scan.planFiles();
+
+ // 1 original data file written by the old spec (in V1, files from the new spec are filtered out because they have no data_bucket value)
+ Assert.assertEquals(1, Iterables.size(tasks));
+ }
+
+ @Test
+ public void testFilesTablePartitionFieldRemovalV2() {
+ Assume.assumeTrue(formatVersion == 2);
+ preparePartitionedTable();
+
+ // Change the spec, then add two data files with the new spec
+ table.updateSpec()
+ .removeField(Expressions.bucket("data", 16))
+ .addField("id").commit();
+ PartitionSpec newSpec = table.spec();
+
+ // Add two data files with the new spec
+ DataFile data10 = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-10.parquet")
+ .withRecordCount(10)
+ .withFileSizeInBytes(10)
+ .withPartitionPath("id=10")
+ .build();
+ DataFile data11 = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-11.parquet")
+ .withRecordCount(10)
+ .withFileSizeInBytes(10)
+ .withPartitionPath("id=11")
+ .build();
+
+ table.newFastAppend().appendFile(data10).commit();
+ table.newFastAppend().appendFile(data11).commit();
+
+ Table metadataTable = new DataFilesTable(table.ops(), table);
+ Expression filter = Expressions.and(
+ Expressions.equal("partition.id", 10),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scan = metadataTable.newScan().filter(filter);
+ CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+ // All 4 original data files written by old spec, plus one new data file written by new spec
+ Assert.assertEquals(5, Iterables.size(tasks));
+
+ filter = Expressions.and(
+ Expressions.equal("partition.data_bucket", 0),
+ Expressions.greaterThan("record_count", 0));
+ scan = metadataTable.newScan().filter(filter);
+ tasks = scan.planFiles();
+
+ // 1 original data file written by the old spec, plus both new data files written by the new spec
+ Assert.assertEquals(3, Iterables.size(tasks));
+ }
+
+ @Test
+ public void testFilesTablePartitionFieldAddV1() {
+ Assume.assumeTrue(formatVersion == 1);
+ preparePartitionedTable();
+
+ // Change spec and add two data files
+ table.updateSpec()
+ .addField("id")
+ .commit();
+ PartitionSpec newSpec = table.spec();
+
+ // Add two data files with new spec
+ PartitionKey data10Key = new PartitionKey(newSpec, table.schema());
+ data10Key.set(0, 0); // data=0
+ data10Key.set(1, 10); // id=10
+ DataFile data10 = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-10.parquet")
+ .withRecordCount(10)
+ .withFileSizeInBytes(10)
+ .withPartition(data10Key)
+ .build();
+ PartitionKey data11Key = new PartitionKey(newSpec, table.schema());
+ data11Key.set(0, 1); // data=1
+ data11Key.set(1, 11); // id=11
+ DataFile data11 = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-11.parquet")
+ .withRecordCount(10)
+ .withFileSizeInBytes(10)
+ .withPartition(data11Key)
+ .build();
+
+ table.newFastAppend().appendFile(data10).commit();
+ table.newFastAppend().appendFile(data11).commit();
+
+ Table metadataTable = new DataFilesTable(table.ops(), table);
+ Expression filter = Expressions.and(
+ Expressions.equal("partition.id", 10),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scan = metadataTable.newScan().filter(filter);
+ CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+ // All 4 original data files written by old spec, plus one new data file written by new spec
+ Assert.assertEquals(5, Iterables.size(tasks));
+
+ filter = Expressions.and(
+ Expressions.equal("partition.data_bucket", 0),
+ Expressions.greaterThan("record_count", 0));
+ scan = metadataTable.newScan().filter(filter);
+ tasks = scan.planFiles();
+
+ // 1 original data file written by old spec, plus 1 new data file written by new spec
+ Assert.assertEquals(2, Iterables.size(tasks));
+ }
+
+ @Test
+ public void testFilesTablePartitionFieldAddV2() {
+ Assume.assumeTrue(formatVersion == 2);
+ preparePartitionedTable();
+
+ // Change the spec, then add two data files with the new spec
+ table.updateSpec()
+ .addField("id")
+ .commit();
+ PartitionSpec newSpec = table.spec();
+
+ // Add two data files with the new spec
+ DataFile data10 = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-10.parquet")
+ .withRecordCount(10)
+ .withFileSizeInBytes(10)
+ .withPartitionPath("data_bucket=0/id=10")
+ .build();
+ DataFile data11 = DataFiles.builder(newSpec)
+ .withPath("/path/to/data-11.parquet")
+ .withRecordCount(10)
+ .withFileSizeInBytes(10)
+ .withPartitionPath("data_bucket=1/id=11")
+ .build();
+
+ table.newFastAppend().appendFile(data10).commit();
+ table.newFastAppend().appendFile(data11).commit();
+
+ Table metadataTable = new DataFilesTable(table.ops(), table);
+ Expression filter = Expressions.and(
+ Expressions.equal("partition.id", 10),
+ Expressions.greaterThan("record_count", 0));
+ TableScan scan = metadataTable.newScan().filter(filter);
+ CloseableIterable<FileScanTask> tasks = scan.planFiles();
+
+ // All 4 original data files written by old spec, plus one new data file written by new spec
+ Assert.assertEquals(5, Iterables.size(tasks));
+
+ filter = Expressions.and(
+ Expressions.equal("partition.data_bucket", 0),
+ Expressions.greaterThan("record_count", 0));
+ scan = metadataTable.newScan().filter(filter);
+ tasks = scan.planFiles();
+
+ // 1 original data file written by the old spec, plus 1 new data file written by the new spec
+ Assert.assertEquals(2, Iterables.size(tasks));
+ }
+
private void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals) throws IOException {
try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
index 0e690bd27d..a93fef0bad 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -22,8 +22,6 @@ package org.apache.iceberg.spark.source;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.HasTableOperations;
@@ -74,14 +72,40 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
"default-namespace", "default"
),
ORC,
- formatVersion()
+ 1
+ },
+ { "testhive", SparkCatalog.class.getName(),
+ ImmutableMap.of(
+ "type", "hive",
+ "default-namespace", "default"
+ ),
+ ORC,
+ 2
+ },
+ { "testhadoop", SparkCatalog.class.getName(),
+ ImmutableMap.of(
+ "type", "hadoop"
+ ),
+ PARQUET,
+ 1
},
{ "testhadoop", SparkCatalog.class.getName(),
ImmutableMap.of(
"type", "hadoop"
),
PARQUET,
- formatVersion()
+ 2
+ },
+ { "spark_catalog", SparkSessionCatalog.class.getName(),
+ ImmutableMap.of(
+ "type", "hive",
+ "default-namespace", "default",
+ "clients", "1",
+ "parquet-enabled", "false",
+ "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
+ ),
+ AVRO,
+ 1
},
{ "spark_catalog", SparkSessionCatalog.class.getName(),
ImmutableMap.of(
@@ -92,17 +116,11 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
"cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
),
AVRO,
- formatVersion()
+ 2
}
};
}
- private static int formatVersion() {
- return RANDOM.nextInt(2) + 1;
- }
-
- private static final Random RANDOM = ThreadLocalRandom.current();
-
private final FileFormat fileFormat;
private final int formatVersion;
@@ -189,6 +207,106 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
}
}
+ @Test
+ public void testFilesMetadataTableFilter() throws ParseException {
+ sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
+ "TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
+ initTable();
+
+ sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+ // verify the metadata tables while the current spec is still unpartitioned
+ for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+ Dataset<Row> df = loadMetadataTable(tableType);
+ Assert.assertTrue("Partition must be skipped", df.schema().getFieldIndex("partition").isEmpty());
+ }
+
+ Table table = validationCatalog.loadTable(tableIdent);
+
+ table.updateSpec()
+ .addField("data")
+ .commit();
+ sql("REFRESH TABLE %s", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+ // verify the metadata tables after adding the first partition column
+ for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+ assertPartitions(
+ ImmutableList.of(row("d2")),
+ "STRUCT<data:STRING>",
+ tableType,
+ "partition.data = 'd2'");
+ }
+
+ table.updateSpec()
+ .addField("category")
+ .commit();
+ sql("REFRESH TABLE %s", tableName);
+ sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+
+ // verify the metadata tables after adding the second partition column
+ for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+ assertPartitions(ImmutableList.of(row("d2", null), row("d2", "c2")),
+ "STRUCT<data:STRING,category:STRING>",
+ tableType,
+ "partition.data = 'd2'");
+ }
+ for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+ assertPartitions(
+ ImmutableList.of(row("d2", "c2")),
+ "STRUCT<data:STRING,category:STRING>",
+ tableType,
+ "partition.category = 'c2'");
+ }
+
+ table.updateSpec()
+ .removeField("data")
+ .commit();
+ sql("REFRESH TABLE %s", tableName);
+
+ // Verify new partitions do not show up for removed 'partition.data=d2' query
+ sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd2')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (4, 'c4', 'd2')", tableName);
+
+ // Verify new partitions do show up for 'partition.category=c2' query
+ sql("INSERT INTO TABLE %s VALUES (5, 'c2', 'd5')", tableName);
+
+ // No new partitions should show up for the 'data' partition query, as that partition field has been removed
+ for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+ assertPartitions(
+ ImmutableList.of(row("d2", null), row("d2", "c2")),
+ "STRUCT<data:STRING,category:STRING>",
+ tableType,
+ "partition.data = 'd2'");
+ }
+ // The new partition shows up for the 'category' partition field query
+ for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+ assertPartitions(
+ ImmutableList.of(row(null, "c2"), row("d2", "c2")),
+ "STRUCT<data:STRING,category:STRING>",
+ tableType,
+ "partition.category = 'c2'");
+ }
+
+ table.updateSpec()
+ .renameField("category", "category_another_name")
+ .commit();
+ sql("REFRESH TABLE %s", tableName);
+
+ // Verify new partitions do show up for 'category=c2' query
+ sql("INSERT INTO TABLE %s VALUES (6, 'c2', 'd6')", tableName);
+ for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+ assertPartitions(
+ ImmutableList.of(row(null, "c2"), row(null, "c2"), row("d2", "c2")),
+ "STRUCT<data:STRING,category_another_name:STRING>",
+ tableType,
+ "partition.category_another_name = 'c2'");
+ }
+ }
+
@Test
public void testEntriesMetadataTable() throws ParseException {
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
@@ -299,7 +417,15 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
private void assertPartitions(List<Object[]> expectedPartitions, String expectedTypeAsString,
MetadataTableType tableType) throws ParseException {
+ assertPartitions(expectedPartitions, expectedTypeAsString, tableType, null);
+ }
+
+ private void assertPartitions(List<Object[]> expectedPartitions, String expectedTypeAsString,
+ MetadataTableType tableType, String filter) throws ParseException {
Dataset<Row> df = loadMetadataTable(tableType);
+ if (filter != null) {
+ df = df.filter(filter);
+ }
DataType expectedType = spark.sessionState().sqlParser().parseDataType(expectedTypeAsString);
switch (tableType) {
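Taken together, the fix means a user can filter the files metadata table on partition columns even after partition evolution; previously, planning evaluated every manifest against the table's current spec, which could misjudge manifests written under older specs. A hypothetical end-user query (illustrative table name, Spark session assumed):

    // Prune the files metadata table by a partition predicate after evolution.
    Dataset<Row> files = spark.table("db.tbl.files")
        .filter("partition.data = 'd2' AND record_count > 0");
    files.select("file_path", "partition").show();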