Posted to commits@impala.apache.org by st...@apache.org on 2022/08/01 23:54:31 UTC

[impala] branch master updated: IMPALA-11378: Allow INSERT OVERWRITE for bucket transforms in some cases

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new c0b0875bd IMPALA-11378: Allow INSERT OVERWRITE for bucket transforms in some cases
c0b0875bd is described below

commit c0b0875bda59771fb1b5c55a5eaf45f3dcfaa63c
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Tue Jun 21 20:43:35 2022 +0200

    IMPALA-11378: Allow INSERT OVERWRITE for bucket transforms in some cases
    
    This change applies only to Iceberg tables and is mainly motivated by
    table maintenance. Iceberg table writes create new snapshots, and these
    can accumulate over time. This commit allows a simple form of compaction
    of these snapshots.
    
    INSERT OVERWRITE stays blocked when partition evolution is in place,
    because it would be possible to overwrite a data file with a newer
    schema that has fewer columns. This could cause unexpected data loss.
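    
    A minimal sketch of the case that stays blocked (the table name
    ice_evolved and its columns are only illustrative, not part of this
    change):
      CREATE TABLE ice_evolved (i INT, j INT)
      PARTITIONED BY SPEC (truncate(3, i))
      STORED AS ICEBERG;
      -- Partition evolution: the table now carries a second partition spec.
      ALTER TABLE ice_evolved SET PARTITION SPEC (truncate(3, i), truncate(3, j));
      -- Rejected: "The Iceberg table has multiple partition specs."
      INSERT OVERWRITE ice_evolved SELECT * FROM ice_evolved;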
    
    For bucketed tables, the following form is allowed:
      INSERT OVERWRITE ice_tbl SELECT * FROM ice_tbl;
    The source and target tables have to be the same, and only SELECT '*'
    queries are allowed. These requirements are also in place to avoid
    unexpected data loss (see the sketch after this list):
     - A VALUES clause is not allowed, because inserting a single record
       could overwrite a whole file in a bucket.
     - Only the target table itself is allowed as the source, because with
       any other source it is unknown at the time of the insert which files
       will be modified, similar to VALUES.
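    
    A sketch of the accepted and rejected forms for a bucket-partitioned
    table (the table names ice_bucketed and other_ice_tbl are illustrative):
      CREATE TABLE ice_bucketed (i INT, j INT)
      PARTITIONED BY SPEC (bucket(3, i))
      STORED AS ICEBERG;
      -- Allowed: self-overwrite with a plain SELECT *.
      INSERT OVERWRITE ice_bucketed SELECT * FROM ice_bucketed;
      -- Rejected: VALUES clause.
      INSERT OVERWRITE ice_bucketed VALUES (1, 2);
      -- Rejected: select list other than '*'.
      INSERT OVERWRITE ice_bucketed SELECT 1, j FROM ice_bucketed;
      -- Rejected: WHERE clause, selective overwrite is not supported.
      INSERT OVERWRITE ice_bucketed SELECT * FROM ice_bucketed WHERE j = 1;
      -- Rejected: the source table differs from the target table.
      INSERT OVERWRITE ice_bucketed SELECT * FROM other_ice_tbl;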
    
    Testing:
     - Added e2e tests.
    
    Change-Id: Ibd1bc19d839297246eadeb754cdeeec1e306098a
    Reviewed-on: http://gerrit.cloudera.org:8080/18649
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/InsertStmt.java     | 54 +++++++++++++++++++---
 .../queries/QueryTest/iceberg-negative.test        | 36 +++++++++++++--
 .../queries/QueryTest/iceberg-overwrite.test       | 31 +++++++++++++
 tests/custom_cluster/test_events_custom_configs.py |  6 +--
 4 files changed, 112 insertions(+), 15 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 6c76f1436..c043c3c2f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -20,6 +20,7 @@ package org.apache.impala.analysis;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -602,8 +603,16 @@ public class InsertStmt extends StatementBase {
     }
 
     if (table_ instanceof FeIcebergTable) {
-      if (overwrite_) validateNoBucketTransform((FeIcebergTable)table_);
-      validateIcebergColumnsForInsert((FeIcebergTable)table_);
+      FeIcebergTable iceTable = (FeIcebergTable)table_;
+      if (overwrite_) {
+        if (iceTable.getPartitionSpecs().size() > 1) {
+          throw new AnalysisException("The Iceberg table has multiple partition specs. " +
+              "This means the outcome of dynamic partition overwrite is unforeseeable. " +
+              "Consider using TRUNCATE and INSERT INTO to overwrite your table.");
+        }
+        validateBucketTransformForOverwrite(iceTable);
+      }
+      validateIcebergColumnsForInsert(iceTable);
     }
 
     if (isHBaseTable && overwrite_) {
@@ -625,15 +634,46 @@ public class InsertStmt extends StatementBase {
     }
   }
 
-  private void validateNoBucketTransform(FeIcebergTable iceTable)
+  /**
+   * Validates whether INSERT OVERWRITE can be allowed when the table has a bucket
+   * partition transform. 'INSERT OVERWRITE tbl SELECT * FROM tbl' can be allowed
+   * because the source and target tables are the same and the partitions are known.
+   */
+  private void validateBucketTransformForOverwrite(FeIcebergTable iceTable)
       throws AnalysisException {
+    Preconditions.checkState(overwrite_);
     IcebergPartitionSpec spec = iceTable.getDefaultPartitionSpec();
     if (!spec.hasPartitionFields()) return;
     for (IcebergPartitionField field : spec.getIcebergPartitionFields()) {
-      if (field.getTransformType() == TIcebergPartitionTransformType.BUCKET) {
-          throw new AnalysisException("The Iceberg table has BUCKET partitioning. " +
-              "This means the outcome of dynamic partition overwrite is unforeseeable. " +
-              "Consider using TRUNCATE and INSERT INTO to overwrite your table.");
+      if (field.getTransformType() != TIcebergPartitionTransformType.BUCKET) continue;
+      if (queryStmt_ instanceof ValuesStmt) {
+        throw new AnalysisException("The Iceberg table has BUCKET partitioning. " +
+            "The outcome of static partition overwrite is unforeseeable. Consider " +
+            "using TRUNCATE and INSERT INTO to overwrite your table.");
+      }
+      List<TableRef> tblRefs = queryStmt_.collectTableRefs();
+      List<String> sourceTableAliases = tblRefs.isEmpty() ? new ArrayList<String>(0) :
+          Arrays.asList(tblRefs.get(0).getAliases());
+      String targetTableName = iceTable.getFullName();
+      if (!(tblRefs.size() == 1 && sourceTableAliases.contains(targetTableName))) {
+        throw new AnalysisException("The Iceberg table has BUCKET partitioning and " +
+            "the source table does not match the target table. This means the " +
+            "outcome of dynamic partition overwrite is unforeseeable. Consider using " +
+            "TRUNCATE and INSERT INTO to overwrite your table.");
+      }
+      SelectList selectList = ((SelectStmt)queryStmt_).selectList_;
+      if (selectList.getItems().size() != 1 || !selectList.getItems().get(0).isStar()) {
+        throw new AnalysisException("The Iceberg table has BUCKET partitioning. " +
+            "The outcome of dynamic partition overwrite is unforeseeable with the " +
+            "given select list, only '*' allowed. Otherwise consider using TRUNCATE " +
+            "and INSERT INTO to overwrite your table.");
+      }
+      if (((SelectStmt)queryStmt_).whereClause_ != null) {
+        throw new AnalysisException("The Iceberg table has BUCKET partitioning. " +
+            "The outcome of dynamic partition overwrite is unforeseeable with the " +
+            "given select query with WHERE clause, selective overwrite is not " +
+            "supported. Consider using TRUNCATE and INSERT INTO to overwrite your " +
+            "table.");
       }
     }
   }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 3a7755813..bbf62be8e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -106,13 +106,39 @@ SHOW CREATE TABLE fake_iceberg_table_hadoop_catalog;
 row_regex:.*CAUSED BY: TableLoadingException: Table does not exist: fake_db.fake_table*
 ====
 ---- QUERY
-CREATE TABLE iceberg_overwrite_bucket (i int)
+CREATE TABLE iceberg_overwrite_bucket (i int, j int)
 PARTITIONED BY SPEC (bucket(3, i))
-STORED AS ICEBERG
-TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
-INSERT OVERWRITE iceberg_overwrite_bucket VALUES (1), (2), (3);
+STORED AS ICEBERG;
+INSERT OVERWRITE iceberg_overwrite_bucket values (1, 2);
+---- CATCH
+AnalysisException: The Iceberg table has BUCKET partitioning. The outcome of static partition overwrite is unforeseeable.
+====
+---- QUERY
+INSERT OVERWRITE iceberg_overwrite_bucket SELECT 1, j FROM iceberg_overwrite_bucket;
+---- CATCH
+AnalysisException: The Iceberg table has BUCKET partitioning. The outcome of dynamic partition overwrite is unforeseeable with the given select list, only '*' allowed.
+====
+---- QUERY
+INSERT OVERWRITE iceberg_overwrite_bucket SELECT * FROM iceberg_overwrite_bucket WHERE j = 1;
+---- CATCH
+AnalysisException: The Iceberg table has BUCKET partitioning. The outcome of dynamic partition overwrite is unforeseeable with the given select query with WHERE clause
+====
+---- QUERY
+CREATE TABLE iceberg_overwrite_bucket_other (i int, j int)
+PARTITIONED BY SPEC (bucket(3, i))
+STORED AS ICEBERG;
+INSERT OVERWRITE iceberg_overwrite_bucket SELECT * FROM iceberg_overwrite_bucket_other;
+---- CATCH
+AnalysisException: The Iceberg table has BUCKET partitioning and the source table does not match the target table.
+====
+---- QUERY
+ALTER TABLE iceberg_overwrite_bucket SET PARTITION SPEC (
+  bucket(3, i),
+  truncate(3, j)
+);
+INSERT OVERWRITE iceberg_overwrite_bucket SELECT * FROM iceberg_overwrite_bucket_other;
 ---- CATCH
-AnalysisException: The Iceberg table has BUCKET partitioning.
+AnalysisException: The Iceberg table has multiple partition specs.
 ====
 ---- QUERY
 CREATE TABLE iceberg_hive_cat_with_cat_locaction (i int)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test
index 4b12d3b12..b7ee35064 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test
@@ -166,3 +166,34 @@ select * from ice_trunc
 ---- TYPES
 DECIMAL
 ====
+---- QUERY
+create table iceberg_overwrite_bucket (i int)
+partitioned by spec (bucket(3, i))
+stored as iceberg;
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into iceberg_overwrite_bucket values (1);
+insert into iceberg_overwrite_bucket values (1);
+insert into iceberg_overwrite_bucket values (1);
+select INPUT__FILE__NAME, count(*)
+  from iceberg_overwrite_bucket
+  group by INPUT__FILE__NAME;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/iceberg_overwrite_bucket/data/.*.0.parq',1
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/iceberg_overwrite_bucket/data/.*.0.parq',1
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/iceberg_overwrite_bucket/data/.*.0.parq',1
+---- TYPES
+STRING, BIGINT
+====
+---- QUERY
+insert overwrite iceberg_overwrite_bucket select * from iceberg_overwrite_bucket;
+select INPUT__FILE__NAME, count(*)
+  from iceberg_overwrite_bucket
+  group by INPUT__FILE__NAME;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/iceberg_overwrite_bucket/data/.*.0.parq',3
+---- TYPES
+STRING, BIGINT
+====
\ No newline at end of file
diff --git a/tests/custom_cluster/test_events_custom_configs.py b/tests/custom_cluster/test_events_custom_configs.py
index 1df40b5ae..fa0668f8e 100644
--- a/tests/custom_cluster/test_events_custom_configs.py
+++ b/tests/custom_cluster/test_events_custom_configs.py
@@ -359,6 +359,8 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
           CREATE TABLE {0} (i int) STORED AS ICEBERG
           TBLPROPERTIES ({1})""".format(tbl_name, catalog))
 
+      check_self_events("INSERT OVERWRITE {0} VALUES (1)".format(tbl_name),
+          skips_events=is_hive_catalog)
       check_self_events("ALTER TABLE {0} ADD COLUMN j INT".format(tbl_name))
       check_self_events("ALTER TABLE {0} DROP COLUMN i".format(tbl_name))
       check_self_events("ALTER TABLE {0} CHANGE COLUMN j j BIGINT".format(tbl_name))
@@ -370,9 +372,7 @@ class TestEventProcessingCustomConfigs(CustomClusterTestSuite):
       check_self_events(
           "ALTER TABLE {0} SET TBLPROPERTIES('key'='value')".format(tbl_name))
       check_self_events("ALTER TABLE {0} UNSET TBLPROPERTIES('key')".format(tbl_name))
-      check_self_events("INSERT INTO {0} VALUES (1), (2), (3)".format(tbl_name),
-          skips_events=is_hive_catalog)
-      check_self_events("INSERT OVERWRITE {0} VALUES (4), (5), (6)".format(tbl_name),
+      check_self_events("INSERT INTO {0} VALUES (2), (3), (4)".format(tbl_name),
           skips_events=is_hive_catalog)
       ctas_tbl = unique_database + ".ice_ctas"
       check_self_events("""CREATE TABLE {0} STORED AS ICEBERG