You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink and is not preserved in this plain-text rendering.)
Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/08/14 06:16:22 UTC
[iceberg] branch master updated: Core: Fix querying metadata tables
with multiple specs (#2936)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 95cde3a Core: Fix querying metadata tables with multiple specs (#2936)
95cde3a is described below
commit 95cde3aea7eacf9a391cda36ef255d1b24d930cd
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Aug 13 20:16:13 2021 -1000
Core: Fix querying metadata tables with multiple specs (#2936)
---
.../java/org/apache/iceberg/AllDataFilesTable.java | 6 +-
.../java/org/apache/iceberg/AllEntriesTable.java | 6 +-
.../java/org/apache/iceberg/DataFilesTable.java | 6 +-
.../org/apache/iceberg/ManifestEntriesTable.java | 6 +-
.../main/java/org/apache/iceberg/Partitioning.java | 69 +++++
.../java/org/apache/iceberg/TestPartitioning.java | 196 +++++++++++++
.../TestMetadataTablesWithPartitionEvolution.java | 310 +++++++++++++++++++++
7 files changed, 591 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index fc2dc69..d1b084f 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;
@@ -56,8 +57,9 @@ public class AllDataFilesTable extends BaseMetadataTable {
@Override
public Schema schema() {
- Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
- if (table().spec().fields().size() < 1) {
+ StructType partitionType = Partitioning.partitionType(table());
+ Schema schema = new Schema(DataFile.getType(partitionType).fields());
+ if (partitionType.fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
return TypeUtil.selectNot(schema, Sets.newHashSet(102));
} else {
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index c1b7145..84c1609 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;
@@ -55,8 +56,9 @@ public class AllEntriesTable extends BaseMetadataTable {
@Override
public Schema schema() {
- Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
- if (table().spec().fields().size() < 1) {
+ StructType partitionType = Partitioning.partitionType(table());
+ Schema schema = ManifestEntry.getSchema(partitionType);
+ if (partitionType.fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
return TypeUtil.selectNot(schema, Sets.newHashSet(102));
} else {
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index d6b80ee..f931c06 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTest
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
/**
* A {@link Table} implementation that exposes a table's data files as rows.
@@ -51,8 +52,9 @@ public class DataFilesTable extends BaseMetadataTable {
@Override
public Schema schema() {
- Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
- if (table().spec().fields().size() < 1) {
+ StructType partitionType = Partitioning.partitionType(table());
+ Schema schema = new Schema(DataFile.getType(partitionType).fields());
+ if (partitionType.fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field
return TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
} else {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 7bae349..a44fc64 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.StructProjection;
/**
@@ -54,8 +55,9 @@ public class ManifestEntriesTable extends BaseMetadataTable {
@Override
public Schema schema() {
- Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
- if (table().spec().fields().size() < 1) {
+ StructType partitionType = Partitioning.partitionType(table());
+ Schema schema = ManifestEntry.getSchema(partitionType);
+ if (partitionType.fields().size() < 1) {
// avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
return TypeUtil.selectNot(schema, Sets.newHashSet(102));
} else {
diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index d393fe1..28598d4 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -19,9 +19,20 @@
package org.apache.iceberg;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.transforms.PartitionSpecVisitor;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
public class Partitioning {
private Partitioning() {
@@ -177,4 +188,62 @@ public class Partitioning {
return null;
}
}
+
+  /**
+   * Builds a common partition type for all specs in a table.
+   * <p>
+   * Whenever a table has multiple specs, the partition type is a struct containing
+   * all columns that have ever been a part of any spec in the table. Partition fields are
+   * matched across specs by field ID; if a field was renamed, the name used by the spec with
+   * the highest spec ID is kept. The resulting struct fields are ordered by field ID.
+   *
+   * @param table a table with one or many specs
+   * @return the constructed common partition type
+   * @throws ValidationException if two specs use the same partition field ID for different
+   *     source columns or for incompatible transforms (see {@code compatibleTransforms})
+   */
+  public static StructType partitionType(Table table) {
+    // snapshot the specs once so the size check, the ID listing and the lookups all agree
+    Map<Integer, PartitionSpec> specs = table.specs();
+    if (specs.size() == 1) {
+      return table.spec().partitionType();
+    }
+
+    Map<Integer, PartitionField> fieldMap = Maps.newHashMap();
+    List<NestedField> structFields = Lists.newArrayList();
+
+    // sort the spec IDs in descending order to pick up the most recent field names
+    List<Integer> specIds = specs.keySet().stream()
+        .sorted(Collections.reverseOrder())
+        .collect(Collectors.toList());
+
+    for (Integer specId : specIds) {
+      PartitionSpec spec = specs.get(specId);
+      // loop-invariant: derive this spec's partition type once, not once per partition field
+      StructType specPartitionType = spec.partitionType();
+
+      for (PartitionField field : spec.fields()) {
+        int fieldId = field.fieldId();
+        PartitionField existingField = fieldMap.get(fieldId);
+
+        if (existingField == null) {
+          // first (i.e. newest) spec defining this field ID: its field name is the one exposed
+          fieldMap.put(fieldId, field);
+          structFields.add(specPartitionType.field(fieldId));
+        } else {
+          // verify the fields are compatible as they may conflict in v1 tables
+          ValidationException.check(equivalentIgnoringNames(field, existingField),
+              "Conflicting partition fields: ['%s', '%s']",
+              field, existingField);
+        }
+      }
+    }
+
+    List<NestedField> sortedStructFields = structFields.stream()
+        .sorted(Comparator.comparingInt(NestedField::fieldId))
+        .collect(Collectors.toList());
+    return StructType.of(sortedStructFields);
+  }
+
+  /**
+   * Checks whether two partition fields describe the same partition, disregarding their names.
+   *
+   * @return true when both fields share the field ID and the source column ID and their
+   *     transforms are compatible
+   */
+  private static boolean equivalentIgnoringNames(PartitionField field, PartitionField anotherField) {
+    boolean sameIds = field.fieldId() == anotherField.fieldId();
+    if (!sameIds || field.sourceId() != anotherField.sourceId()) {
+      return false;
+    }
+    return compatibleTransforms(field.transform(), anotherField.transform());
+  }
+
+  /**
+   * Two transforms are compatible when they are equal, or when either one is the void
+   * ({@code alwaysNull}) transform, which v1 tables use in place of a dropped partition field.
+   */
+  private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?> t2) {
+    if (t1.equals(t2)) {
+      return true;
+    }
+    return t1.equals(Transforms.alwaysNull()) || t2.equals(Transforms.alwaysNull());
+  }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitioning.java b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
new file mode 100644
index 0000000..2610ad5
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestPartitioning {
+
+ private static final int V1_FORMAT_VERSION = 1;
+ private static final int V2_FORMAT_VERSION = 2;
+ private static final Schema SCHEMA = new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ required(2, "data", Types.StringType.get()),
+ required(3, "category", Types.StringType.get())
+ );
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ private File tableDir = null;
+
+ @Before
+ public void setupTableDir() throws IOException {
+ this.tableDir = temp.newFolder();
+ }
+
+ @After
+ public void cleanupTables() {
+ TestTables.clearTables();
+ }
+
+ @Test
+ public void testPartitionTypeWithSpecEvolutionInV1Tables() {
+ PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+ .identity("data")
+ .build();
+ TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+ table.updateSpec()
+ .addField(Expressions.bucket("category", 8))
+ .commit();
+
+ Assert.assertEquals("Should have 2 specs", 2, table.specs().size());
+
+ StructType expectedType = StructType.of(
+ NestedField.optional(1000, "data", Types.StringType.get()),
+ NestedField.optional(1001, "category_bucket_8", Types.IntegerType.get())
+ );
+ StructType actualType = Partitioning.partitionType(table);
+ Assert.assertEquals("Types must match", expectedType, actualType);
+ }
+
+ @Test
+ public void testPartitionTypeWithSpecEvolutionInV2Tables() {
+ PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+ .identity("data")
+ .build();
+ TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V2_FORMAT_VERSION);
+
+ table.updateSpec()
+ .removeField("data")
+ .addField("category")
+ .commit();
+
+ Assert.assertEquals("Should have 2 specs", 2, table.specs().size());
+
+ StructType expectedType = StructType.of(
+ NestedField.optional(1000, "data", Types.StringType.get()),
+ NestedField.optional(1001, "category", Types.StringType.get())
+ );
+ StructType actualType = Partitioning.partitionType(table);
+ Assert.assertEquals("Types must match", expectedType, actualType);
+ }
+
+ @Test
+ public void testPartitionTypeWithRenamesInV1Table() {
+ PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+ .identity("data", "p1")
+ .build();
+ TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+ table.updateSpec()
+ .addField("category")
+ .commit();
+
+ table.updateSpec()
+ .renameField("p1", "p2")
+ .commit();
+
+ StructType expectedType = StructType.of(
+ NestedField.optional(1000, "p2", Types.StringType.get()),
+ NestedField.optional(1001, "category", Types.StringType.get())
+ );
+ StructType actualType = Partitioning.partitionType(table);
+ Assert.assertEquals("Types must match", expectedType, actualType);
+ }
+
+ @Test
+ public void testPartitionTypeWithAddingBackSamePartitionFieldInV1Table() {
+ PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+ .identity("data")
+ .build();
+ TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+ table.updateSpec()
+ .removeField("data")
+ .commit();
+
+ table.updateSpec()
+ .addField("data")
+ .commit();
+
+ // in v1, we use void transforms instead of dropping partition fields
+ StructType expectedType = StructType.of(
+ NestedField.optional(1000, "data_1000", Types.StringType.get()),
+ NestedField.optional(1001, "data", Types.StringType.get())
+ );
+ StructType actualType = Partitioning.partitionType(table);
+ Assert.assertEquals("Types must match", expectedType, actualType);
+ }
+
+ @Test
+ public void testPartitionTypeWithAddingBackSamePartitionFieldInV2Table() {
+ PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+ .identity("data")
+ .build();
+ TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V2_FORMAT_VERSION);
+
+ table.updateSpec()
+ .removeField("data")
+ .commit();
+
+ table.updateSpec()
+ .addField("data")
+ .commit();
+
+ // in v2, we should be able to reuse the original partition spec
+ StructType expectedType = StructType.of(
+ NestedField.optional(1000, "data", Types.StringType.get())
+ );
+ StructType actualType = Partitioning.partitionType(table);
+ Assert.assertEquals("Types must match", expectedType, actualType);
+ }
+
+ @Test
+ public void testPartitionTypeWithIncompatibleSpecEvolution() {
+ PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+ .identity("data")
+ .build();
+ TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+ PartitionSpec newSpec = PartitionSpec.builderFor(table.schema())
+ .identity("category")
+ .build();
+
+ TableOperations ops = ((HasTableOperations) table).operations();
+ TableMetadata current = ops.current();
+ ops.commit(current, current.updatePartitionSpec(newSpec));
+
+ Assert.assertEquals("Should have 2 specs", 2, table.specs().size());
+
+ AssertHelpers.assertThrows("Should complain about incompatible specs",
+ ValidationException.class, "Conflicting partition fields",
+ () -> Partitioning.partitionType(table));
+ }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
new file mode 100644
index 0000000..0581ebe
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.apache.iceberg.FileFormat.AVRO;
+import static org.apache.iceberg.FileFormat.ORC;
+import static org.apache.iceberg.FileFormat.PARQUET;
+import static org.apache.iceberg.MetadataTableType.ALL_DATA_FILES;
+import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+import static org.apache.iceberg.MetadataTableType.FILES;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+
+/**
+ * Tests that the files and entries metadata tables expose a partition struct covering every
+ * partition field from every spec as the table's partitioning evolves.
+ * <p>
+ * Each catalog configuration is paired with a file format and a table format version
+ * (1 or 2) picked at random when the parameters are built.
+ */
+@RunWith(Parameterized.class)
+public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBase {
+
+  private static final Random RANDOM = ThreadLocalRandom.current();
+
+  @Parameters(name = "catalog = {0}, impl = {1}, conf = {2}, fileFormat = {3}, formatVersion = {4}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        { "testhive", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default"
+            ),
+            ORC,
+            formatVersion()
+        },
+        { "testhadoop", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hadoop"
+            ),
+            PARQUET,
+            formatVersion()
+        },
+        { "spark_catalog", SparkSessionCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default",
+                "clients", "1",
+                "parquet-enabled", "false",
+                "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
+            ),
+            AVRO,
+            formatVersion()
+        }
+    };
+  }
+
+  /** Returns a table format version chosen at random from {1, 2}. */
+  private static int formatVersion() {
+    return RANDOM.nextInt(2) + 1;
+  }
+
+  private final FileFormat fileFormat;
+  private final int formatVersion;
+
+  public TestMetadataTablesWithPartitionEvolution(String catalogName, String implementation, Map<String, String> config,
+                                                  FileFormat fileFormat, int formatVersion) {
+    super(catalogName, implementation, config);
+    this.fileFormat = fileFormat;
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testFilesMetadataTable() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
+    initTable();
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables while the current spec is still unpartitioned
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      Dataset<Row> df = loadMetadataTable(tableType);
+      Assert.assertTrue("Partition must be skipped", df.schema().getFieldIndex("partition").isEmpty());
+    }
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the first partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(new Object[]{null}), row("b1")),
+          "STRUCT<data:STRING>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .addField(Expressions.bucket("category", 8))
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the second partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after dropping the first partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .renameField("category_bucket_8", "category_bucket_8_another_name")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    // verify the metadata tables after renaming the second partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8_another_name:INT>",
+          tableType);
+    }
+  }
+
+  @Test
+  public void testEntriesMetadataTable() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
+    initTable();
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables while the current spec is still unpartitioned
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      Dataset<Row> df = loadMetadataTable(tableType);
+      StructType dataFileType = (StructType) df.schema().apply("data_file").dataType();
+      // look up the real "partition" field; an empty name never matches, making the check vacuous
+      Assert.assertTrue("Partition must be skipped", dataFileType.getFieldIndex("partition").isEmpty());
+    }
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the first partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(new Object[]{null}), row("b1")),
+          "STRUCT<data:STRING>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .addField(Expressions.bucket("category", 8))
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the second partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after dropping the first partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .renameField("category_bucket_8", "category_bucket_8_another_name")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    // verify the metadata tables after renaming the second partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8_another_name:INT>",
+          tableType);
+    }
+  }
+
+  /**
+   * Asserts the partition struct type and the partition values exposed by a metadata table.
+   *
+   * @param expectedPartitions expected partition tuples, in the order obtained by sorting on the partition struct
+   * @param expectedTypeAsString expected partition type as a Spark SQL type string, e.g. {@code STRUCT<data:STRING>}
+   * @param tableType one of FILES, ALL_DATA_FILES, ENTRIES or ALL_ENTRIES
+   * @throws ParseException if {@code expectedTypeAsString} cannot be parsed
+   */
+  private void assertPartitions(List<Object[]> expectedPartitions, String expectedTypeAsString,
+                                MetadataTableType tableType) throws ParseException {
+    Dataset<Row> df = loadMetadataTable(tableType);
+
+    DataType expectedType = spark.sessionState().sqlParser().parseDataType(expectedTypeAsString);
+
+    // files tables expose "partition" at the top level; entries tables nest it inside "data_file"
+    String partitionColumn;
+    DataType actualType;
+    switch (tableType) {
+      case FILES:
+      case ALL_DATA_FILES:
+        partitionColumn = "partition";
+        actualType = df.schema().apply("partition").dataType();
+        break;
+
+      case ENTRIES:
+      case ALL_ENTRIES: {
+        partitionColumn = "data_file.partition";
+        StructType dataFileType = (StructType) df.schema().apply("data_file").dataType();
+        actualType = dataFileType.apply("partition").dataType();
+        break;
+      }
+
+      default:
+        throw new UnsupportedOperationException("Unsupported metadata table type: " + tableType);
+    }
+
+    Assert.assertEquals("Partition type must match", expectedType, actualType);
+
+    List<Row> actualPartitions = df.orderBy(partitionColumn)
+        .select(partitionColumn + ".*")
+        .collectAsList();
+    assertEquals("Partitions must match", expectedPartitions, rowsToJava(actualPartitions));
+  }
+
+  /** Loads the given metadata table of the table under test through the Iceberg Spark source. */
+  private Dataset<Row> loadMetadataTable(MetadataTableType tableType) {
+    return spark.read().format("iceberg").load(tableName + "." + tableType.name());
+  }
+
+  /** Applies the parameterized default file format and format version as table properties. */
+  private void initTable() {
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, DEFAULT_FILE_FORMAT, fileFormat.name());
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, FORMAT_VERSION, formatVersion);
+  }
+}