You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2022/08/11 05:48:29 UTC
[impala] 01/03: IMPALA-11408: Fill missing partition columns when INSERT INTO iceberg_tbl (col_list)
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 0e3e4b57a1c164f34cb24af854aa67f4cfb6f251
Author: LPL <li...@sensorsdata.cn>
AuthorDate: Fri Jul 1 10:51:27 2022 +0800
IMPALA-11408: Fill missing partition columns when INSERT INTO iceberg_tbl (col_list)
For INSERT INTO iceberg_tbl (col_a, col_b, ...), if some partition columns
of the Iceberg table are not listed in the column permutation, we fill the
missing partition columns with NullLiteral so that the data is written to
the default partition '__HIVE_DEFAULT_PARTITION__'.
Testing:
- add e2e tests
Change-Id: I40c733755d65e5c81a12ffe09b6d16ed5d115368
Reviewed-on: http://gerrit.cloudera.org:8080/18790
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../org/apache/impala/analysis/InsertStmt.java | 51 ++++--
.../java/org/apache/impala/util/IcebergUtil.java | 11 ++
.../iceberg-partition-transform-insert.test | 171 +++++++++++++++++++++
.../QueryTest/iceberg-partitioned-insert.test | 118 ++++++++++++++
4 files changed, 342 insertions(+), 9 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index c043c3c2f..97ab4202d 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -17,15 +17,23 @@
package org.apache.impala.analysis;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.types.Types;
+
import org.apache.impala.authorization.Privilege;
import org.apache.impala.catalog.BuiltinsDb;
import org.apache.impala.catalog.Column;
@@ -54,12 +62,6 @@ import org.apache.impala.rewrite.ExprRewriter;
import org.apache.impala.thrift.TIcebergPartitionTransformType;
import org.apache.impala.thrift.TSortingOrder;
import org.apache.impala.util.IcebergUtil;
-import org.apache.thrift.TBaseHelper;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
/**
* Representation of a single insert or upsert statement, including the select statement
@@ -929,7 +931,9 @@ public class InsertStmt extends StatementBase {
++parts;
}
}
- Preconditions.checkState(partitionKeyExprs_.size() == parts);
+ if (CollectionUtils.isEmpty(columnPermutation_)) {
+ Preconditions.checkState(partitionKeyExprs_.size() == parts);
+ }
}
else if (isKuduTable) {
Preconditions.checkState(
@@ -972,7 +976,19 @@ public class InsertStmt extends StatementBase {
} else {
// Unmentioned non-clustering columns get NULL literals with the appropriate
// target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
- resultExprs_.add(NullLiteral.create(tblColumn.getType()));
+ NullLiteral nullExpr = NullLiteral.create(tblColumn.getType());
+ resultExprs_.add(nullExpr);
+ // In the case of INSERT INTO iceberg_tbl (col_a, col_b, ...), if the
+ // partition columns are not in the columnPermutation_, we should fill it
+ // with NullLiteral to partitionKeyExprs_ (IMPALA-11408).
+ if (isIcebergTarget() && !CollectionUtils.isEmpty(columnPermutation_)
+ && icebergPartSpec != null) {
+ IcebergColumn targetColumn = (IcebergColumn) tblColumn;
+ if (IcebergUtil.isPartitionColumn(targetColumn, icebergPartSpec)) {
+ partitionKeyExprs_.add(nullExpr);
+ partitionColPos_.add(targetColumn.getPosition());
+ }
+ }
}
}
}
@@ -985,6 +1001,23 @@ public class InsertStmt extends StatementBase {
}
}
+ // In the case of INSERT INTO iceberg_tbl (col_a, col_b, ...), to ensure that data is
+ // written to the correct partition, we need to make sure that the partitionKeyExprs_
+ // is in ascending order according to the column position of the Iceberg tables.
+ if (isIcebergTarget() && !CollectionUtils.isEmpty(columnPermutation_)) {
+ List<Pair<Integer, Expr>> exprPairs = Lists.newArrayList();
+ for (int i = 0; i < partitionColPos_.size(); i++) {
+ exprPairs.add(Pair.create(partitionColPos_.get(i), partitionKeyExprs_.get(i)));
+ }
+ exprPairs.sort(Comparator.comparingInt(p -> p.first));
+ partitionColPos_.clear();
+ partitionKeyExprs_.clear();
+ for (Pair<Integer, Expr> exprPair : exprPairs) {
+ partitionColPos_.add(exprPair.first);
+ partitionKeyExprs_.add(exprPair.second);
+ }
+ }
+
if (table_ instanceof FeKuduTable) {
Preconditions.checkState(!primaryKeyExprs_.isEmpty());
}
diff --git a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
index ace59ae2a..f0ec43185 100644
--- a/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/IcebergUtil.java
@@ -67,6 +67,7 @@ import org.apache.impala.analysis.TimeTravelSpec.Kind;
import org.apache.impala.catalog.Catalog;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.IcebergColumn;
import org.apache.impala.catalog.IcebergTable;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.iceberg.IcebergCatalog;
@@ -984,4 +985,14 @@ public class IcebergUtil {
}
});
}
+
+ public static boolean isPartitionColumn(IcebergColumn column,
+ IcebergPartitionSpec spec) {
+ for (IcebergPartitionField partField : spec.getIcebergPartitionFields()) {
+ if (partField.getTransformType() == TIcebergPartitionTransformType.VOID) continue;
+ if (column.getFieldId() != partField.getSourceId()) continue;
+ return true;
+ }
+ return false;
+ }
}
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
index 971d7a776..f2f725e09 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partition-transform-insert.test
@@ -1,5 +1,176 @@
====
---- QUERY
+create table ice_part_transform (col_i int, col_str string, col_ts timestamp)
+partitioned by spec (
+ bucket(3, col_i),
+ truncate(1, col_str),
+ day(col_ts)
+)
+stored as iceberg;
+====
+---- QUERY
+insert into
+ ice_part_transform
+values
+ (2, 'two', '2001-02-03 07:08:00'),
+ (1, 'one', '2001-01-03 07:08:00'),
+ (3, 'three', '2002-03-03 07:08:00'),
+ (5, 'five', '2003-05-03 07:08:00');
+select col_i,col_str,col_ts from ice_part_transform order by 1,2,3;
+---- RESULTS
+1,'one',2001-01-03 07:08:00
+2,'two',2001-02-03 07:08:00
+3,'three',2002-03-03 07:08:00
+5,'five',2003-05-03 07:08:00
+---- TYPES
+int,string,timestamp
+====
+---- QUERY
+insert into ice_part_transform(col_i) values (0), (4);
+select
+ col_i, col_str, col_ts
+from
+ ice_part_transform
+where
+ col_str is null
+ and col_ts is null
+order by
+ 1, 2, 3;
+---- RESULTS
+0,'NULL',NULL
+4,'NULL',NULL
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+insert into ice_part_transform(col_str) values ('zero'), ('four');
+select
+ col_i, col_str, col_ts
+from
+ ice_part_transform
+where
+ col_i is null
+ and col_ts is null
+order by
+ 1, 2, 3;
+---- RESULTS
+NULL,'four',NULL
+NULL,'zero',NULL
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+insert into ice_part_transform(col_ts) values ('2001-04-03 07:08:00'), ('2001-05-03 07:08:00');
+select
+ col_i, col_str, col_ts
+from
+ ice_part_transform
+where
+ col_i is null
+ and col_str is null
+order by
+ 1, 2, 3;
+---- RESULTS
+NULL,'NULL',2001-04-03 07:08:00
+NULL,'NULL',2001-05-03 07:08:00
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+insert into ice_part_transform(col_i, col_str) values (2, 'two'), (1, 'one');
+select
+ col_i, col_str, col_ts
+from
+ ice_part_transform
+where
+ col_i is not null
+ and col_str is not null
+ and col_ts is null
+order by
+ 1, 2, 3;
+---- RESULTS
+1,'one',NULL
+2,'two',NULL
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+insert into ice_part_transform(col_i, col_ts) values (2, '2001-02-03 07:08:00'), (1, '2001-01-03 07:08:00');
+select
+ col_i, col_str, col_ts
+from
+ ice_part_transform
+where
+ col_i is not null
+ and col_str is null
+ and col_ts is not null
+order by
+ 1, 2, 3;
+---- RESULTS
+1,'NULL',2001-01-03 07:08:00
+2,'NULL',2001-02-03 07:08:00
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+insert into ice_part_transform(col_str, col_ts) values ('two', '2001-02-03 07:08:00'), ('one', '2001-01-03 07:08:00');
+select
+ col_i, col_str, col_ts
+from
+ ice_part_transform
+where
+ col_i is null
+ and col_str is not null
+ and col_ts is not null
+order by
+ 1, 2, 3;
+---- RESULTS
+NULL,'one',2001-01-03 07:08:00
+NULL,'two',2001-02-03 07:08:00
+---- TYPES
+int,string,timestamp
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 2
+aggregation(SUM, NumRowGroups): 2
+====
+---- QUERY
+show files in ice_part_transform;
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=0/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=2001-02-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=0/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=0/col_str_trunc=t/col_ts_day=2001-02-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=0/col_str_trunc=t/col_ts_day=2002-03-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=0/col_str_trunc=t/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=1/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=2/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=2001-01-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=2/col_str_trunc=f/col_ts_day=2003-05-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=2/col_str_trunc=o/col_ts_day=2001-01-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=2/col_str_trunc=o/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=2001-04-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=__HIVE_DEFAULT_PARTITION__/col_ts_day=2001-05-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=f/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=o/col_ts_day=2001-01-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=t/col_ts_day=2001-02-03/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_part_transform/data/col_i_bucket=__HIVE_DEFAULT_PARTITION__/col_str_trunc=z/col_ts_day=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+---- TYPES
+string, string, string
+====
+---- QUERY
# Test partitioned INSERTs with single column that is also
# the partitioned column. Partition transform is BUCKET.
create table single_col_bucket (s string)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index fa0c7b413..ee7355170 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -76,6 +76,56 @@ select * from ice_multi_part;
INT, DATE, STRING
====
---- QUERY
+insert into
+ ice_multi_part (d, s)
+values
+ ('2022-07-26', 'third'),
+ ('2022-07-27', 'fourth'),
+ ('2022-07-28', 'fifth');
+select * from ice_multi_part order by d;
+---- RESULTS
+1,2020-12-07,'first'
+2,2020-12-08,'second'
+NULL,2022-07-26,'third'
+NULL,2022-07-27,'fourth'
+NULL,2022-07-28,'fifth'
+---- TYPES
+INT, DATE, STRING
+====
+---- QUERY
+insert into
+ ice_multi_part (i, s)
+values
+ (6, 'sixth'),
+ (7, 'seventh');
+select * from ice_multi_part order by d, i;
+---- RESULTS
+1,2020-12-07,'first'
+2,2020-12-08,'second'
+NULL,2022-07-26,'third'
+NULL,2022-07-27,'fourth'
+NULL,2022-07-28,'fifth'
+6,NULL,'sixth'
+7,NULL,'seventh'
+---- TYPES
+INT, DATE, STRING
+====
+---- QUERY
+show files in ice_multi_part;
+---- LABELS
+Path,Size,Partition
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=1/d=2020-12-07/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=2/d=2020-12-08/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=6/d=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=7/d=__HIVE_DEFAULT_PARTITION__/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=__HIVE_DEFAULT_PARTITION__/d=2022-07-26/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=__HIVE_DEFAULT_PARTITION__/d=2022-07-27/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/ice_multi_part/data/i=__HIVE_DEFAULT_PARTITION__/d=2022-07-28/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
select * from ice_multi_part
where d = '2020-12-08';
---- RESULTS
@@ -86,6 +136,24 @@ INT, DATE, STRING
aggregation(SUM, RowsRead): 1
====
---- QUERY
+select
+ *
+from
+ ice_multi_part
+where
+ i is null
+order by
+ d;
+---- RESULTS
+NULL,2022-07-26,'third'
+NULL,2022-07-27,'fourth'
+NULL,2022-07-28,'fifth'
+---- TYPES
+INT, DATE, STRING
+---- RUNTIME_PROFILE
+aggregation(SUM, RowsRead): 3
+====
+---- QUERY
# Test that Impala only writes one file per partitions.
create table ice_bigints (i BIGINT, j BIGINT, k BIGINT)
partitioned by spec (i, j)
@@ -263,6 +331,56 @@ BIGINT
aggregation(SUM, NumRowGroups): 0
====
---- QUERY
+create table alltypes_part_2 like alltypes_part;
+---- RESULTS
+'Table has been created.'
+====
+---- QUERY
+insert into
+ alltypes_part_2(
+ id,
+ int_col,
+ bigint_col,
+ float_col,
+ double_col,
+ date_col,
+ string_col,
+ timestamp_col
+ )
+select
+ id,
+ int_col,
+ bigint_col,
+ float_col,
+ double_col,
+ CAST(date_string_col as date FORMAT 'MM/DD/YY'),
+ string_col,
+ timestamp_col
+from
+ functional.alltypestiny;
+select count(*) from alltypes_part;
+---- RESULTS
+8
+---- TYPES
+BIGINT
+====
+---- QUERY
+show files in alltypes_part_2;
+---- LABELS
+Path,Size,Partition
+---- RESULTS
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=0/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=0/bigint_col=0/float_col=0/double_col=0/date_col=2009-01-01/string_col=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=1/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=1/bigint_col=10/float_col=1.100000023841858/double_col=10.1/date_col=2009-01-01/string_col=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=2/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=0/bigint_col=0/float_col=0/double_col=0/date_col=2009-02-01/string_col=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=3/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=1/bigint_col=10/float_col=1.100000023841858/double_col=10.1/date_col=2009-02-01/string_col=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=4/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=0/bigint_col=0/float_col=0/double_col=0/date_col=2009-03-01/string_col=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=5/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=1/bigint_col=10/float_col=1.100000023841858/double_col=10.1/date_col=2009-03-01/string_col=1/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=6/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=0/bigint_col=0/float_col=0/double_col=0/date_col=2009-04-01/string_col=0/.*.0.parq','.*',''
+row_regex:'$NAMENODE/test-warehouse/$DATABASE.db/alltypes_part_2/data/id=7/bool_col=__HIVE_DEFAULT_PARTITION__/int_col=1/bigint_col=10/float_col=1.100000023841858/double_col=10.1/date_col=2009-04-01/string_col=1/.*.0.parq','.*',''
+---- TYPES
+STRING, STRING, STRING
+====
+---- QUERY
# Iceberg partitions independent of column order
---- QUERY
# Test inserts with multple partition columns.