Posted to commits@hive.apache.org by mb...@apache.org on 2022/01/07 15:34:23 UTC

[hive] branch master updated: HIVE-25849: Disable insert overwrite for bucket partitioned Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

mbod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e8effb5  HIVE-25849: Disable insert overwrite for bucket partitioned Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
e8effb5 is described below

commit e8effb585db81edd809d1cb783ce22b079caa264
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Fri Jan 7 16:34:04 2022 +0100

    HIVE-25849: Disable insert overwrite for bucket partitioned Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
---
 .../apache/iceberg/mr/hive/HiveIcebergStorageHandler.java | 13 +++++++++++++
 .../java/org/apache/iceberg/mr/hive/IcebergTableUtil.java |  4 ++++
 .../apache/iceberg/mr/hive/TestHiveIcebergInserts.java    | 15 +++++++++++++++
 .../hadoop/hive/ql/metadata/HiveStorageHandler.java       | 11 +++++++++++
 .../org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java | 12 ++++++++++++
 5 files changed, 55 insertions(+)
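
In short: SemanticAnalyzer now invokes a new HiveStorageHandler#validateSinkDesc
hook for every FileSinkOperator whose target table is backed by a storage handler,
and the Iceberg handler overrides that hook to reject INSERT OVERWRITE statements
(e.g. INSERT OVERWRITE TABLE target SELECT ...) whenever the table's current
partition spec contains a bucket transform. Plain INSERTs are unaffected.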

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 60f9d42..676cb6a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -48,11 +48,13 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
 import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -369,6 +371,17 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     return new URI(ICEBERG_URI_PREFIX + table.location());
   }
 
+  @Override
+  public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
+    HiveStorageHandler.super.validateSinkDesc(sinkDesc);
+    if (sinkDesc.getInsertOverwrite()) {
+      Table table = IcebergTableUtil.getTable(conf, sinkDesc.getTableInfo().getProperties());
+      if (IcebergTableUtil.isBucketed(table)) {
+        throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
+      }
+    }
+  }
+
   private void setCommonJobConf(JobConf jobConf) {
     jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
   }
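
Note: this validation runs at query compile time. FileSinkDesc#getInsertOverwrite()
is true only for INSERT OVERWRITE statements, and only in that case is the Iceberg
Table loaded from the serialized table properties so its current partition spec can
be inspected; regular INSERTs skip the table load entirely.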
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index d201099..9a1f316 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -194,4 +194,8 @@ public class IcebergTableUtil {
 
     updatePartitionSpec.commit();
   }
+
+  public static boolean isBucketed(Table table) {
+    return table.spec().fields().stream().anyMatch(f -> f.transform().toString().startsWith("bucket["));
+  }
 }
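
A minimal sketch (not part of the commit) of what the string check in isBucketed()
keys on, assuming only standard Iceberg API types:

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    Schema schema = new Schema(
        Types.NestedField.required(1, "customer_id", Types.LongType.get()),
        Types.NestedField.required(2, "last_name", Types.StringType.get()));

    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .bucket("last_name", 16)     // this field's transform toString() is "bucket[16]"
        .identity("customer_id")     // this field's transform toString() is "identity"
        .build();

    // isBucketed(table) scans table.spec().fields() for any transform whose string
    // form starts with "bucket[", so a table with the spec above counts as bucketed.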
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 5222361..9b8def1 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.mr.hive;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -169,6 +170,20 @@ public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineB
     HiveIcebergTestUtils.validateData(table, expected, 0);
   }
 
+  @Test
+  public void testInsertOverwriteBucketPartitionedTableThrowsError() {
+    TableIdentifier target = TableIdentifier.of("default", "target");
+    PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .bucket("last_name", 16).identity("customer_id").build();
+    testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        spec, fileFormat, ImmutableList.of());
+
+    AssertHelpers.assertThrows("IOW should not work on bucket partitioned table", IllegalArgumentException.class,
+        "Cannot perform insert overwrite query on bucket partitioned Iceberg table",
+        () -> shell.executeStatement(
+            testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, target, true)));
+  }
+
   /**
    * Testing map-reduce inserts.
    * @throws IOException If there is an underlying IOException
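
Note: the handler throws SemanticException, yet the test expects
IllegalArgumentException; the test shell apparently rethrows compile-time failures
under a different exception type, which is presumably why AssertHelpers.assertThrows
is given the error message fragment to match on in addition to the exception class.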
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index b2f5a1a..0e38574 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -352,4 +354,13 @@ public interface HiveStorageHandler extends Configurable {
     return new URI(this.getClass().getSimpleName().toLowerCase() + "://" +
         HiveCustomStorageHandlerUtils.getTablePropsForCustomStorageHandler(tableProperties));
   }
+
+  /**
+   * Validates whether the sink operation is permitted for the specific storage handler, based
+   * on information contained in the sinkDesc.
+   * @param sinkDesc The sink descriptor
+   * @throws SemanticException if the sink operation is not allowed
+   */
+  default void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
+  }
 }
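
To illustrate the new extension point, a hypothetical handler (class name and message
are invented for this sketch; the interface default above is a no-op, so only handlers
that override the method impose restrictions):

    import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.FileSinkDesc;

    public class RejectOverwriteStorageHandler extends DefaultStorageHandler {
      @Override
      public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
        // Reject INSERT OVERWRITE outright; plain INSERTs pass through unchanged.
        if (sinkDesc.getInsertOverwrite()) {
          throw new SemanticException("INSERT OVERWRITE is not supported by this handler.");
        }
      }
    }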
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index bbf1258..d1d5ee3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -12587,6 +12587,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     }
 
+    // validate if this sink operation is allowed for non-native tables
+    if (sinkOp instanceof FileSinkOperator) {
+      FileSinkOperator fileSinkOperator = (FileSinkOperator) sinkOp;
+      Optional<HiveStorageHandler> handler = Optional.ofNullable(fileSinkOperator)
+          .map(FileSinkOperator::getConf)
+          .map(FileSinkDesc::getTable)
+          .map(Table::getStorageHandler);
+      if (handler.isPresent()) {
+        handler.get().validateSinkDesc(fileSinkOperator.getConf());
+      }
+    }
+
     // Check query results cache
     // In the case that row or column masking/filtering was required, we do not support caching.
     // TODO: Enable caching for queries with masking/filtering
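
The Optional chain above is a null-safe lookup: getTable() and getStorageHandler()
can return null (native tables have no storage handler), so the hook only fires for
storage-handler-backed sinks. The ofNullable(fileSinkOperator) step is redundant,
since the local is non-null after the instanceof check. As a sketch, the equivalent
plain null-check form:

    FileSinkDesc desc = fileSinkOperator.getConf();
    if (desc != null && desc.getTable() != null && desc.getTable().getStorageHandler() != null) {
      desc.getTable().getStorageHandler().validateSinkDesc(desc);
    }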