You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2022/11/08 14:33:18 UTC

[iceberg] branch master updated: Core: Fix metadata table read failure due to illegal character (#4577)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ee8457766 Core: Fix metadata table read failure due to illegal character (#4577)
9ee8457766 is described below

commit 9ee8457766a966252ea1a7d2e40ff2d812df7c4e
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Tue Nov 8 22:33:12 2022 +0800

    Core: Fix metadata table read failure due to illegal character (#4577)
---
 .../apache/iceberg/avro/BuildAvroProjection.java   |   8 +-
 .../apache/iceberg/MetadataTableScanTestBase.java  |  99 +++++++++++++
 .../org/apache/iceberg/TestMetadataTableScans.java |  59 +-------
 ...stMetadataTableScansWithPartitionEvolution.java | 163 +++++++++++++++++++++
 4 files changed, 269 insertions(+), 60 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
index 3f1a71a9e6..c5c78dd147 100644
--- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
+++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
@@ -95,7 +95,8 @@ class BuildAvroProjection extends AvroCustomOrderSchemaVisitor<Schema, Schema.Fi
         hasChange = true;
       }
 
-      Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(field.name()));
+      String fieldName = AvroSchemaUtil.makeCompatibleName(field.name());
+      Schema.Field avroField = updateMap.get(fieldName);
 
       if (avroField != null) {
         updatedFields.add(avroField);
@@ -109,11 +110,14 @@ class BuildAvroProjection extends AvroCustomOrderSchemaVisitor<Schema, Schema.Fi
         // to make sure that even if records in the file have the field it is not projected.
         Schema.Field newField =
             new Schema.Field(
-                field.name() + "_r" + field.fieldId(),
+                fieldName + "_r" + field.fieldId(),
                 AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())),
                 null,
                 JsonProperties.NULL_VALUE);
         newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId());
+        if (!field.name().equals(fieldName)) {
+          newField.addProp(AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP, field.name());
+        }
         updatedFields.add(newField);
         hasChange = true;
       }
