You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2022/06/22 07:37:18 UTC

[iceberg] branch master updated: Core: Metadata table queries fail if a partition column was reused in V2 (#4662)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a5efb53f3 Core: Metadata table queries fail if a partition column was reused in V2 (#4662)
a5efb53f3 is described below

commit a5efb53f3588306731afed71ad422891a588ccdf
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Wed Jun 22 09:37:12 2022 +0200

    Core: Metadata table queries fail if a partition column was reused in V2 (#4662)
---
 .../apache/iceberg/BaseUpdatePartitionSpec.java    | 32 +++++++++++-
 .../test/java/org/apache/iceberg/ScanTestBase.java | 61 ++++++++++++++++++++++
 .../org/apache/iceberg/TestMetadataTableScans.java |  6 ++-
 .../TestMetadataTablesWithPartitionEvolution.java  | 42 +++++++++------
 4 files changed, 123 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
index a6a55a5fa..0181b98ac 100644
--- a/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
+++ b/core/src/main/java/org/apache/iceberg/BaseUpdatePartitionSpec.java
@@ -108,6 +108,35 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec {
     return lastAssignedPartitionId;
   }
 
+  /**
+   * Returns the partition field to use for an added partition term, reusing a historical one when possible.
+   *
+   * <p>In V2, historical partition specs are searched for a field with the same source field ID and transform
+   * (and the same name, when a target name is given). Specs and their fields are visited in the iteration order
+   * of base.specs() and each spec's fields(), and the first match in that order is returned.
+   * If no field matches, or in V1, a new PartitionField with a newly assigned field ID is created.
+   *
+   * @param sourceTransform pair of source ID and transform for this PartitionField addition
+   * @param name target partition field name, or null if not specified
+   * @return the recycled or newly created partition field
+   */
+  private PartitionField recycleOrCreatePartitionField(Pair<Integer, Transform<?, ?>> sourceTransform, String name) {
+    if (formatVersion == 2 && base != null) {
+      int sourceId = sourceTransform.first();
+      Transform<?, ?> transform = sourceTransform.second();
+      for (PartitionSpec partitionSpec : base.specs()) {
+        for (PartitionField field : partitionSpec.fields()) {
+          // a target name, when specified, must match as well; otherwise any name is accepted
+          boolean nameMatches = name == null || field.name().equals(name);
+          if (nameMatches && field.sourceId() == sourceId && field.transform().equals(transform)) {
+            return field;
+          }
+        }
+      }
+    }
+    return new PartitionField(sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
+  }
+
   @Override
   public UpdatePartitionSpec caseSensitive(boolean isCaseSensitive) {
     this.caseSensitive = isCaseSensitive;
@@ -157,8 +186,7 @@ class BaseUpdatePartitionSpec implements UpdatePartitionSpec {
     Preconditions.checkArgument(added == null,
         "Cannot add duplicate partition field %s=%s, already added: %s", name, term, added);
 
-    PartitionField newField = new PartitionField(
-        sourceTransform.first(), assignFieldId(), name, sourceTransform.second());
+    PartitionField newField = recycleOrCreatePartitionField(sourceTransform, name);
     if (newField.name() == null) {
       String partitionName = PartitionSpecVisitor.visit(schema, newField, PartitionNameGenerator.INSTANCE);
       newField = new PartitionField(newField.sourceId(), newField.fieldId(), partitionName, newField.transform());
diff --git a/core/src/test/java/org/apache/iceberg/ScanTestBase.java b/core/src/test/java/org/apache/iceberg/ScanTestBase.java
index a800e4032..72136477d 100644
--- a/core/src/test/java/org/apache/iceberg/ScanTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/ScanTestBase.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.concurrent.Executors;
@@ -28,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -133,4 +135,63 @@ public abstract class ScanTestBase<T extends Scan<T>> extends TableTestBase {
     Assert.assertEquals(2, Iterables.size(scan.planFiles()));
     Assert.assertTrue("Thread should be created in provided pool", planThreadsIndex.get() > 0);
   }
+
+  @Test
+  public void testReAddingPartitionField() throws Exception {
+    Assume.assumeTrue(formatVersion == 2);
+    Schema schema = new Schema(
+        required(1, "a", Types.IntegerType.get()),
+        required(2, "b", Types.StringType.get()),
+        required(3, "data", Types.IntegerType.get())
+    );
+    PartitionSpec initialSpec = PartitionSpec.builderFor(schema).identity("a").build();
+    File dir = temp.newFolder();
+    dir.delete();
+    this.table = TestTables.create(dir, "test_part_evolution", schema, initialSpec, formatVersion);
+    table.newFastAppend().appendFile(DataFiles.builder(initialSpec)
+        .withPath("/path/to/data/a.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("a=1")
+        .withRecordCount(1)
+        .build()).commit();
+    // replace partition field a with b, then write a file under the new spec
+    table.updateSpec().addField("b").removeField("a").commit();
+    table.newFastAppend().appendFile(DataFiles.builder(table.spec())
+        .withPath("/path/to/data/b.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("b=1")
+        .withRecordCount(1)
+        .build()).commit();
+    // re-add a, which was part of the initial spec, and write two files under the resulting (b, a) spec
+    table.updateSpec().addField("a").commit();
+    table.newFastAppend().appendFile(DataFiles.builder(table.spec())
+        .withPath("/path/to/data/ab.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("b=1/a=1")
+        .withRecordCount(1)
+        .build()).commit();
+
+    table.newFastAppend().appendFile(DataFiles.builder(table.spec())
+        .withPath("/path/to/data/a2b.parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath("b=1/a=2")
+        .withRecordCount(1)
+        .build()).commit();
+    // every file must be planned for b=1, including a.parquet whose spec has no b field
+    TableScan scan1 = table.newScan().filter(Expressions.equal("b", "1"));
+    try (CloseableIterable<CombinedScanTask> tasks = scan1.planTasks()) {
+      Assert.assertEquals("There should be 1 combined task", 1, Iterables.size(tasks));
+      for (CombinedScanTask combinedScanTask : tasks) {
+        Assert.assertEquals("All 4 files should match b=1 filter", 4, combinedScanTask.files().size());
+      }
+    }
+    // a=2 must keep a2b.parquet and b.parquet (its spec has no a field) and prune the two a=1 files
+    TableScan scan2 = table.newScan().filter(Expressions.equal("a", 2));
+    try (CloseableIterable<CombinedScanTask> tasks = scan2.planTasks()) {
+      Assert.assertEquals("There should be 1 combined task", 1, Iterables.size(tasks));
+      for (CombinedScanTask combinedScanTask : tasks) {
+        Assert.assertEquals("a=2 and file without a in spec should match", 2, combinedScanTask.files().size());
+      }
+    }
+  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index ee00bb8d8..a357487ea 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -514,7 +514,11 @@ public class TestMetadataTableScans extends TableTestBase {
     table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
     table.refresh();
 
-    table.updateSpec().addField(Expressions.bucket("data", 16)).commit();
+    // Here we need to specify the target name as 'data_bucket_16'. If unspecified, a default name would be generated,
+    // which is handled inconsistently (see https://github.com/apache/iceberg/pull/4868): in V2, the data_bucket field
+    // removed above would be recycled instead of creating data_bucket_16. By specifying the target name, we
+    // explicitly require data_bucket not to be recycled.
+    table.updateSpec().addField("data_bucket_16", Expressions.bucket("data", 16)).commit();
     table.refresh();
 
     table.updateSpec().removeField(Expressions.bucket("data", 16)).commit();
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
index 7519486bc..691e9f6f5 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -482,11 +482,6 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
 
   @Test
   public void testPartitionsTableSwitchFields() throws Exception {
-    // Re-added partition fields currently not re-associated: https://github.com/apache/iceberg/issues/4292
-    // In V1, dropped partition fields show separately when field is re-added
-    // In V2, re-added field currently conflicts with its deleted form
-    Assume.assumeTrue(formatVersion == 1);
-
     sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
     initTable();
     Table table = validationCatalog.loadTable(tableIdent);
@@ -531,17 +526,34 @@ public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBa
 
     sql("INSERT INTO TABLE %s VALUES (1, 'c1', 'd1')", tableName);
     sql("INSERT INTO TABLE %s VALUES (2, 'c2', 'd2')", tableName);
+    sql("INSERT INTO TABLE %s VALUES (3, 'c3', 'd3')", tableName);
 
-    assertPartitions(
-        ImmutableList.of(
-            row(null, "c1", null),
-            row(null, "c1", "d1"),
-            row(null, "c2", null),
-            row(null, "c2", "d2"),
-            row("d1", "c1", null),
-            row("d2", "c2", null)),
-        "STRUCT<data_1000:STRING,category:STRING,data:STRING>",
-        PARTITIONS);
+    if (formatVersion == 1) {
+      assertPartitions(
+          ImmutableList.of(
+              row(null, "c1", null),
+              row(null, "c1", "d1"),
+              row(null, "c2", null),
+              row(null, "c2", "d2"),
+              row(null, "c3", "d3"),
+              row("d1", "c1", null),
+              row("d2", "c2", null)),
+          "STRUCT<data_1000:STRING,category:STRING,data:STRING>",
+          PARTITIONS);
+    } else {
+      // In V2 re-adding a former partition field that was part of an older spec will not change its name or its
+      // field ID either, thus values will be collapsed into a single common column (as opposed to V1 where any new
+      // partition field addition will result in a new column in this metadata table)
+      assertPartitions(
+          ImmutableList.of(
+              row(null, "c1"),
+              row(null, "c2"),
+              row("d1", "c1"),
+              row("d2", "c2"),
+              row("d3", "c3")),
+          "STRUCT<data:STRING,category:STRING>",
+          PARTITIONS);
+    }
   }
 
   @Test