Posted to commits@hive.apache.org by mb...@apache.org on 2021/08/05 10:05:54 UTC

[hive] branch master updated: HIVE-25328: Limit scope of REPLACE COLUMNS for Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

mbod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0824d79  HIVE-25328: Limit scope of REPLACE COLUMNS for Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
0824d79 is described below

commit 0824d79e5708be0e4056dad8388c9dea8d659054
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Thu Aug 5 12:05:44 2021 +0200

    HIVE-25328: Limit scope of REPLACE COLUMNS for Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
---
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       | 52 +++++++++---------
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  | 63 ++++++++++++++++++----
 2 files changed, 79 insertions(+), 36 deletions(-)
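
For context, a minimal sketch of the user-facing behaviour after this patch, using the
statements from the updated tests below (the table and columns are the test fixtures):

    -- Still supported: dropping columns (first_name is omitted from the new column list)
    ALTER TABLE default.customers REPLACE COLUMNS
        (customer_id int, last_name string COMMENT 'This is last name',
         address struct<city:string,street:string>);

    -- Now rejected with "Unsupported operation to use REPLACE COLUMNS ...":
    -- adding a column, changing a column type or comment, or reordering columns,
    -- e.g. the type promotion below (customer_id int -> bigint)
    ALTER TABLE default.customers REPLACE COLUMNS
        (customer_id bigint, first_name string COMMENT 'This is first name',
         last_name string COMMENT 'This is last name',
         address struct<city:string,street:string>);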

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 01ccfd3..6301f93 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -309,9 +309,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   }
 
   @Override
-  public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context)
-      throws MetaException {
-    if (Boolean.valueOf(context.getProperties().getOrDefault(MIGRATE_HIVE_TO_ICEBERG, "false"))) {
+  public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context) {
+    if (Boolean.parseBoolean(context.getProperties().getOrDefault(MIGRATE_HIVE_TO_ICEBERG, "false"))) {
       LOG.debug("Initiating rollback for table {} at location {}",
           hmsTable.getTableName(), hmsTable.getSd().getLocation());
       context.getProperties().put(INITIALIZE_ROLLBACK_MIGRATION, "true");
@@ -374,7 +373,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
       if (SUPPORTED_ALTER_OPS.stream().noneMatch(op -> op.equals(currentAlterTableOp))) {
         throw new MetaException(
             "Unsupported ALTER TABLE operation type on Iceberg table " + tableName + ", must be one of: " +
-                SUPPORTED_ALTER_OPS.toString());
+                SUPPORTED_ALTER_OPS);
       }
     }
   }
@@ -507,35 +506,36 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   }
 
   private void handleReplaceColumns(org.apache.hadoop.hive.metastore.api.Table hmsTable) throws MetaException {
-    HiveSchemaUtil.SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDiff(hmsTable.getSd().getCols(),
-        HiveSchemaUtil.convert(icebergTable.schema()), true);
-    if (!schemaDifference.isEmpty()) {
-      updateSchema = icebergTable.updateSchema();
-    } else {
-      // we should get here if the user restated the exactly the existing columns in the REPLACE COLUMNS command
-      LOG.info("Found no difference between new and old schema for ALTER TABLE REPLACE COLUMNS for" +
-          " table: {}. There will be no Iceberg commit.", hmsTable.getTableName());
-      return;
-    }
+    List<FieldSchema> hmsCols = hmsTable.getSd().getCols();
+    List<FieldSchema> icebergCols = HiveSchemaUtil.convert(icebergTable.schema());
+    HiveSchemaUtil.SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDiff(hmsCols, icebergCols, true);
 
-    for (FieldSchema droppedCol : schemaDifference.getMissingFromFirst()) {
-      updateSchema.deleteColumn(droppedCol.getName());
+    // if there are columns dropped, let's remove them from the iceberg schema as well so we can compare the order
+    if (!schemaDifference.getMissingFromFirst().isEmpty()) {
+      schemaDifference.getMissingFromFirst().forEach(icebergCols::remove);
     }
 
-    for (FieldSchema addedCol : schemaDifference.getMissingFromSecond()) {
-      updateSchema.addColumn(
-          addedCol.getName(),
-          HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(addedCol.getType())),
-          addedCol.getComment()
-      );
+    Pair<String, Optional<String>> outOfOrder = HiveSchemaUtil.getFirstOutOfOrderColumn(
+        hmsCols, icebergCols, ImmutableMap.of());
+
+    // limit the scope of this operation to only dropping columns
+    if (!schemaDifference.getMissingFromSecond().isEmpty() || !schemaDifference.getTypeChanged().isEmpty() ||
+        !schemaDifference.getCommentChanged().isEmpty() || outOfOrder != null) {
+      throw new MetaException("Unsupported operation to use REPLACE COLUMNS for adding a column, changing a " +
+          "column type, column comment or reordering columns. Only use REPLACE COLUMNS for dropping columns. " +
+          "For the other operations, consider using the ADD COLUMNS or CHANGE COLUMN commands.");
     }
 
-    for (FieldSchema updatedCol : schemaDifference.getTypeChanged()) {
-      updateSchema.updateColumn(updatedCol.getName(), getPrimitiveTypeOrThrow(updatedCol), updatedCol.getComment());
+    if (schemaDifference.getMissingFromFirst().isEmpty()) {
+      throw new MetaException("No schema change detected from REPLACE COLUMNS operations. For rectifying any schema " +
+          "mismatches between HMS and Iceberg, please consider the UPDATE COLUMNS command.");
     }
 
-    for (FieldSchema updatedCol : schemaDifference.getCommentChanged()) {
-      updateSchema.updateColumnDoc(updatedCol.getName(), updatedCol.getComment());
+    updateSchema = icebergTable.updateSchema();
+    LOG.info("handleReplaceColumns: Dropping the following columns for Iceberg table {}, cols: {}",
+        hmsTable.getTableName(), schemaDifference.getMissingFromFirst());
+    for (FieldSchema droppedCol : schemaDifference.getMissingFromFirst()) {
+      updateSchema.deleteColumn(droppedCol.getName());
     }
   }
 
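The new error messages point users to ADD COLUMNS and CHANGE COLUMN for the operations that
REPLACE COLUMNS no longer covers. A hypothetical sketch of those alternatives, assuming
standard Hive DDL syntax (these statements are not part of this commit):

    -- Assumed equivalents for adding a column and promoting a column type
    ALTER TABLE default.customers ADD COLUMNS (new_col string COMMENT 'This is a new column added');
    ALTER TABLE default.customers CHANGE COLUMN customer_id customer_id bigint;
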
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 2bfcd08..4d01c5d 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -963,9 +963,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
     testTables.createTable(shell, identifier.name(), schema, SPEC, FileFormat.PARQUET, ImmutableList.of());
 
     shell.executeStatement("ALTER TABLE default.customers REPLACE COLUMNS " +
-        "(customer_id bigint, last_name string COMMENT 'This is last name', " +
-        "address struct<city:string,street:string> COMMENT 'Adding some comment', " +
-        "new_col string COMMENT 'This is a new column added')");
+        "(customer_id int, last_name string COMMENT 'This is last name', " +
+        "address struct<city:string,street:string>)");
 
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
     org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
@@ -974,21 +973,65 @@ public class TestHiveIcebergStorageHandlerNoScan {
     List<FieldSchema> hmsSchema = hmsTable.getSd().getCols();
 
     List<FieldSchema> expectedSchema = Lists.newArrayList(
-        // customer_id: type promotion (int -> bigint), no change in comment
-        new FieldSchema("customer_id", "bigint", null),
+        new FieldSchema("customer_id", "int", null),
         // first_name column is dropped
-        // last_name: no changes
         new FieldSchema("last_name", "string", "This is last name"),
-        // address: comment added, no change in type
-        new FieldSchema("address", "struct<city:string,street:string>", "Adding some comment"),
-        // new_col: brand new column
-        new FieldSchema("new_col", "string", "This is a new column added"));
+        new FieldSchema("address", "struct<city:string,street:string>", null));
 
     Assert.assertEquals(expectedSchema, icebergSchema);
     Assert.assertEquals(expectedSchema, hmsSchema);
   }
 
   @Test
+  public void testAlterTableReplaceColumnsFailsWhenNotOnlyDropping() {
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+
+    Schema schema = new Schema(
+        optional(1, "customer_id", Types.IntegerType.get()),
+        optional(2, "first_name", Types.StringType.get(), "This is first name"),
+        optional(3, "last_name", Types.StringType.get(), "This is last name"),
+        optional(4, "address",  Types.StructType.of(
+            optional(5, "city", Types.StringType.get()),
+            optional(6, "street", Types.StringType.get())), null)
+    );
+    testTables.createTable(shell, identifier.name(), schema, SPEC, FileFormat.PARQUET, ImmutableList.of());
+
+    // check unsupported operations
+    String[] commands = {
+        // type promotion
+        "ALTER TABLE default.customers REPLACE COLUMNS (customer_id bigint, first_name string COMMENT 'This is " +
+            "first name', last_name string COMMENT 'This is last name', address struct<city:string,street:string>)",
+        // delete a comment
+        "ALTER TABLE default.customers REPLACE COLUMNS (customer_id int, first_name string, " +
+            "last_name string COMMENT 'This is last name', address struct<city:string,street:string>)",
+        // change a comment
+        "ALTER TABLE default.customers REPLACE COLUMNS (customer_id int, first_name string COMMENT 'New docs', " +
+            "last_name string COMMENT 'This is last name', address struct<city:string,street:string>)",
+        // reorder columns
+        "ALTER TABLE default.customers REPLACE COLUMNS (customer_id int, last_name string COMMENT 'This is " +
+            "last name', first_name string COMMENT 'This is first name', address struct<city:string,street:string>)",
+        // add new column
+        "ALTER TABLE default.customers REPLACE COLUMNS (customer_id int, first_name string COMMENT 'This is " +
+            "first name', last_name string COMMENT 'This is last name', address struct<city:string,street:string>, " +
+            "new_col timestamp)",
+        // dropping a column + reordering columns
+        "ALTER TABLE default.customers REPLACE COLUMNS (last_name string COMMENT 'This is " +
+            "last name', first_name string COMMENT 'This is first name', address struct<city:string,street:string>)"
+    };
+
+    for (String command : commands) {
+      AssertHelpers.assertThrows("", IllegalArgumentException.class,
+          "Unsupported operation to use REPLACE COLUMNS", () -> shell.executeStatement(command));
+    }
+
+    // check no-op case too
+    String command = "ALTER TABLE default.customers REPLACE COLUMNS (customer_id int, first_name string COMMENT 'This" +
+        " is first name', last_name string COMMENT 'This is last name', address struct<city:string,street:string>)";
+    AssertHelpers.assertThrows("", IllegalArgumentException.class,
+        "No schema change detected", () -> shell.executeStatement(command));
+  }
+
+  @Test
   public void testAlterTableChangeColumnNameAndComment() throws TException, InterruptedException {
     TableIdentifier identifier = TableIdentifier.of("default", "customers");