You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2021/02/05 22:11:50 UTC

[impala] 01/02: IMPALA-10223: Implement INSERT OVERWRITE for Iceberg tables

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a3f441193d25c6ca721d59ba88129b643b8ad69f
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Mon Feb 1 11:44:00 2021 +0100

    IMPALA-10223: Implement INSERT OVERWRITE for Iceberg tables
    
    This patch adds support for INSERT OVERWRITE statements for
    Iceberg tables. We use Iceberg's ReplacePartitions interface
    for this. This interface provides consistent behavior with
    INSERT OVERWRITEs against regular tables. It's also consistent
    with other engines' dynamic overwrites, e.g. Spark.
    
    INSERT OVERWRITE for partitioned tables replaces the partitions
    affected by the INSERT, while keeping the other partitions
    untouched.
    
    INSERT OVERWRITE is prohibited for tables that use the BUCKET
    partition transform because it would randomly overwrite table
    data.
    
    Testing
     * added e2e test
    
    Change-Id: Idf4acfb54cf62a3f3b2e8db9d04044580151299c
    Reviewed-on: http://gerrit.cloudera.org:8080/17012
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/client-request-state.cc             |   1 +
 common/thrift/CatalogService.thrift                |   3 +
 .../org/apache/impala/analysis/InsertStmt.java     |  17 ++-
 .../impala/service/IcebergCatalogOpExecutor.java   |  54 ++++++-
 .../queries/QueryTest/iceberg-negative.test        |   7 +-
 .../queries/QueryTest/iceberg-overwrite.test       | 168 +++++++++++++++++++++
 tests/query_test/test_iceberg.py                   |   4 +
 7 files changed, 245 insertions(+), 9 deletions(-)

diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index de75e79..7f455a3 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1298,6 +1298,7 @@ Status ClientRequestState::UpdateCatalog() {
         TIcebergOperationParam& ice_op = catalog_update.iceberg_operation;
         ice_op.__set_spec_id(finalize_params.spec_id);
         ice_op.__set_iceberg_data_files_fb(createIcebergDataFilesVector(*dml_exec_state));
+        ice_op.__set_is_overwrite(finalize_params.is_overwrite);
       }
 
       Status cnxn_status;
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 606d07c..b314369 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -201,6 +201,9 @@ struct TIcebergOperationParam {
 
   // Iceberg data files to append to the table, encoded in FlatBuffers.
   2: required list<binary> iceberg_data_files_fb;
+
+  // Is overwrite operation
+  3: required bool is_overwrite = false;
 }
 
 // Updates the metastore with new partition information and returns a response
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 0b796d8..c880697 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -549,9 +549,7 @@ public class InsertStmt extends StatementBase {
     }
 
     if (table_ instanceof FeIcebergTable) {
-      if (overwrite_) {
-        throw new AnalysisException("INSERT OVERWRITE not supported for Iceberg tables.");
-      }
+      if (overwrite_) validateNoBucketTransform((FeIcebergTable)table_);
       validateIcebergColumnsForInsert((FeIcebergTable)table_);
     }
 
@@ -574,6 +572,19 @@ public class InsertStmt extends StatementBase {
     }
   }
 
