You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2022/11/08 14:33:18 UTC
[iceberg] branch master updated: Core: Fix metadata table read failure due to illegal character (#4577)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9ee8457766 Core: Fix metadata table read failure due to illegal character (#4577)
9ee8457766 is described below
commit 9ee8457766a966252ea1a7d2e40ff2d812df7c4e
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Tue Nov 8 22:33:12 2022 +0800
Core: Fix metadata table read failure due to illegal character (#4577)
---
.../apache/iceberg/avro/BuildAvroProjection.java | 8 +-
.../apache/iceberg/MetadataTableScanTestBase.java | 99 +++++++++++++
.../org/apache/iceberg/TestMetadataTableScans.java | 59 +-------
...stMetadataTableScansWithPartitionEvolution.java | 163 +++++++++++++++++++++
4 files changed, 269 insertions(+), 60 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
index 3f1a71a9e6..c5c78dd147 100644
--- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
+++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java
@@ -95,7 +95,8 @@ class BuildAvroProjection extends AvroCustomOrderSchemaVisitor<Schema, Schema.Fi
hasChange = true;
}
- Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(field.name()));
+ String fieldName = AvroSchemaUtil.makeCompatibleName(field.name());
+ Schema.Field avroField = updateMap.get(fieldName);
if (avroField != null) {
updatedFields.add(avroField);
@@ -109,11 +110,14 @@ class BuildAvroProjection extends AvroCustomOrderSchemaVisitor<Schema, Schema.Fi
// to make sure that even if records in the file have the field it is not projected.
Schema.Field newField =
new Schema.Field(
- field.name() + "_r" + field.fieldId(),
+ fieldName + "_r" + field.fieldId(),
AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())),
null,
JsonProperties.NULL_VALUE);
newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId());
+ if (!field.name().equals(fieldName)) {
+ newField.addProp(AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP, field.name());
+ }
updatedFields.add(newField);
hasChange = true;
}
diff --git a/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
new file mode 100644
index 0000000000..623d08f36b
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/MetadataTableScanTestBase.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Base class for metadata table scan tests, parameterized over table format versions 1 and 2.
+ *
+ * <p>Holds the scan-validation helpers shared by the concrete metadata table scan test classes.
+ */
+@RunWith(Parameterized.class)
+public abstract class MetadataTableScanTestBase extends TableTestBase {
+
+  @Parameterized.Parameters(name = "formatVersion = {0}")
+  public static Object[] parameters() {
+    return new Object[] {1, 2};
+  }
+
+  public MetadataTableScanTestBase(int formatVersion) {
+    super(formatVersion);
+  }
+
+  /**
+   * Plans the given scan and collects the path of the file behind every planned task.
+   *
+   * <p>Each planned task is cast to {@link AllManifestsTable.ManifestListReadTask}, so a scan
+   * that plans any other kind of task fails with a {@link ClassCastException}. The planned tasks
+   * are closed before this method returns.
+   *
+   * @param allManifestsTableScan the scan to plan
+   * @return the distinct file paths of the planned tasks
+   * @throws java.io.UncheckedIOException if closing the planned tasks fails
+   */
+  protected Set<String> actualManifestListPaths(TableScan allManifestsTableScan) {
+    // planFiles() hands back a closeable resource; release it once the paths are collected
+    try (CloseableIterable<FileScanTask> tasks = allManifestsTableScan.planFiles()) {
+      return StreamSupport.stream(tasks.spliterator(), false)
+          .map(t -> (AllManifestsTable.ManifestListReadTask) t)
+          .map(t -> t.file().path().toString())
+          .collect(Collectors.toSet());
+    } catch (IOException e) {
+      // keep the signature free of checked exceptions so existing callers still compile
+      throw new java.io.UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Returns the manifest list locations of the snapshots whose IDs are among {@code snapshotIds}.
+   *
+   * @param snapshots the snapshots to pick from
+   * @param snapshotIds IDs of the snapshots whose manifest list location is expected
+   * @return the distinct manifest list locations of the selected snapshots
+   */
+  protected Set<String> expectedManifestListPaths(
+      Iterable<Snapshot> snapshots, Long... snapshotIds) {
+    Set<Long> snapshotIdSet = Sets.newHashSet(snapshotIds);
+    return StreamSupport.stream(snapshots.spliterator(), false)
+        .filter(s -> snapshotIdSet.contains(s.snapshotId()))
+        .map(Snapshot::manifestListLocation)
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Asserts that the scan plans at least one combined task and checks every file task's residual.
+   *
+   * @param scan the scan whose tasks are planned and inspected
+   * @param ignoreResiduals when true every residual must equal {@code alwaysTrue()}; when false
+   *     no residual may equal {@code alwaysTrue()}
+   * @throws IOException if closing the planned tasks fails
+   */
+  protected void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals)
+      throws IOException {
+    try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
+      Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
+      for (CombinedScanTask combinedScanTask : tasks) {
+        for (FileScanTask fileScanTask : combinedScanTask.files()) {
+          if (ignoreResiduals) {
+            Assert.assertEquals(
+                "Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
+          } else {
+            Assert.assertNotEquals(
+                "Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Asserts that at least one task's file has {@code partValue} as its first partition value.
+   *
+   * @param tasks the planned file scan tasks
+   * @param partValue the partition value to look for at position 0
+   */
+  protected void validateIncludesPartitionScan(
+      CloseableIterable<FileScanTask> tasks, int partValue) {
+    validateIncludesPartitionScan(tasks, 0, partValue);
+  }
+
+  /**
+   * Asserts that at least one task's file has {@code partValue} at the given partition position.
+   *
+   * <p>Files whose partition tuple has no such position, or holds a null there, do not match.
+   *
+   * @param tasks the planned file scan tasks
+   * @param position index of the partition field to inspect
+   * @param partValue the partition value to look for
+   */
+  protected void validateIncludesPartitionScan(
+      CloseableIterable<FileScanTask> tasks, int position, int partValue) {
+    Assert.assertTrue(
+        "File scan tasks do not include correct file",
+        StreamSupport.stream(tasks.spliterator(), false)
+            .anyMatch(
+                task -> {
+                  StructLike partition = task.file().partition();
+                  if (position >= partition.size()) {
+                    return false;
+                  }
+
+                  // a null partition value can never match; guard it so the remaining tasks
+                  // are still examined instead of the whole check dying with an NPE
+                  Object value = partition.get(position, Object.class);
+                  return value != null && value.equals(partValue);
+                }));
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
index bb1958fb67..e816b10db8 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScans.java
@@ -35,21 +35,12 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
-import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-@RunWith(Parameterized.class)
-public class TestMetadataTableScans extends TableTestBase {
-
- @Parameterized.Parameters(name = "formatVersion = {0}")
- public static Object[] parameters() {
- return new Object[] {1, 2};
- }
+public class TestMetadataTableScans extends MetadataTableScanTestBase {
public TestMetadataTableScans(int formatVersion) {
super(formatVersion);
@@ -962,52 +953,4 @@ public class TestMetadataTableScans extends TableTestBase {
expectedManifestListPaths(table.snapshots(), 1L, 3L, 4L),
actualManifestListPaths(manifestsTableScan));
}
-
- private Set<String> actualManifestListPaths(TableScan allManifestsTableScan) {
- return StreamSupport.stream(allManifestsTableScan.planFiles().spliterator(), false)
- .map(t -> (AllManifestsTable.ManifestListReadTask) t)
- .map(t -> t.file().path().toString())
- .collect(Collectors.toSet());
- }
-
- private Set<String> expectedManifestListPaths(Iterable<Snapshot> snapshots, Long... snapshotIds) {
- Set<Long> snapshotIdSet = Sets.newHashSet(snapshotIds);
- return StreamSupport.stream(snapshots.spliterator(), false)
- .filter(s -> snapshotIdSet.contains(s.snapshotId()))
- .map(Snapshot::manifestListLocation)
- .collect(Collectors.toSet());
- }
-
- private void validateTaskScanResiduals(TableScan scan, boolean ignoreResiduals)
- throws IOException {
- try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
- Assert.assertTrue("Tasks should not be empty", Iterables.size(tasks) > 0);
- for (CombinedScanTask combinedScanTask : tasks) {
- for (FileScanTask fileScanTask : combinedScanTask.files()) {
- if (ignoreResiduals) {
- Assert.assertEquals(
- "Residuals must be ignored", Expressions.alwaysTrue(), fileScanTask.residual());
- } else {
- Assert.assertNotEquals(
- "Residuals must be preserved", Expressions.alwaysTrue(), fileScanTask.residual());
- }
- }
- }
- }
- }
-
- private void validateIncludesPartitionScan(CloseableIterable<FileScanTask> tasks, int partValue) {
- Assert.assertTrue(
- "File scan tasks do not include correct file",
- StreamSupport.stream(tasks.spliterator(), false)
- .anyMatch(a -> a.file().partition().get(0, Object.class).equals(partValue)));
- }
-
- private boolean manifestHasPartition(ManifestFile mf, int partValue) {
- int lower =
- Conversions.fromByteBuffer(Types.IntegerType.get(), mf.partitions().get(0).lowerBound());
- int upper =
- Conversions.fromByteBuffer(Types.IntegerType.get(), mf.partitions().get(0).upperBound());
- return (lower <= partValue) && (upper >= partValue);
- }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
new file mode 100644
index 0000000000..c825cef876
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataTableScansWithPartitionEvolution.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.stream.Stream;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Metadata table scan tests against a table whose partition spec starts as identity on {@code id}
+ * and is then extended with the nested column {@code nested.id}.
+ *
+ * <p>Two data files are appended under each spec, so the table holds four data files written by
+ * two commits.
+ */
+public class TestMetadataTableScansWithPartitionEvolution extends MetadataTableScanTestBase {
+  public TestMetadataTableScansWithPartitionEvolution(int formatVersion) {
+    super(formatVersion);
+  }
+
+  /** Recreates the test table: two files under spec (id), then two under (id, nested.id). */
+  @Before
+  public void createTable() throws IOException {
+    TestTables.clearTables();
+    this.tableDir = temp.newFolder();
+    tableDir.delete();
+    this.metadataDir = new File(tableDir, "metadata");
+
+    Types.StructType nestedStruct =
+        Types.StructType.of(required(3, "id", Types.IntegerType.get()));
+    Schema tableSchema =
+        new Schema(
+            required(1, "id", Types.IntegerType.get()), required(2, "nested", nestedStruct));
+    PartitionSpec idSpec = PartitionSpec.builderFor(tableSchema).identity("id").build();
+    this.table = create(tableSchema, idSpec);
+
+    // first commit: files partitioned by id only
+    table.newFastAppend().appendFile(newDataFile("id=0")).appendFile(newDataFile("id=1")).commit();
+
+    // evolve the spec, then commit files partitioned by both id and nested.id
+    table.updateSpec().addField("nested.id").commit();
+    table
+        .newFastAppend()
+        .appendFile(newDataFile("id=2/nested.id=2"))
+        .appendFile(newDataFile("id=3/nested.id=3"))
+        .commit();
+  }
+
+  @Test
+  public void testManifestsTableWithAddPartitionOnNestedField() throws IOException {
+    assertPlannedTasksAndRows(new ManifestsTable(table.ops(), table), 1, 2);
+  }
+
+  @Test
+  public void testDataFilesTableWithAddPartitionOnNestedField() throws IOException {
+    assertPlannedTasksAndRows(new DataFilesTable(table.ops(), table), 2, 4);
+  }
+
+  @Test
+  public void testManifestEntriesWithAddPartitionOnNestedField() throws IOException {
+    assertPlannedTasksAndRows(new ManifestEntriesTable(table.ops(), table), 2, 4);
+  }
+
+  @Test
+  public void testAllDataFilesTableWithAddPartitionOnNestedField() throws IOException {
+    assertPlannedTasksAndRows(new AllDataFilesTable(table.ops(), table), 2, 4);
+  }
+
+  @Test
+  public void testAllEntriesTableWithAddPartitionOnNestedField() throws IOException {
+    assertPlannedTasksAndRows(new AllEntriesTable(table.ops(), table), 2, 4);
+  }
+
+  @Test
+  public void testAllManifestsTableWithAddPartitionOnNestedField() throws IOException {
+    assertPlannedTasksAndRows(new AllManifestsTable(table.ops(), table), 2, 3);
+  }
+
+  @Test
+  public void testPartitionsTableScanWithAddPartitionOnNestedField() throws IOException {
+    Table partitionsTable = new PartitionsTable(table.ops(), table);
+    Types.StructType partitionFields =
+        Types.StructType.of(
+            optional(1000, "id", Types.IntegerType.get()),
+            optional(1001, "nested.id", Types.IntegerType.get()));
+    Types.StructType expectedStruct =
+        new Schema(required(1, "partition", partitionFields)).asStruct();
+
+    TableScan partitionScan = partitionsTable.newScan().select("partition");
+    Assert.assertEquals(expectedStruct, partitionScan.schema().asStruct());
+
+    try (CloseableIterable<FileScanTask> planned =
+        PartitionsTable.planFiles((StaticTableScan) partitionScan)) {
+      Assertions.assertThat(planned).hasSize(4);
+
+      // every id value 0..3 must show up at partition position 0
+      for (int idValue = 0; idValue <= 3; idValue++) {
+        validateIncludesPartitionScan(planned, idValue);
+      }
+
+      // the files written after the spec update carry nested.id at position 1
+      validateIncludesPartitionScan(planned, 1, 2);
+      validateIncludesPartitionScan(planned, 1, 3);
+    }
+  }
+
+  /**
+   * Plans a fresh scan of {@code metadataTable} and asserts how many tasks it plans and how many
+   * rows those tasks produce in total.
+   */
+  private void assertPlannedTasksAndRows(Table metadataTable, int expectedTasks, int expectedRows)
+      throws IOException {
+    TableScan metadataScan = metadataTable.newScan();
+
+    try (CloseableIterable<FileScanTask> planned = metadataScan.planFiles()) {
+      Assertions.assertThat(planned).hasSize(expectedTasks);
+      Assertions.assertThat(allRows(planned)).hasSize(expectedRows);
+    }
+  }
+
+  /** Streams the rows of every task, treating each task as a data task. */
+  private Stream<StructLike> allRows(Iterable<FileScanTask> tasks) {
+    Stream<FileScanTask> taskStream = Streams.stream(tasks);
+    return taskStream.flatMap(fileTask -> Streams.stream(fileTask.asDataTask().rows()));
+  }
+}