diff --git a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
new file mode 100644
index 0000000000..623d08f36b
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public abstract class MetadataTableScanTestBase extends TableTestBase {
+
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {1, 2};
+  }
+
+  public MetadataTableScanTestBase(int formatVersion) {
+    super(formatVersion);
+  }
+
+  protected Set<String> actualManifestListPaths(TableScan allManifestsTableScan) {
+    return StreamSupport.stream(allManifestsTableScan.planFiles().spliterator(), false)
+        .map(t -> (AllManifestsTable.ManifestListReadTask) t)
+        .map(t -> t.file().path().toString())
+        .collect(Collectors.toSet());
+  }
+
+  protected Set<String> expectedManifestListPaths(
+      Iterable<Snapshot> snapshots, Long... snapshotIds) {
+    Set<Long> snapshotIdSet = Sets.newHashSet(snapshotIds);
+    return StreamSupport.stream(snapshots.spliterator(), false)
+        .filter(s -> snapshotIdSet.contains(s.snapshotId()))
+        .map(Snapshot::manifestListLocation)
+        .collect(Collectors.toSet());
+  }
+
+  protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals)
+      throws IOException {
+    try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+      for (CombinedScanTask combinedScanTask : tasks) {
+        for (FileScanTask fileScanTask : combinedScanTask.files()) {
+          if (ignoreResiduals) {
+            Assert.assertEquals(
+                "Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
+          } else {
+            Assert.assertNotEquals(
+                "Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
+          }
+        }
+      }
+    }
+  }
+
+  protected void validateIncludesPartitionScan(
+      CloseableIterable<FileScanTask> tasks, int partValue) {
+    validateIncludesPartitionScan(tasks, 0, partValue);
+  }
+
+  protected void validateIncludesPartitionScan(
+      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+    Assert.assertTrue(
+        "File scan tasks do not include correct file",
+        StreamSupport.stream(tasks.spliterator(), false)
+            .anyMatch(
+                task -> {
+                  StructLike partition = task.file().partition();
+                  if (position >= partition.size()) {
+                    return false;
+                  }
+
+                  return partition.get(position, Object.class).equals(partValue);
+                }));
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index bb1958fb67..e816b10db8 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -35,21 +35,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.collect.Streams;
-import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Types;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-@RunWith(Parameterized.class)
-public class TestMetadataTableScans extends TableTestBase {
-
-  @Parameterized.Parameters(name = "formatVersion = {0}")
-  public static Object[] parameters() {
-    return new Object[] {1, 2};
-  }
+public class TestMetadataTableScans extends MetadataTableScanTestBase {
 
   public TestMetadataTableScans(int formatVersion) {
     super(formatVersion);
@@ -962,52 +953,4 @@ public class TestMetadataTableScans extends TableTestBase {
         expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L),
         actualManifestListPaths(manifestsTableScan));
   }
-
-  private Set<String> actualManifestListPaths(TableScan allManifestsTableScan) {
-    return StreamSupport.stream(allManifestsTableScan.planFiles().spliterator(), false)
-        .map(t -> (AllManifestsTable.ManifestListReadTask) t)
-        .map(t -> t.file().path().toString())
-        .collect(Collectors.toSet());
-  }
-
-  private Set<String> expectedManifestListPaths(Iterable<Snapshot> snapshots, Long... snapshotIds) {
-    Set<Long> snapshotIdSet = Sets.newHashSet(snapshotIds);
-    return StreamSupport.stream(snapshots.spliterator(), false)
-        .filter(s -> snapshotIdSet.contains(s.snapshotId()))
-        .map(Snapshot::manifestListLocation)
-        .collect(Collectors.toSet());
-  }
-
-  private void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals)
-      throws IOException {
-    try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
-      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
-      for (CombinedScanTask combinedScanTask : tasks) {
-        for (FileScanTask fileScanTask : combinedScanTask.files()) {
-          if (ignoreResiduals) {
-            Assert.assertEquals(
-                "Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
-          } else {
-            Assert.assertNotEquals(
-                "Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
-          }
-        }
-      }
-    }
-  }
-
-  private void validateIncludesPartitionScan(CloseableIterable<FileScanTask> tasks, int partValue) {
-    Assert.assertTrue(
-        "File scan tasks do not include correct file",
-        StreamSupport.stream(tasks.spliterator(), false)
-            .anyMatch(a -> a.file().partition().get(0, Object.class).equals(partValue)));
-  }
-
-  private boolean manifestHasPartition(ManifestFile mf, int partValue) {
-    int lower =
-        Conversions.fromByteBuffer(Types.IntegerType.get(), mf.partitions().get(0).lowerBound());
-    int upper =
-        Conversions.fromByteBuffer(Types.IntegerType.get(), mf.partitions().get(0).upperBound());
-    return (lower <= partValue) && (upper >= partValue);
-  }
 }
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
new file mode 100644
index 0000000000..c825cef876
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.stream.Stream;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMetadataTableScansWithPartitionEvolution extends MetadataTableScanTestBase {
+  public TestMetadataTableScansWithPartitionEvolution(int formatVersion) {
+    super(formatVersion);
+  }
+
+  @Before
+  public void createTable() throws IOException {
+    TestTables.clearTables();
+    this.tableDir = temp.newFolder();
+    tableDir.delete();
+
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()),
+            required(
+                2,
+                "nested",
+                Types.StructType.of(Types.NestedField.required(3, "id", Types.IntegerType.get()))));
+    this.metadataDir = new File(tableDir, "metadata");
+    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("id").build();
+
+    this.table = create(schema, spec);
+    table.newFastAppend().appendFile(newDataFile("id=0")).appendFile(newDataFile("id=1")).commit();
+
+    table.updateSpec().addField("nested.id").commit();
+    table
+        .newFastAppend()
+        .appendFile(newDataFile("id=2/nested.id=2"))
+        .appendFile(newDataFile("id=3/nested.id=3"))
+        .commit();
+  }
+
+  @Test
+  public void testManifestsTableWithAddPartitionOnNestedField() throws IOException {
+    Table manifestsTable = new ManifestsTable(table.ops(), table);
+    TableScan scan = manifestsTable.newScan();
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assertions.assertThat(tasks).hasSize(1);
+      Assertions.assertThat(allRows(tasks)).hasSize(2);
+    }
+  }
+
+  @Test
+  public void testDataFilesTableWithAddPartitionOnNestedField() throws IOException {
+    Table dataFilesTable = new DataFilesTable(table.ops(), table);
+    TableScan scan = dataFilesTable.newScan();
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assertions.assertThat(tasks).hasSize(2);
+      Assertions.assertThat(allRows(tasks)).hasSize(4);
+    }
+  }
+
+  @Test
+  public void testManifestEntriesWithAddPartitionOnNestedField() throws IOException {
+    Table manifestEntriesTable = new ManifestEntriesTable(table.ops(), table);
+    TableScan scan = manifestEntriesTable.newScan();
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assertions.assertThat(tasks).hasSize(2);
+      Assertions.assertThat(allRows(tasks)).hasSize(4);
+    }
+  }
+
+  @Test
+  public void testAllDataFilesTableWithAddPartitionOnNestedField() throws IOException {
+    Table allDataFilesTable = new AllDataFilesTable(table.ops(), table);
+    TableScan scan = allDataFilesTable.newScan();
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assertions.assertThat(tasks).hasSize(2);
+      Assertions.assertThat(allRows(tasks)).hasSize(4);
+    }
+  }
+
+  @Test
+  public void testAllEntriesTableWithAddPartitionOnNestedField() throws IOException {
+    Table allEntriesTable = new AllEntriesTable(table.ops(), table);
+    TableScan scan = allEntriesTable.newScan();
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assertions.assertThat(tasks).hasSize(2);
+      Assertions.assertThat(allRows(tasks)).hasSize(4);
+    }
+  }
+
+  @Test
+  public void testAllManifestsTableWithAddPartitionOnNestedField() throws IOException {
+    Table allManifestsTable = new AllManifestsTable(table.ops(), table);
+    TableScan scan = allManifestsTable.newScan();
+
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      Assertions.assertThat(tasks).hasSize(2);
+      Assertions.assertThat(allRows(tasks)).hasSize(3);
+    }
+  }
+
+  @Test
+  public void testPartitionsTableScanWithAddPartitionOnNestedField() throws IOException {
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+    Types.StructType idPartition =
+        new Schema(
+                required(
+                    1,
+                    "partition",
+                    Types.StructType.of(
+                        optional(1000, "id", Types.IntegerType.get()),
+                        optional(1001, "nested.id", Types.IntegerType.get()))))
+            .asStruct();
+
+    TableScan scanNoFilter = partitionsTable.newScan().select("partition");
+    Assert.assertEquals(idPartition, scanNoFilter.schema().asStruct());
+    try (CloseableIterable<FileScanTask> tasksNoFilter =
+        PartitionsTable.planFiles((StaticTableScan) scanNoFilter)) {
+      Assertions.assertThat(tasksNoFilter).hasSize(4);
+      validateIncludesPartitionScan(tasksNoFilter, 0);
+      validateIncludesPartitionScan(tasksNoFilter, 1);
+      validateIncludesPartitionScan(tasksNoFilter, 2);
+      validateIncludesPartitionScan(tasksNoFilter, 3);
+      validateIncludesPartitionScan(tasksNoFilter, 1, 2);
+      validateIncludesPartitionScan(tasksNoFilter, 1, 3);
+    }
+  }
+
+  private Stream<StructLike> allRows(Iterable<FileScanTask> tasks) {
+    return Streams.stream(tasks).flatMap(task -> Streams.stream(task.asDataTask().rows()));
+  }
+}