You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2022/06/22 07:37:18 UTC
[iceberg] branch master updated: Core: Metadata table queries fail if a partition column was reused in V2 (#4662)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a5efb53f3 Core: Metadata table queries fail if a partition column was reused in V2 (#4662)
a5efb53f3 is described below
commit a5efb53f3588306731afed71ad422891a588ccdf
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Wed Jun 22 09:37:12 2022 +0200
Core: Metadata table queries fail if a partition column was reused in V2 (#4662)
---
.../apache/iceberg/BaseUpdatePartitionSpec.java | 32 +++++++++++-
.../test/java/org/apache/iceberg/ScanTestBase.java | 61 ++++++++++++++++++++++
.../org/apache/iceberg/TestMetadataTableScans.java | 6 ++-
.../TestMetadataTablesWithPartitionEvolution.java | 42 +++++++++------
4 files changed, 123 insertions(+), 18 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
index a6a55a5fa..0181b98ac 100644
--- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
+++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
@@ -108,6 +108,35 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec {
return lastAssignedPartitionId;
}
+ /**
+ * For format version 2 tables with a non-null base, reuses a field from any historical spec whose source ID and
+ * transform match (and whose name matches, when a name is given). Otherwise creates a field via assignFieldId().
+ * @param sourceTransform pair of source field ID and transform for the partition field being added
+ * @param name target partition field name, or null to match on source ID and transform only
+ * @return the recycled historical field, or a newly created one (its name is null when no name was given)
+ */
+ private PartitionField recycleOrCreatePartitionField(Pair<Integer, Transform<?, ?>> sourceTransform, String name) {
+ if (formatVersion == 2 && base != null) {
+ int sourceId = sourceTransform.first();
+ Transform<?, ?> transform = sourceTransform.second();
+
+ Set<PartitionField> allHistoricalFields = Sets.newHashSet();
+ for (PartitionSpec partitionSpec : base.specs()) {
+ allHistoricalFields.addAll(partitionSpec.fields());
+ }
+
+ for (PartitionField field : allHistoricalFields) {
+ if (field.sourceId() == sourceId && field.transform().equals(transform)) {
+ // only require a name match when the caller specified a target name
+ if (name == null || field.name().equals(name)) {
+ return field; // NOTE(review): with several matches, the winner depends on set iteration order
+ }
+ }
+ }
+ }
+ return new PartitionField(sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
+ }
+
@Override
public UpdatePartitionSpec caseSensitive(boolean isCaseSensitive) {
this.caseSensitive = isCaseSensitive;
@@ -157,8 +186,7 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec {
Preconditions.checkArgument(added == null,
"Cannot add duplicate partition field %s=%s, already added: %s", name, term, added);
- PartitionField newField = new PartitionField(
- sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
+ PartitionField newField = recycleOrCreatePartitionField(sourceTransform, name);
if (newField.name() == null) {
String partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
newField = new PartitionField(newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
diff --git a/core/src/test/java/org/apache/iceberg/ScanTestBase.java b/core/src/test/java/org/apache/iceberg/ScanTestBase.java
index a800e4032..72136477d 100644
--- a/core/src/test/java/org/apache/iceberg/ScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/ScanTestBase.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
+import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Executors;
@@ -28,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -133,4 +135,63 @@ public abstract class ScanTestBase<T extends Scan<T>> extends TableTestBase {
Assert.assertEquals(2, Iterables.size(scan.planFiles()));
Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
}
+
+ @Test
+ public void testReAddingPartitionField() throws Exception {
+ Assume.assumeTrue(formatVersion == 2); // scenario is only exercised for format version 2
+ Schema schema = new Schema(
+ required(1, "a", Types.IntegerType.get()),
+ required(2, "b", Types.StringType.get()),
+ required(3, "data", Types.IntegerType.get())
+ );
+ PartitionSpec initialSpec = PartitionSpec.builderFor(schema).identity("a").build();
+ File dir = temp.newFolder();
+ dir.delete(); // NOTE(review): presumably TestTables.create expects a non-existent location - confirm
+ this.table = TestTables.create(dir, "test_part_evolution", schema, initialSpec, formatVersion);
+ table.newFastAppend().appendFile(DataFiles.builder(initialSpec)
+ .withPath("/path/to/data/a.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("a=1")
+ .withRecordCount(1)
+ .build()).commit();
+
+ table.updateSpec().addField("b").removeField("a").commit(); // spec now partitions by b only
+ table.newFastAppend().appendFile(DataFiles.builder(table.spec())
+ .withPath("/path/to/data/b.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("b=1")
+ .withRecordCount(1)
+ .build()).commit();
+
+ table.updateSpec().addField("a").commit(); // re-add the previously removed field: the case under test
+ table.newFastAppend().appendFile(DataFiles.builder(table.spec())
+ .withPath("/path/to/data/ab.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("b=1/a=1")
+ .withRecordCount(1)
+ .build()).commit();
+
+ table.newFastAppend().appendFile(DataFiles.builder(table.spec())
+ .withPath("/path/to/data/a2b.parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath("b=1/a=2")
+ .withRecordCount(1)
+ .build()).commit();
+
+ TableScan scan1 = table.newScan().filter(Expressions.equal("b", "1")); // filter on the field added second
+ try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) {
+ Assert.assertTrue("There should be 1 combined task", Iterables.size(tasks) == 1);
+ for (CombinedScanTask combinedScanTask : tasks) {
+ Assert.assertEquals("All 4 files should match b=1 filter", 4, combinedScanTask.files().size());
+ }
+ }
+
+ TableScan scan2 = table.newScan().filter(Expressions.equal("a", 2)); // filter on the re-added field
+ try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) {
+ Assert.assertTrue("There should be 1 combined task", Iterables.size(tasks) == 1);
+ for (CombinedScanTask combinedScanTask : tasks) {
+ Assert.assertEquals("a=2 and file without a in spec should match", 2, combinedScanTask.files().size());
+ }
+ }
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index ee00bb8d8..a357487ea 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -514,7 +514,11 @@ public class TestMetadataTableScans extends TableTestBase {
table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
table.refresh();
- table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+ // The target name 'data_bucket_16' must be specified explicitly here. If it is omitted, a default name is generated,
+ // and as discussed in https://github.com/apache/iceberg/pull/4868 this leads to an inconsistency: in V2, the
+ // data_bucket field removed above would be recycled instead of a data_bucket_16 field being added. By specifying
+ // the target name, we explicitly require data_bucket not to be recycled.
+ table.updateSpec().addField("data_bucket_16", Expressions.bucket("data", 16)).commit();
table.refresh();
table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
index 7519486bc..691e9f6f5 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -482,11 +482,6 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
@Test
public void testPartitionsTableSwitchFields() throws Exception {
- // Re-added partition fields currently not re-associated: https://github.com/apache/iceberg/issues/4292
- // In V1, dropped partition fields show separately when field is re-added
- // In V2, re-added field currently conflicts with its deleted form
- Assume.assumeTrue(formatVersion == 1);
-
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
initTable();
Table table = validationCatalog.loadTable(tableIdent);
@@ -531,17 +526,34 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+ sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd3')", tableName);
- assertPartitions(
- ImmutableList.of(
- row(null, "c1", null),
- row(null, "c1", "d1"),
- row(null, "c2", null),
- row(null, "c2", "d2"),
- row("d1", "c1", null),
- row("d2", "c2", null)),
- "STRUCT<data_1000:STRING,category:STRING,data:STRING>",
- PARTITIONS);
+ if (formatVersion == 1) {
+ assertPartitions(
+ ImmutableList.of(
+ row(null, "c1", null),
+ row(null, "c1", "d1"),
+ row(null, "c2", null),
+ row(null, "c2", "d2"),
+ row(null, "c3", "d3"),
+ row("d1", "c1", null),
+ row("d2", "c2", null)),
+ "STRUCT<data_1000:STRING,category:STRING,data:STRING>",
+ PARTITIONS);
+ } else {
+ // In V2, re-adding a partition field that was part of an older spec keeps both its name and its field ID,
+ // so its values are collapsed into a single common column (as opposed to V1, where every new partition field
+ // addition results in a new column in this metadata table)
+ assertPartitions(
+ ImmutableList.of(
+ row(null, "c1"),
+ row(null, "c2"),
+ row("d1", "c1"),
+ row("d2", "c2"),
+ row("d3", "c3")),
+ "STRUCT<data:STRING,category:STRING>",
+ PARTITIONS);
+ }
}
@Test