+  private void validateNoBucketTransform(FeIcebergTable iceTable)
+      throws AnalysisException {
+    IcebergPartitionSpec spec = iceTable.getDefaultPartitionSpec();
+    if (!spec.hasPartitionFields()) return;
+    for (IcebergPartitionField field : spec.getIcebergPartitionFields()) {
+      if (field.getTransformType() == TIcebergPartitionTransformType.BUCKET) {
+          throw new AnalysisException("The Iceberg table has BUCKET partitioning. " +
+              "This means the outcome of dynamic partition overwrite is unforeseeable. " +
+              "Consider using TRUNCATE and INSERT INTO to overwrite your table.");
+      }
+    }
+  }
+
   private void analyzeWriteAccess() throws AnalysisException {
     if (!(table_ instanceof FeFsTable)) return;
     FeFsTable fsTable = (FeFsTable) table_;
diff --git a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index db44aef..c4d0259 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -165,6 +166,48 @@ public class IcebergCatalogOpExecutor {
   }
 
   /**
+   * An auxiliary interface for the Append and Overwrite operations.
+   */
+  private static interface BatchWrite {
+    public void addFile(DataFile file);
+    public void commit();
+  }
+
+  private static class Append implements BatchWrite {
+    final private AppendFiles append;
+    public Append(org.apache.iceberg.Table tbl) {
+      append = tbl.newAppend();
+    }
+
+    @Override
+    public void addFile(DataFile file) {
+      append.appendFile(file);
+    }
+
+    @Override
+    public void commit() {
+      append.commit();
+    }
+  }
+
+  private static class DynamicOverwrite implements BatchWrite {
+    final private ReplacePartitions replace;
+    public DynamicOverwrite(org.apache.iceberg.Table tbl) {
+      replace = tbl.newReplacePartitions();
+    }
+
+    @Override
+    public void addFile(DataFile file) {
+      replace.addFile(file);
+    }
+
+    @Override
+    public void commit() {
+      replace.commit();
+    }
+  }
+
+  /**
    * Append the newly inserted data files to the Iceberg table using the AppendFiles
    * API.
    */
@@ -174,7 +217,12 @@ public class IcebergCatalogOpExecutor {
     org.apache.iceberg.Table nativeIcebergTable =
         IcebergUtil.loadTable(feIcebergTable);
     List<ByteBuffer> dataFilesFb = icebergOp.getIceberg_data_files_fb();
-    AppendFiles append = nativeIcebergTable.newAppend();
+    BatchWrite batchWrite;
+    if (icebergOp.isIs_overwrite()) {
+      batchWrite = new DynamicOverwrite(nativeIcebergTable);
+    } else {
+      batchWrite = new Append(nativeIcebergTable);
+    }
     for (ByteBuffer buf : dataFilesFb) {
       FbIcebergDataFile dataFile = FbIcebergDataFile.getRootAsFbIcebergDataFile(buf);
       PartitionSpec partSpec = nativeIcebergTable.specs().get(icebergOp.getSpec_id());
@@ -188,9 +236,9 @@ public class IcebergCatalogOpExecutor {
           partSpec.partitionType(),
           feIcebergTable.getDefaultPartitionSpec(), dataFile.partitionPath());
       if (partitionData != null) builder.withPartition(partitionData);
-      append.appendFile(builder.build());
+      batchWrite.addFile(builder.build());
     }
-    append.commit();
+    batchWrite.commit();
   }
 
   /**
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
index 6e5e563..bd9d033 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-negative.test
@@ -101,12 +101,13 @@ AS SELECT id FROM functional.alltypes;
 AnalysisException: CREATE TABLE AS SELECT does not support the (ICEBERG) file format.
 ====
 ---- QUERY
-CREATE TABLE iceberg_overwrite (i int)
+CREATE TABLE iceberg_overwrite_bucket (i int)
+PARTITION BY SPEC (i bucket 3)
 STORED AS ICEBERG
 TBLPROPERTIES('iceberg.catalog'='hadoop.tables');
-INSERT OVERWRITE iceberg_overwrite VALUES (1), (2), (3);
+INSERT OVERWRITE iceberg_overwrite_bucket VALUES (1), (2), (3);
 ---- CATCH
-AnalysisException: INSERT OVERWRITE not supported for Iceberg tables.
+AnalysisException: The Iceberg table has BUCKET partitioning.
 ====
 ---- QUERY
 CREATE TABLE iceberg_hive_cat_with_cat_locaction (i int)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test
new file mode 100644
index 0000000..3ad743b
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-overwrite.test
@@ -0,0 +1,168 @@
+====
+---- QUERY
+# Create unpartitioned table for INSERT OVERWRITE
+create table ice_nopart (i int, j int)
+stored as iceberg;
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+INSERT INTO ice_nopart VALUES (1, 2);
+SELECT * FROM ice_nopart;
+---- RESULTS
+1,2
+---- TYPES
+INT,INT
+====
+---- QUERY
+INSERT OVERWRITE ice_nopart VALUES (10, 20);
+SELECT * FROM ice_nopart;
+---- RESULTS
+10,20
+---- TYPES
+INT,INT
+====
+---- QUERY
+INSERT OVERWRITE ice_nopart select cast(i+1 as int), cast(j+1 as int) from ice_nopart;
+SELECT * FROM ice_nopart;
+---- RESULTS
+11,21
+---- TYPES
+INT,INT
+====
+---- QUERY
+# INSERT empty result set clears table.
+INSERT OVERWRITE ice_nopart select * from ice_nopart where false;
+select * from ice_nopart;
+---- RESULTS
+====
+---- QUERY
+# Create identity-partitioned table for INSERT OVERWRITE
+create table ice_ident (i int)
+partitioned by (j int)
+stored as iceberg;
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+INSERT INTO ice_ident VALUES (1, 2);
+SELECT * FROM ice_ident;
+---- RESULTS
+1,2
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Add values to a new partition keeping the old ones.
+INSERT OVERWRITE ice_ident VALUES (10, 20);
+SELECT * FROM ice_ident;
+---- RESULTS
+1,2
+10,20
+---- TYPES
+INT,INT
+====
+---- QUERY
+# INSERT only updates the affected partition.
+INSERT OVERWRITE ice_ident select cast(i+1 as int), j from ice_ident where j = 2;
+SELECT * FROM ice_ident;
+---- RESULTS
+2,2
+10,20
+---- TYPES
+INT,INT
+====
+---- QUERY
+# INSERT empty result set has no effect on partitioned table.
+INSERT OVERWRITE ice_ident select * from ice_ident where false;
+select * from ice_ident;
+---- RESULTS
+2,2
+10,20
+---- TYPES
+INT,INT
+====
+---- QUERY
+# Create DAY-partitioned table for INSERT OVERWRITE
+create table ice_day (ts timestamp)
+partition by spec (ts DAY)
+stored as iceberg;
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into ice_day values ('2021-02-01 16:59:36.630928000');
+insert into ice_day values ('2021-02-02 16:59:36.630928000');
+insert into ice_day values ('2021-02-02 16:59:39.630928000');
+insert into ice_day values ('2021-02-03 16:59:36.630928000');
+====
+---- QUERY
+select * from ice_day;
+---- RESULTS
+2021-02-01 16:59:36.630928000
+2021-02-02 16:59:36.630928000
+2021-02-02 16:59:39.630928000
+2021-02-03 16:59:36.630928000
+---- TYPES
+TIMESTAMP
+====
+---- QUERY
+# Update data for partition '2021-02-02'.
+insert overwrite ice_day values ('2021-02-02 00:00:00');
+select * from ice_day;
+---- RESULTS
+2021-02-01 16:59:36.630928000
+2021-02-02 00:00:00
+2021-02-03 16:59:36.630928000
+---- TYPES
+TIMESTAMP
+====
+---- QUERY
+# INSERT empty result set has no effect on partitioned table.
+INSERT OVERWRITE ice_day select * from ice_day where false;
+select * from ice_day;
+---- RESULTS
+2021-02-01 16:59:36.630928000
+2021-02-02 00:00:00
+2021-02-03 16:59:36.630928000
+---- TYPES
+TIMESTAMP
+====
+---- QUERY
+# Create TRUNCATE-partitioned table for INSERT OVERWRITE
+create table ice_trunc (d decimal(10, 2))
+partition by spec (d TRUNCATE 100)
+stored as iceberg;
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into ice_trunc values (1.11);
+insert into ice_trunc values (1.12);
+insert into ice_trunc values (2.22);
+insert into ice_trunc values (3.33);
+====
+---- QUERY
+select * from ice_trunc;
+---- RESULTS
+1.11
+1.12
+2.22
+3.33
+---- TYPES
+DECIMAL
+====
+---- QUERY
+insert overwrite ice_trunc values(1.88), (1.9), (3.99), (4.44), (4.45), (5);
+select * from ice_trunc
+---- RESULTS
+1.88
+1.90
+2.22
+3.99
+4.44
+4.45
+5.00
+---- TYPES
+DECIMAL
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 514a9bd..19177af 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -72,6 +72,10 @@ class TestIcebergTable(ImpalaTestSuite):
     self.run_test_case('QueryTest/iceberg-partitioned-insert', vector,
         use_db=unique_database)
 
+  @SkipIf.not_hdfs
+  def test_insert_overwrite(self, vector, unique_database):
+    self.run_test_case('QueryTest/iceberg-overwrite', vector, use_db=unique_database)
+
   def test_partition_transform_insert(self, vector, unique_database):
     self.run_test_case('QueryTest/iceberg-partition-transform-insert', vector,
         use_db=unique_database)