Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/08/14 06:16:22 UTC

[iceberg] branch master updated: Core: Fix querying metadata tables with multiple specs (#2936)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 95cde3a  Core: Fix querying metadata tables with multiple specs (#2936)
95cde3a is described below

commit 95cde3aea7eacf9a391cda36ef255d1b24d930cd
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Fri Aug 13 20:16:13 2021 -1000

    Core: Fix querying metadata tables with multiple specs (#2936)
---
 .../java/org/apache/iceberg/AllDataFilesTable.java |   6 +-
 .../java/org/apache/iceberg/AllEntriesTable.java   |   6 +-
 .../java/org/apache/iceberg/DataFilesTable.java    |   6 +-
 .../org/apache/iceberg/ManifestEntriesTable.java   |   6 +-
 .../main/java/org/apache/iceberg/Partitioning.java |  69 +++++
 .../java/org/apache/iceberg/TestPartitioning.java  | 196 +++++++++++++
 .../TestMetadataTablesWithPartitionEvolution.java  | 310 +++++++++++++++++++++
 7 files changed, 591 insertions(+), 8 deletions(-)
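
For context, each of the four metadata tables below previously derived its partition struct from table().spec().partitionType(), i.e. from the current default spec only, so partition columns that exist only in older specs were not reflected when querying the metadata tables. A hypothetical repro from Spark, assuming a session wired to an Iceberg catalog and a table "db.tbl" whose spec has evolved (the table name is illustrative):

    import org.apache.iceberg.MetadataTableType;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    // Load the files metadata table the same way the new Spark test below does.
    SparkSession spark = SparkSession.active();
    Dataset<Row> files = spark.read()
        .format("iceberg")
        .load("db.tbl." + MetadataTableType.FILES.name());
    // Before this fix, the partition struct here matched only the current spec;
    // after it, it is the union of all specs the table has ever had.
    files.select("partition.*").show();
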

diff --git a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
index fc2dc69..d1b084f 100644
--- a/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllDataFilesTable.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.ThreadPools;
 
@@ -56,8 +57,9 @@ public class AllDataFilesTable extends BaseMetadataTable {
 
   @Override
   public Schema schema() {
-    Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
-    if (table().spec().fields().size() < 1) {
+    StructType partitionType = Partitioning.partitionType(table());
+    Schema schema = new Schema(DataFile.getType(partitionType).fields());
+    if (partitionType.fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
       return TypeUtil.selectNot(schema, Sets.newHashSet(102));
     } else {
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index c1b7145..84c1609 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.ThreadPools;
 
@@ -55,8 +56,9 @@ public class AllEntriesTable extends BaseMetadataTable {
 
   @Override
   public Schema schema() {
-    Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
-    if (table().spec().fields().size() < 1) {
+    StructType partitionType = Partitioning.partitionType(table());
+    Schema schema = ManifestEntry.getSchema(partitionType);
+    if (partitionType.fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
       return TypeUtil.selectNot(schema, Sets.newHashSet(102));
     } else {
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index d6b80ee..f931c06 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -30,6 +30,7 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTest
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
 
 /**
  * A {@link Table} implementation that exposes a table's data files as rows.
@@ -51,8 +52,9 @@ public class DataFilesTable extends BaseMetadataTable {
 
   @Override
   public Schema schema() {
-    Schema schema = new Schema(DataFile.getType(table().spec().partitionType()).fields());
-    if (table().spec().fields().size() < 1) {
+    StructType partitionType = Partitioning.partitionType(table());
+    Schema schema = new Schema(DataFile.getType(partitionType).fields());
+    if (partitionType.fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field
       return TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
     } else {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 7bae349..a44fc64 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.StructProjection;
 
 /**
@@ -54,8 +55,9 @@ public class ManifestEntriesTable extends BaseMetadataTable {
 
   @Override
   public Schema schema() {
-    Schema schema = ManifestEntry.getSchema(table().spec().partitionType());
-    if (table().spec().fields().size() < 1) {
+    StructType partitionType = Partitioning.partitionType(table());
+    Schema schema = ManifestEntry.getSchema(partitionType);
+    if (partitionType.fields().size() < 1) {
       // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
       return TypeUtil.selectNot(schema, Sets.newHashSet(102));
     } else {
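
All four schema() methods above now follow the same pattern: compute the union partition type once via Partitioning.partitionType(table()) and drop the partition column only when no spec has ever had a partition field. A condensed sketch of that shared pattern, where metadataSchemaFor is a hypothetical stand-in for DataFile.getType(...) or ManifestEntry.getSchema(...):

    // Shared shape of the four schema() implementations in this patch.
    StructType partitionType = Partitioning.partitionType(table());
    Schema schema = metadataSchemaFor(partitionType);  // hypothetical helper
    if (partitionType.fields().size() < 1) {
      // no partition fields in any spec: drop the empty partition struct (id 102)
      return TypeUtil.selectNot(schema, Sets.newHashSet(DataFile.PARTITION_ID));
    } else {
      return schema;
    }
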
diff --git a/core/src/main/java/org/apache/iceberg/Partitioning.java b/core/src/main/java/org/apache/iceberg/Partitioning.java
index d393fe1..28598d4 100644
--- a/core/src/main/java/org/apache/iceberg/Partitioning.java
+++ b/core/src/main/java/org/apache/iceberg/Partitioning.java
@@ -19,9 +19,20 @@
 
 package org.apache.iceberg;
 
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.transforms.PartitionSpecVisitor;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
 
 public class Partitioning {
   private Partitioning() {
@@ -177,4 +188,62 @@ public class Partitioning {
       return null;
     }
   }
+
+  /**
+   * Builds a common partition type for all specs in a table.
+   * <p>
+   * Whenever a table has multiple specs, the partition type is a struct containing
+   * all columns that have ever been a part of any spec in the table.
+   *
+   * @param table a table with one or many specs
+   * @return the constructed common partition type
+   */
+  public static StructType partitionType(Table table) {
+    if (table.specs().size() == 1) {
+      return table.spec().partitionType();
+    }
+
+    Map<Integer, PartitionField> fieldMap = Maps.newHashMap();
+    List<NestedField> structFields = Lists.newArrayList();
+
+    // sort the spec IDs in descending order to pick up the most recent field names
+    List<Integer> specIds = table.specs().keySet().stream()
+        .sorted(Collections.reverseOrder())
+        .collect(Collectors.toList());
+
+    for (Integer specId : specIds) {
+      PartitionSpec spec = table.specs().get(specId);
+
+      for (PartitionField field : spec.fields()) {
+        int fieldId = field.fieldId();
+        PartitionField existingField = fieldMap.get(fieldId);
+
+        if (existingField == null) {
+          fieldMap.put(fieldId, field);
+          NestedField structField = spec.partitionType().field(fieldId);
+          structFields.add(structField);
+        } else {
+          // verify the fields are compatible as they may conflict in v1 tables
+          ValidationException.check(equivalentIgnoringNames(field, existingField),
+              "Conflicting partition fields: ['%s', '%s']",
+              field, existingField);
+        }
+      }
+    }
+
+    List<NestedField> sortedStructFields = structFields.stream()
+        .sorted(Comparator.comparingInt(NestedField::fieldId))
+        .collect(Collectors.toList());
+    return StructType.of(sortedStructFields);
+  }
+
+  private static boolean equivalentIgnoringNames(PartitionField field, PartitionField anotherField) {
+    return field.fieldId() == anotherField.fieldId() &&
+        field.sourceId() == anotherField.sourceId() &&
+        compatibleTransforms(field.transform(), anotherField.transform());
+  }
+
+  private static boolean compatibleTransforms(Transform<?, ?> t1, Transform<?, ?> t2) {
+    return t1.equals(t2) || t1.equals(Transforms.alwaysNull()) || t2.equals(Transforms.alwaysNull());
+  }
 }
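
A concrete walk-through of partitionType() may help. Assume a table handle whose spec 0 is identity("data") and whose spec 1 is identity("data") plus bucket("category", 8), matching the first unit test below:

    // Minimal sketch, assuming the two specs above assign field ids 1000 and 1001.
    // Spec ids are visited in descending order (1, then 0), so field 1000 takes
    // its name from the newest spec; field 1001 is added once; the result is
    // sorted by field id:
    //   struct<1000: data: optional string, 1001: category_bucket_8: optional int>
    StructType commonType = Partitioning.partitionType(table);
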
diff --git a/core/src/test/java/org/apache/iceberg/TestPartitioning.java b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
new file mode 100644
index 0000000..2610ad5
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/TestPartitioning.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+public class TestPartitioning {
+
+  private static final int V1_FORMAT_VERSION = 1;
+  private static final int V2_FORMAT_VERSION = 2;
+  private static final Schema SCHEMA = new Schema(
+      required(1, "id", Types.IntegerType.get()),
+      required(2, "data", Types.StringType.get()),
+      required(3, "category", Types.StringType.get())
+  );
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private File tableDir = null;
+
+  @Before
+  public void setupTableDir() throws IOException {
+    this.tableDir = temp.newFolder();
+  }
+
+  @After
+  public void cleanupTables() {
+    TestTables.clearTables();
+  }
+
+  @Test
+  public void testPartitionTypeWithSpecEvolutionInV1Tables() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+    table.updateSpec()
+        .addField(Expressions.bucket("category", 8))
+        .commit();
+
+    Assert.assertEquals("Should have 2 specs", 2, table.specs().size());
+
+    StructType expectedType = StructType.of(
+        NestedField.optional(1000, "data", Types.StringType.get()),
+        NestedField.optional(1001, "category_bucket_8", Types.IntegerType.get())
+    );
+    StructType actualType = Partitioning.partitionType(table);
+    Assert.assertEquals("Types must match", expectedType, actualType);
+  }
+
+  @Test
+  public void testPartitionTypeWithSpecEvolutionInV2Tables() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V2_FORMAT_VERSION);
+
+    table.updateSpec()
+        .removeField("data")
+        .addField("category")
+        .commit();
+
+    Assert.assertEquals("Should have 2 specs", 2, table.specs().size());
+
+    StructType expectedType = StructType.of(
+        NestedField.optional(1000, "data", Types.StringType.get()),
+        NestedField.optional(1001, "category", Types.StringType.get())
+    );
+    StructType actualType = Partitioning.partitionType(table);
+    Assert.assertEquals("Types must match", expectedType, actualType);
+  }
+
+  @Test
+  public void testPartitionTypeWithRenamesInV1Table() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data", "p1")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+    table.updateSpec()
+        .addField("category")
+        .commit();
+
+    table.updateSpec()
+        .renameField("p1", "p2")
+        .commit();
+
+    StructType expectedType = StructType.of(
+        NestedField.optional(1000, "p2", Types.StringType.get()),
+        NestedField.optional(1001, "category", Types.StringType.get())
+    );
+    StructType actualType = Partitioning.partitionType(table);
+    Assert.assertEquals("Types must match", expectedType, actualType);
+  }
+
+  @Test
+  public void testPartitionTypeWithAddingBackSamePartitionFieldInV1Table() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+
+    // in v1, we use void transforms instead of dropping partition fields
+    StructType expectedType = StructType.of(
+        NestedField.optional(1000, "data_1000", Types.StringType.get()),
+        NestedField.optional(1001, "data", Types.StringType.get())
+    );
+    StructType actualType = Partitioning.partitionType(table);
+    Assert.assertEquals("Types must match", expectedType, actualType);
+  }
+
+  @Test
+  public void testPartitionTypeWithAddingBackSamePartitionFieldInV2Table() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V2_FORMAT_VERSION);
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+
+    // in v2, we should be able to reuse the original partition spec
+    StructType expectedType = StructType.of(
+        NestedField.optional(1000, "data", Types.StringType.get())
+    );
+    StructType actualType = Partitioning.partitionType(table);
+    Assert.assertEquals("Types must match", expectedType, actualType);
+  }
+
+  @Test
+  public void testPartitionTypeWithIncompatibleSpecEvolution() {
+    PartitionSpec initialSpec = PartitionSpec.builderFor(SCHEMA)
+        .identity("data")
+        .build();
+    TestTables.TestTable table = TestTables.create(tableDir, "test", SCHEMA, initialSpec, V1_FORMAT_VERSION);
+
+    PartitionSpec newSpec = PartitionSpec.builderFor(table.schema())
+        .identity("category")
+        .build();
+
+    TableOperations ops = ((HasTableOperations) table).operations();
+    TableMetadata current = ops.current();
+    ops.commit(current, current.updatePartitionSpec(newSpec));
+
+    Assert.assertEquals("Should have 2 specs", 2, table.specs().size());
+
+    AssertHelpers.assertThrows("Should complain about incompatible specs",
+        ValidationException.class, "Conflicting partition fields",
+        () -> Partitioning.partitionType(table));
+  }
+}
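
The v1/v2 asymmetry in these tests comes from how partition fields are removed. In v1 tables, removeField does not drop the entry: as the expected type in testPartitionTypeWithAddingBackSamePartitionFieldInV1Table shows, field id 1000 survives under the name "data_1000" with a void transform, and re-adding "data" allocates a fresh id 1001. A sketch of the resulting union type, mirroring that test:

    // Both fields survive because compatibleTransforms() accepts a pair where
    // either side is Transforms.alwaysNull(), so the void field at id 1000 does
    // not conflict with the identity field it replaced.
    StructType v1Type = StructType.of(
        NestedField.optional(1000, "data_1000", Types.StringType.get()),
        NestedField.optional(1001, "data", Types.StringType.get()));
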
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
new file mode 100644
index 0000000..0581ebe
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestMetadataTablesWithPartitionEvolution.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.SparkSessionCatalog;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.apache.iceberg.FileFormat.AVRO;
+import static org.apache.iceberg.FileFormat.ORC;
+import static org.apache.iceberg.FileFormat.PARQUET;
+import static org.apache.iceberg.MetadataTableType.ALL_DATA_FILES;
+import static org.apache.iceberg.MetadataTableType.ALL_ENTRIES;
+import static org.apache.iceberg.MetadataTableType.ENTRIES;
+import static org.apache.iceberg.MetadataTableType.FILES;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+
+@RunWith(Parameterized.class)
+public class TestMetadataTablesWithPartitionEvolution extends SparkCatalogTestBase {
+
+  @Parameters(name = "catalog = {0}, impl = {1}, conf = {2}, fileFormat = {3}, formatVersion = {4}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        { "testhive", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default"
+            ),
+            ORC,
+            formatVersion()
+        },
+        { "testhadoop", SparkCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hadoop"
+            ),
+            PARQUET,
+            formatVersion()
+        },
+        { "spark_catalog", SparkSessionCatalog.class.getName(),
+            ImmutableMap.of(
+                "type", "hive",
+                "default-namespace", "default",
+                "clients", "1",
+                "parquet-enabled", "false",
+                "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
+            ),
+            AVRO,
+            formatVersion()
+        }
+    };
+  }
+
+  private static int formatVersion() {
+    return RANDOM.nextInt(2) + 1;
+  }
+
+  private static final Random RANDOM = ThreadLocalRandom.current();
+
+  private final FileFormat fileFormat;
+  private final int formatVersion;
+
+  public TestMetadataTablesWithPartitionEvolution(String catalogName, String implementation, Map<String, String> config,
+                                                  FileFormat fileFormat, int formatVersion) {
+    super(catalogName, implementation, config);
+    this.fileFormat = fileFormat;
+    this.formatVersion = formatVersion;
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testFilesMetadataTable() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
+    initTable();
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables while the current spec is still unpartitioned
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      Dataset<Row> df = loadMetadataTable(tableType);
+      Assert.assertTrue("Partition must be skipped", df.schema().getFieldIndex("partition").isEmpty());
+    }
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the first partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(new Object[]{null}), row("b1")),
+          "STRUCT<data:STRING>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .addField(Expressions.bucket("category", 8))
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the second partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after dropping the first partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .renameField("category_bucket_8", "category_bucket_8_another_name")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    // verify the metadata tables after renaming the second partition column
+    for (MetadataTableType tableType : Arrays.asList(FILES, ALL_DATA_FILES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8_another_name:INT>",
+          tableType);
+    }
+  }
+
+  @Test
+  public void testEntriesMetadataTable() throws ParseException {
+    sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg", tableName);
+    initTable();
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables while the current spec is still unpartitioned
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      Dataset<Row> df = loadMetadataTable(tableType);
+      StructType dataFileType = (StructType) df.schema().apply("data_file").dataType();
+      Assert.assertTrue("Partition must be skipped", dataFileType.getFieldIndex("partition").isEmpty());
+    }
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the first partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(new Object[]{null}), row("b1")),
+          "STRUCT<data:STRING>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .addField(Expressions.bucket("category", 8))
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after adding the second partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", tableName);
+
+    // verify the metadata tables after dropping the first partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8:INT>",
+          tableType);
+    }
+
+    table.updateSpec()
+        .renameField("category_bucket_8", "category_bucket_8_another_name")
+        .commit();
+    sql("REFRESH TABLE %s", tableName);
+
+    // verify the metadata tables after renaming the second partition column
+    for (MetadataTableType tableType : Arrays.asList(ENTRIES, ALL_ENTRIES)) {
+      assertPartitions(
+          ImmutableList.of(row(null, null), row(null, 2), row("b1", null), row("b1", 2)),
+          "STRUCT<data:STRING,category_bucket_8_another_name:INT>",
+          tableType);
+    }
+  }
+
+  private void assertPartitions(List<Object[]> expectedPartitions, String expectedTypeAsString,
+                                MetadataTableType tableType) throws ParseException {
+    Dataset<Row> df = loadMetadataTable(tableType);
+
+    DataType expectedType = spark.sessionState().sqlParser().parseDataType(expectedTypeAsString);
+    switch (tableType) {
+      case FILES:
+      case ALL_DATA_FILES:
+        DataType actualFilesType = df.schema().apply("partition").dataType();
+        Assert.assertEquals("Partition type must match", expectedType, actualFilesType);
+        break;
+
+      case ENTRIES:
+      case ALL_ENTRIES:
+        StructType dataFileType = (StructType) df.schema().apply("data_file").dataType();
+        DataType actualEntriesType = dataFileType.apply("partition").dataType();
+        Assert.assertEquals("Partition type must match", expectedType, actualEntriesType);
+        break;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported metadata table type: " + tableType);
+    }
+
+    switch (tableType) {
+      case FILES:
+      case ALL_DATA_FILES:
+        List<Row> actualFilesPartitions = df.orderBy("partition")
+            .select("partition.*")
+            .collectAsList();
+        assertEquals("Partitions must match", expectedPartitions, rowsToJava(actualFilesPartitions));
+        break;
+
+      case ENTRIES:
+      case ALL_ENTRIES:
+        List<Row> actualEntriesPartitions = df.orderBy("data_file.partition")
+            .select("data_file.partition.*")
+            .collectAsList();
+        assertEquals("Partitions must match", expectedPartitions, rowsToJava(actualEntriesPartitions));
+        break;
+
+      default:
+        throw new UnsupportedOperationException("Unsupported metadata table type: " + tableType);
+    }
+  }
+
+  private Dataset<Row> loadMetadataTable(MetadataTableType tableType) {
+    return spark.read().format("iceberg").load(tableName + "." + tableType.name());
+  }
+
+  private void initTable() {
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, DEFAULT_FILE_FORMAT, fileFormat.name());
+    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%d')", tableName, FORMAT_VERSION, formatVersion);
+  }
+}