You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by mb...@apache.org on 2021/08/05 10:05:54 UTC
[hive] branch master updated: HIVE-25328: Limit scope of REPLACE COLUMNS for Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
mbod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0824d79 HIVE-25328: Limit scope of REPLACE COLUMNS for Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
0824d79 is described below
commit 0824d79e5708be0e4056dad8388c9dea8d659054
Author: Marton Bod <ma...@gmail.com>
AuthorDate: Thu Aug 5 12:05:44 2021 +0200
HIVE-25328: Limit scope of REPLACE COLUMNS for Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
---
.../iceberg/mr/hive/HiveIcebergMetaHook.java | 52 +++++++++---------
.../hive/TestHiveIcebergStorageHandlerNoScan.java | 63 ++++++++++++++++++----
2 files changed, 79 insertions(+), 36 deletions(-)
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 01ccfd3..6301f93 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -309,9 +309,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
}
@Override
- public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context)
- throws MetaException {
- if (Boolean.valueOf(context.getProperties().getOrDefault(MIGRATE_HIVE_TO_ICEBERG, "false"))) {
+ public void rollbackAlterTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, EnvironmentContext context) {
+ if (Boolean.parseBoolean(context.getProperties().getOrDefault(MIGRATE_HIVE_TO_ICEBERG, "false"))) {
LOG.debug("Initiating rollback for table {} at location {}",
hmsTable.getTableName(), hmsTable.getSd().getLocation());
context.getProperties().put(INITIALIZE_ROLLBACK_MIGRATION, "true");
@@ -374,7 +373,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
if (SUPPORTED_ALTER_OPS.stream().noneMatch(op -> op.equals(currentAlterTableOp))) {
throw new MetaException(
"Unsupported ALTER TABLE operation type on Iceberg table " + tableName + ", must be one of: " +
- SUPPORTED_ALTER_OPS.toString());
+ SUPPORTED_ALTER_OPS);
}
}
}
@@ -507,35 +506,36 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
}
private void handleReplaceColumns(org.apache.hadoop.hive.metastore.api.Table hmsTable) throws MetaException {
- HiveSchemaUtil.SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDiff(hmsTable.getSd().getCols(),
- HiveSchemaUtil.convert(icebergTable.schema()), true);
- if (!schemaDifference.isEmpty()) {
- updateSchema = icebergTable.updateSchema();
- } else {
- // we should get here if the user restated the exactly the existing columns in the REPLACE COLUMNS command
- LOG.info("Found no difference between new and old schema for ALTER TABLE REPLACE COLUMNS for" +
- " table: {}. There will be no Iceberg commit.", hmsTable.getTableName());
- return;
- }
+ List<FieldSchema> hmsCols = hmsTable.getSd().getCols();
+ List<FieldSchema> icebergCols = HiveSchemaUtil.convert(icebergTable.schema());
+ HiveSchemaUtil.SchemaDifference schemaDifference = HiveSchemaUtil.getSchemaDiff(hmsCols, icebergCols, true);
- for (FieldSchema droppedCol : schemaDifference.getMissingFromFirst()) {
- updateSchema.deleteColumn(droppedCol.getName());
+ // if there are columns dropped, let's remove them from the iceberg schema as well so we can compare the order
+ if (!schemaDifference.getMissingFromFirst().isEmpty()) {
+ schemaDifference.getMissingFromFirst().forEach(icebergCols::remove);
}
- for (FieldSchema addedCol : schemaDifference.getMissingFromSecond()) {
- updateSchema.addColumn(
- addedCol.getName(),
- HiveSchemaUtil.convert(TypeInfoUtils.getTypeInfoFromTypeString(addedCol.getType())),
- addedCol.getComment()
- );
+ Pair<String, Optional<String>> outOfOrder = HiveSchemaUtil.getFirstOutOfOrderColumn(
+ hmsCols, icebergCols, ImmutableMap.of());
+
+ // limit the scope of this operation to only dropping columns
+ if (!schemaDifference.getMissingFromSecond().isEmpty() || !schemaDifference.getTypeChanged().isEmpty() ||
+ !schemaDifference.getCommentChanged().isEmpty() || outOfOrder != null) {
+ throw new MetaException("Unsupported operation to use REPLACE COLUMNS for adding a column, changing a " +
+ "column type, column comment or reordering columns. Only use REPLACE COLUMNS for dropping columns. " +
+ "For the other operations, consider using the ADD COLUMNS or CHANGE COLUMN commands.");
}
- for (FieldSchema updatedCol : schemaDifference.getTypeChanged()) {
- updateSchema.updateColumn(updatedCol.getName(), getPrimitiveTypeOrThrow(updatedCol), updatedCol.getComment());
+ if (schemaDifference.getMissingFromFirst().isEmpty()) {
+ throw new MetaException("No schema change detected from REPLACE COLUMNS operations. For rectifying any schema " +
+ "mismatches between HMS and Iceberg, please consider the UPDATE COLUMNS command.");
}
- for (FieldSchema updatedCol : schemaDifference.getCommentChanged()) {
- updateSchema.updateColumnDoc(updatedCol.getName(), updatedCol.getComment());
+ updateSchema = icebergTable.updateSchema();
+ LOG.info("handleReplaceColumns: Dropping the following columns for Iceberg table {}, cols: {}",
+ hmsTable.getTableName(), schemaDifference.getMissingFromFirst());
+ for (FieldSchema droppedCol : schemaDifference.getMissingFromFirst()) {
+ updateSchema.deleteColumn(droppedCol.getName());
}
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 2bfcd08..4d01c5d 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -963,9 +963,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
testTables.createTable(shell, identifier.name(), schema, SPEC, FileFormat.PARQUET, ImmutableList.of());
shell.executeStatement("ALTER TABLE default.customers REPLACE COLUMNS " +
- "(customer_id bigint, last_name string COMMENT 'This is last name', " +
- "address struct<city:string,street:string> COMMENT 'Adding some comment', " +
- "new_col string COMMENT 'This is a new column added')");
+ "(customer_id int, last_name string COMMENT 'This is last name', " +
+ "address struct<city:string,street:string>)");
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
@@ -974,21 +973,65 @@ public class TestHiveIcebergStorageHandlerNoScan {
List<FieldSchema> hmsSchema = hmsTable.getSd().getCols();
List<FieldSchema> expectedSchema = Lists.newArrayList(
- // customer_id: type promotion (int -> bigint), no change in comment
- new FieldSchema("customer_id", "bigint", null),
+ new FieldSchema("customer_id", "int", null),
// first_name column is dropped
- // last_name: no changes
new FieldSchema("last_name", "string", "This is last name"),
- // address: comment added, no change in type
- new FieldSchema("address", "struct<city:string,street:string>", "Adding some comment"),
- // new_col: brand new column
- new FieldSchema("new_col", "string", "This is a new column added"));
+ new FieldSchema("address", "struct<city:string,street:string>", null));
Assert.assertEquals(expectedSchema, icebergSchema);
Assert.assertEquals(expectedSchema, hmsSchema);
}
@Test
+ public void testAlterTableReplaceColumnsFailsWhenNotOnlyDropping() {
+ TableIdentifier identifier = TableIdentifier.of("default", "customers");
+
+ Schema schema = new Schema(
+ optional(1, "customer_id", Types.IntegerType.get()),
+ optional(2, "first_name", Types.StringType.get(), "This is first name"),
+ optional(3, "last_name", Types.StringType.get(), "This is last name"),
+ optional(4, "address", Types.StructType.of(
+ optional(5, "city", Types.StringType.get()),
+ optional(6, "street", Types.StringType.get())), null)
+ );
+ testTables.createTable(shell, identifier.name(), schema, SPEC, FileFormat.PARQUET, ImmutableList.of());
+
+ // check unsupported operations
+ String[] commands = {
+ // type promotion
+ "ALTER TABLE default.customers REPLACE COLUMNS (customer_id bigint, first_name string COMMENT 'This is " +
+ "first name', last_name string COMMENT 'This is last name', address struct<city:string,street:string>)",
+ // delete a comment
+ "ALTER TABLE default.customers REPLACE COLUMNS (customer_id int, first_name string, " +
+ "last_name string COMMENT 'This is last name', address struct<city:string,street:string>)",
+ // change a comment
+ "ALTER TABLE default.customers REPLACE COLUMNS (customer_id int, first_name string COMMENT 'New docs', " +
+ "last_name string COMMENT 'This is last name', address struct<city:string,street:string>)",
+ // reorder columns
+ "ALTER TABLE default.customers REPLACE COLUMNS (customer_id int, last_name string COMMENT 'This is " +
+ "last name', first_name string COMMENT 'This is first name', address struct<city:string,street:string>)",
+ // add new column
+ "ALTER TABLE default.customers REPLACE COLUMNS (customer_id int, first_name string COMMENT 'This is " +
+ "first name', last_name string COMMENT 'This is last name', address struct<city:string,street:string>, " +
+ "new_col timestamp)",
+ // dropping a column + reordering columns
+ "ALTER TABLE default.customers REPLACE COLUMNS (last_name string COMMENT 'This is " +
+ "last name', first_name string COMMENT 'This is first name', address struct<city:string,street:string>)"
+ };
+
+ for (String command : commands) {
+ AssertHelpers.assertThrows("", IllegalArgumentException.class,
+ "Unsupported operation to use REPLACE COLUMNS", () -> shell.executeStatement(command));
+ }
+
+ // check no-op case too
+ String command = "ALTER TABLE default.customers REPLACE COLUMNS (customer_id int, first_name string COMMENT 'This" +
+ " is first name', last_name string COMMENT 'This is last name', address struct<city:string,street:string>)";
+ AssertHelpers.assertThrows("", IllegalArgumentException.class,
+ "No schema change detected", () -> shell.executeStatement(command));
+ }
+
+ @Test
public void testAlterTableChangeColumnNameAndComment() throws TException, InterruptedException {
TableIdentifier identifier = TableIdentifier.of("default", "customers");