You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2022/08/11 05:48:29 UTC

[impala] 01/03: IMPALA-11408: Fill missing partition columns when INSERT INTO iceberg_tbl (col_list)

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 0e3e4b57a1c164f34cb24af854aa67f4cfb6f251
Author: LPL <li...@sensorsdata.cn>
AuthorDate: Fri Jul 1 10:51:27 2022 +0800

    IMPALA-11408: Fill missing partition columns when INSERT INTO iceberg_tbl (col_list)
    
    In the case of INSERT INTO iceberg_tbl (col_a, col_b, ...), if some
    partition columns of the Iceberg table are missing from the column
    permutation, we fill the missing partition columns with NullLiteral
    so that the data is written to the default partition
    '__HIVE_DEFAULT_PARTITION__'.
    
    Testing:
     - added e2e tests to iceberg-partition-transform-insert.test and
       iceberg-partitioned-insert.test
    
    Change-Id: I40c733755d65e5c81a12ffe09b6d16ed5d115368
    Reviewed-on: http://gerrit.cloudera.org:8080/18790
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/InsertStmt.java     |  51 ++++--
 .../java/org/apache/impala/util/IcebergUtil.java   |  11 ++
 .../iceberg-partition-transform-insert.test        | 171 +++++++++++++++++++++
 .../QueryTest/iceberg-partitioned-insert.test      | 118 ++++++++++++++
 4 files changed, 342 insertions(+), 9 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index c043c3c2f..97ab4202d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -17,15 +17,23 @@
 
 package org.apache.impala.analysis;
 
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.iceberg.types.Types;
+
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.BuiltinsDb;
 import org.apache.impala.catalog.Column;
@@ -54,12 +62,6 @@ import org.apache.impala.rewrite.ExprRewriter;
 import org.apache.impala.thrift.TIcebergPartitionTransformType;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.util.IcebergUtil;
