Posted to commits@hive.apache.org by mb...@apache.org on 2022/01/07 15:34:23 UTC
[hive] branch master updated: HIVE-25849: Disable insert overwrite for bucket partitioned Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
mbod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e8effb5 HIVE-25849: Disable insert overwrite for bucket partitioned Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
e8effb5 is described below
commit e8effb585db81edd809d1cb783ce22b079caa264
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Fri Jan 7 16:34:04 2022 +0100
HIVE-25849: Disable insert overwrite for bucket partitioned Iceberg tables (Marton Bod, reviewed by Adam Szita and Peter Vary)
---
.../apache/iceberg/mr/hive/HiveIcebergStorageHandler.java | 13 +++++++++++++
.../java/org/apache/iceberg/mr/hive/IcebergTableUtil.java | 4 ++++
.../apache/iceberg/mr/hive/TestHiveIcebergInserts.java | 15 +++++++++++++++
.../hadoop/hive/ql/metadata/HiveStorageHandler.java | 11 +++++++++++
.../org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java | 12 ++++++++++++
5 files changed, 55 insertions(+)
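
Before the diff, a quick illustration of what the change guards against. A partition spec counts as bucketed when any of its fields uses Iceberg's bucket transform; below is a minimal sketch using the public Iceberg API (the schema and column names are made up, mirroring the new test further down):

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class BucketedSpecSketch {
      public static void main(String[] args) {
        Schema schema = new Schema(
            Types.NestedField.required(1, "customer_id", Types.LongType.get()),
            Types.NestedField.required(2, "last_name", Types.StringType.get()));
        // bucket("last_name", 16) assigns each row to one of 16 partitions by
        // hashing the column value, so the set of partitions an overwrite would
        // replace cannot be derived from the statement itself; this commit
        // rejects INSERT OVERWRITE against such tables at semantic-analysis time.
        PartitionSpec spec = PartitionSpec.builderFor(schema)
            .bucket("last_name", 16)
            .identity("customer_id")
            .build();
        // Each bucket field's transform prints as "bucket[16]", which is the
        // prefix the new IcebergTableUtil.isBucketed helper below matches on.
        spec.fields().forEach(f -> System.out.println(f.transform()));
      }
    }
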
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 60f9d42..676cb6a 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -48,11 +48,13 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDynamicListDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -369,6 +371,17 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
return new URI(ICEBERG_URI_PREFIX + table.location());
}
+ @Override
+ public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
+ HiveStorageHandler.super.validateSinkDesc(sinkDesc);
+ if (sinkDesc.getInsertOverwrite()) {
+ Table table = IcebergTableUtil.getTable(conf, sinkDesc.getTableInfo().getProperties());
+ if (IcebergTableUtil.isBucketed(table)) {
+ throw new SemanticException("Cannot perform insert overwrite query on bucket partitioned Iceberg table.");
+ }
+ }
+ }
+
private void setCommonJobConf(JobConf jobConf) {
jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index d201099..9a1f316 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -194,4 +194,8 @@ public class IcebergTableUtil {
updatePartitionSpec.commit();
}
+
+ public static boolean isBucketed(Table table) {
+ return table.spec().fields().stream().anyMatch(f -> f.transform().toString().startsWith("bucket["));
+ }
}
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 5222361..9b8def1 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.mr.hive;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -169,6 +170,20 @@ public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineB
HiveIcebergTestUtils.validateData(table, expected, 0);
}
+ @Test
+ public void testInsertOverwriteBucketPartitionedTableThrowsError() {
+ TableIdentifier target = TableIdentifier.of("default", "target");
+ PartitionSpec spec = PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+ .bucket("last_name", 16).identity("customer_id").build();
+ testTables.createTable(shell, target.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+ spec, fileFormat, ImmutableList.of());
+
+ AssertHelpers.assertThrows("IOW should not work on bucket partitioned table", IllegalArgumentException.class,
+ "Cannot perform insert overwrite query on bucket partitioned Iceberg table",
+ () -> shell.executeStatement(
+ testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, target, true)));
+ }
+
/**
* Testing map-reduce inserts.
* @throws IOException If there is an underlying IOException
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index b2f5a1a..0e38574 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
@@ -352,4 +354,13 @@ public interface HiveStorageHandler extends Configurable {
return new URI(this.getClass().getSimpleName().toLowerCase() + "://" +
HiveCustomStorageHandlerUtils.getTablePropsForCustomStorageHandler(tableProperties));
}
+
+ /**
+ * Validates whether the sink operation is permitted for the specific storage handler, based
+ * on information contained in the sinkDesc.
+ * @param sinkDesc The sink descriptor
+ * @throws SemanticException if the sink operation is not allowed
+ */
+ default void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
+ }
}
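
The validateSinkDesc hook added above is a default method, so existing storage handlers are unaffected; any handler can override it to veto a sink operation at semantic-analysis time. A minimal sketch of such an override (ExampleStorageHandler is hypothetical; validateSinkDesc and FileSinkDesc.getInsertOverwrite() are the same members the Iceberg handler uses above):

    import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.FileSinkDesc;

    // Hypothetical handler that vetoes every INSERT OVERWRITE. Declared abstract
    // so the sketch does not have to implement the rest of the HiveStorageHandler
    // interface.
    public abstract class ExampleStorageHandler implements HiveStorageHandler {
      @Override
      public void validateSinkDesc(FileSinkDesc sinkDesc) throws SemanticException {
        if (sinkDesc.getInsertOverwrite()) {
          throw new SemanticException("INSERT OVERWRITE is not supported by this storage handler.");
        }
      }
    }

The SemanticAnalyzer change in the last hunk below is what wires this in: it invokes the hook for every FileSinkOperator whose target table has a storage handler.
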
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index bbf1258..d1d5ee3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -12587,6 +12587,18 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
+ // validate if this sink operation is allowed for non-native tables
+ if (sinkOp instanceof FileSinkOperator) {
+ FileSinkOperator fileSinkOperator = (FileSinkOperator) sinkOp;
+ Optional<HiveStorageHandler> handler = Optional.ofNullable(fileSinkOperator)
+ .map(FileSinkOperator::getConf)
+ .map(FileSinkDesc::getTable)
+ .map(Table::getStorageHandler);
+ if (handler.isPresent()) {
+ handler.get().validateSinkDesc(fileSinkOperator.getConf());
+ }
+ }
+
// Check query results cache
// In the case that row or column masking/filtering was required, we do not support caching.
// TODO: Enable caching for queries with masking/filtering