-import org.apache.thrift.TBaseHelper;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
 
 /**
  * Representation of a single insert or upsert statement, including the select statement
@@ -929,7 +931,9 @@ public class InsertStmt extends StatementBase {
           ++parts;
         }
       }
-      Preconditions.checkState(partitionKeyExprs_.size() == parts);
+      if (CollectionUtils.isEmpty(columnPermutation_)) {
+        Preconditions.checkState(partitionKeyExprs_.size() == parts);
+      }
     }
     else if (isKuduTable) {
       Preconditions.checkState(
@@ -972,7 +976,19 @@ public class InsertStmt extends StatementBase {
           } else {
             // Unmentioned non-clustering columns get NULL literals with the appropriate
             // target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
-            resultExprs_.add(NullLiteral.create(tblColumn.getType()));
+            NullLiteral nullExpr = NullLiteral.create(tblColumn.getType());
+            resultExprs_.add(nullExpr);
+            // In the case of INSERT INTO iceberg_tbl (col_a, col_b, ...), if the
+            // partition columns are not in the columnPermutation_, we should fill it
+            // with NullLiteral to partitionKeyExprs_ (IMPALA-11408).
+            if (isIcebergTarget() && !CollectionUtils.isEmpty(columnPermutation_)
+                && icebergPartSpec != null) {
+              IcebergColumn targetColumn = (IcebergColumn) tblColumn;
+              if (IcebergUtil.isPartitionColumn(targetColumn, icebergPartSpec)) {
+                partitionKeyExprs_.add(nullExpr);
+                partitionColPos_.add(targetColumn.getPosition());
+              }
+            }
           }
         }
       }
@@ -985,6 +1001,23 @@ public class InsertStmt extends StatementBase {
       }
     }
 
+    // In the case of INSERT INTO iceberg_tbl (col_a, col_b, ...), to ensure that data is
+    // written to the correct partition, we need to make sure that the partitionKeyExprs_
+    // is in ascending order according to the column position of the Iceberg tables.
+    if (isIcebergTarget() && !CollectionUtils.isEmpty(columnPermutation_)) {
+      List<Pair<Integer, Expr>> exprPairs = Lists.newArrayList();
+      for (int i = 0; i < partitionColPos_.size(); i++) {
+        exprPairs.add(Pair.create(partitionColPos_.get(i), partitionKeyExprs_.get(i)));
+      }
+      exprPairs.sort(Comparator.comparingInt(p -> p.first));
+      partitionColPos_.clear();
+      partitionKeyExprs_.clear();
+      for (Pair<Integer, Expr> exprPair : exprPairs) {
+        partitionColPos_.add(exprPair.first);
+        partitionKeyExprs_.add(exprPair.second);
+      }
+    }
+
     if (table_ instanceof FeKuduTable) {
       Preconditions.checkState(!primaryKeyExprs_.isEmpty());
     }
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index ace59ae2a..f0ec43185 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -67,6 +67,7 @@ import org.apache.impala.analysis.TimeTravelSpec.Kind;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.IcebergColumn;
 import org.apache.impala.catalog.IcebergTable;
 import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.iceberg.IcebergCatalog;
@@ -984,4 +985,14 @@ public class IcebergUtil {
       }
     });
   }
+
+  public static boolean isPartitionColumn(IcebergColumn column,
+      IcebergPartitionSpec spec) {
+    for (IcebergPartitionField partField : spec.getIcebergPartitionFields()) {
+      if (partField.getTransformType() == TIcebergPartitionTransformType.VOID) continue;
+      if (column.getFieldId() != partField.getSourceId()) continue;
+      return true;
+    }
+    return false;
+  }
 }
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
index 971d7a776..f2f725e09 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
@@ -1,5 +1,176 @@
 ====
 ---- QUERY
+create table ice_part_transform (col_i int, col_str string, col_ts timestamp)
+partitioned by spec (
+  bucket(3, col_i),
+  truncate(1, col_str),
+  day(col_ts)
+)
+stored as iceberg;
+====
+---- QUERY
+insert into
+  ice_part_transform
+values
+  (2, 'two', '2001-02-03 07:08:00'),
+  (1, 'one', '2001-01-03 07:08:00'),
+  (3, 'three', '2002-03-03 07:08:00'),
+  (5, 'five', '2003-05-03 07:08:00');
+select col_i,col_str,col_ts from ice_part_transform order by 1,2,3;
+---- RESULTS
+1,'one',2001-01-03 07:08:00
+2,'two',2001-02-03 07:08:00
+3,'three',2002-03-03 07:08:00
+5,'five',2003-05-03 07:08:00
+---- TYPES
+int,string,timestamp
+====
+---- QUERY
+insert into ice_part_transform(col_i) values (0), (4);
+select
+  col_i, col_str, col_ts
+from
+  ice_part_transform
+where
+  col_str is null
+  and col_ts is null
+order by
+  1, 2, 3;
+---- RESULTS
+0,'NULL',NULL
+4,'NULL',NULL
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+insert into ice_part_transform(col_str) values ('zero'), ('four');
+select
+  col_i, col_str, col_ts
+from
+  ice_part_transform
+where
+  col_i is null
+  and col_ts is null
+order by
+  1, 2, 3;
+---- RESULTS
+NULL,'four',NULL
+NULL,'zero',NULL
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+insert into ice_part_transform(col_ts) values ('2001-04-03 07:08:00'), ('2001-05-03 07:08:00');
+select
+  col_i, col_str, col_ts
+from
+  ice_part_transform
+where
+  col_i is null
+  and col_str is null
+order by
+  1, 2, 3;
+---- RESULTS
+NULL,'NULL',2001-04-03 07:08:00
+NULL,'NULL',2001-05-03 07:08:00
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+insert into ice_part_transform(col_i, col_str) values (2, 'two'), (1, 'one');
+select
+  col_i, col_str, col_ts
+from
+  ice_part_transform
+where
+  col_i is not null
+  and col_str is not null
+  and col_ts is null
+order by
+  1, 2, 3;
+---- RESULTS
+1,'one',NULL
+2,'two',NULL
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+insert into ice_part_transform(col_i, col_ts) values (2, '2001-02-03 07:08:00'), (1, '2001-01-03 07:08:00');
+select
+  col_i, col_str, col_ts
+from
+  ice_part_transform
+where
+  col_i is not null
+  and col_str is null
+  and col_ts is not null
+order by
+  1, 2, 3;
+---- RESULTS
+1,'NULL',2001-01-03 07:08:00
+2,'NULL',2001-02-03 07:08:00
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+insert into ice_part_transform(col_str, col_ts) values ('two', '2001-02-03 07:08:00'), ('one', '2001-01-03 07:08:00');
+select
+  col_i, col_str, col_ts
+from
+  ice_part_transform
+where
+  col_i is null
+  and col_str is not null
+  and col_ts is not null
+order by
+  1, 2, 3;
+---- RESULTS
+NULL,'one',2001-01-03 07:08:00
+NULL,'two',2001-02-03 07:08:00
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+show files in ice_part_transform;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=0/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=2001-02-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=0/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=0/col_str_trunc=t/col_ts_day=2001-02-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=0/col_str_trunc=t/col_ts_day=2002-03-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=0/col_str_trunc=t/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=1/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=2/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=2001-01-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=2/col_str_trunc=f/col_ts_day=2003-05-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=2/col_str_trunc=o/col_ts_day=2001-01-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=2/col_str_trunc=o/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=2001-04-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=2001-05-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=f/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=o/col_ts_day=2001-01-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=t/col_ts_day=2001-02-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=z/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+---- TYPES
+string, string, string
+====
+---- QUERY
 # Test partitioned INSERTs with single column that is also
 # the partitioned column. Partition transform is BUCKET.
 create table single_col_bucket (s string)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index fa0c7b413..ee7355170 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -76,6 +76,56 @@ select * from ice_multi_part;
 INT, DATE, STRING
 ====
 ---- QUERY
+insert into
+  ice_multi_part (d, s)
+values
+  ('2022-07-26', 'third'),
+  ('2022-07-27', 'fourth'),
+  ('2022-07-28', 'fifth');
+select * from ice_multi_part order by d;
+---- RESULTS
+1,2020-12-07,'first'
+2,2020-12-08,'second'
+NULL,2022-07-26,'third'
+NULL,2022-07-27,'fourth'
+NULL,2022-07-28,'fifth'
+---- TYPES
+INT, DATE, STRING
+====
+---- QUERY
+insert into
+  ice_multi_part (i, s)
+values
+  (6, 'sixth'),
+  (7, 'seventh');
+select * from ice_multi_part order by d, i;
+---- RESULTS
+1,2020-12-07,'first'
+2,2020-12-08,'second'
+NULL,2022-07-26,'third'
+NULL,2022-07-27,'fourth'
+NULL,2022-07-28,'fifth'
+6,NULL,'sixth'
+7,NULL,'seventh'
+---- TYPES
+INT, DATE, STRING
+====
+---- QUERY
+show files in ice_multi_part;
+---- LABELS
+Path,Size,Partition
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=1/d=2020-12-07/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=2/d=2020-12-08/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=6/d=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=7/d=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=__HIVE_DEFAULT_PARTITION__/d=2022-07-26/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=__HIVE_DEFAULT_PARTITION__/d=2022-07-27/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=__HIVE_DEFAULT_PARTITION__/d=2022-07-28/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
 select * from ice_multi_part
 where d = '2020-12-08';
 ---- RESULTS
@@ -86,6 +136,24 @@ INT, DATE, STRING
 aggregation(SUM, RowsRead): 1
 ====
 ---- QUERY
+select
+  *
+from
+  ice_multi_part
+where
+  i is null
+order by
+  d;
+---- RESULTS
+NULL,2022-07-26,'third'
+NULL,2022-07-27,'fourth'
+NULL,2022-07-28,'fifth'
+---- TYPES
+INT, DATE, STRING
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
 # Test that Impala only writes one file per partitions.
 create table ice_bigints (i BIGINT, j BIGINT, k BIGINT)
 partitioned by spec (i, j)
@@ -263,6 +331,56 @@ BIGINT
 aggregation(SUM, NumRowGroups): 0
 ====
 ---- QUERY
+create table alltypes_part_2 like alltypes_part;
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into
+  alltypes_part_2(
+    id,
+    int_col,
+    bigint_col,
+    float_col,
+    double_col,
+    date_col,
+    string_col,
+    timestamp_col
+  )
+select
+  id,
+  int_col,
+  bigint_col,
+  float_col,
+  double_col,
+  CAST(date_string_col as date FORMAT 'MM/DD/YY'),
+  string_col,
+  timestamp_col
+from
+  functional.alltypestiny;
+select count(*) from alltypes_part;
+---- RESULTS
+8
+---- TYPES
+BIGINT
+====
+---- QUERY
+show files in alltypes_part_2;
+---- LABELS
+Path,Size,Partition
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=0/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=0/bigint_col=0/float_col=0/double_col=0/date_col=2009-01-01/string_col=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=1/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=1/bigint_col=10/float_col=1.100000023841858/double_col=10.1/date_col=2009-01-01/string_col=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=2/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=0/bigint_col=0/float_col=0/double_col=0/date_col=2009-02-01/string_col=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=3/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=1/bigint_col=10/float_col=1.100000023841858/double_col=10.1/date_col=2009-02-01/string_col=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=4/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=0/bigint_col=0/float_col=0/double_col=0/date_col=2009-03-01/string_col=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=5/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=1/bigint_col=10/float_col=1.100000023841858/double_col=10.1/date_col=2009-03-01/string_col=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=6/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=0/bigint_col=0/float_col=0/double_col=0/date_col=2009-04-01/string_col=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=7/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=1/bigint_col=10/float_col=1.100000023841858/double_col=10.1/date_col=2009-04-01/string_col=1/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
 # Iceberg partitions independent of column order
 ---- QUERY
 # Test inserts with multple partition columns.