You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/08/02 23:29:36 UTC
[impala] 03/03: IMPALA-12327: Iceberg V2 operator wrong results in PARTITIONED mode
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 8638255e5074f1342dfc452bca39f649a76612d6
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Aug 1 14:57:34 2023 +0200
IMPALA-12327: Iceberg V2 operator wrong results in PARTITIONED mode
The Iceberg delete node tries to do mini merge-joins between data
records and delete records. This works in BROADCAST mode, and most of
the time in PARTITIONED mode as well. However, the Iceberg delete node
wrongly assumed that rows in a row batch that belong to the same file
always come in ascending order. Based on that assumption, it relied on
the previous delete having updated IcebergDeleteState to the next
deleted row id, and skipped the binary search if that row id was greater
than or equal to the current probe row id.
When PARTITIONED mode is used, we cannot rely on ascending row order,
not even inside row batches, not even when the previous file path is the
same as the current one. This is because files with multiple blocks can
be processed by multiple hosts in parallel, then the rows are getting
hash-exchanged based on their file paths. Then the exchange-receiver at
the LHS coalesces the row batches from multiple senders, hence the row
IDs being unordered.
This patch fixes the issue by no longer relying on that assumption in
PARTITIONED mode: we do a binary search whenever the position of the
current row is not exactly one greater than the position of the
previous row.
Tests:
* added e2e tests
Change-Id: Ib89a53e812af8c3b8ec5bc27bca0a50dcac5d924
Reviewed-on: http://gerrit.cloudera.org:8080/20295
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/exec/iceberg-delete-node.cc | 27 +++++++++++++++++-----
.../functional/functional_schema_template.sql | 4 +---
.../iceberg-v2-read-position-deletes.test | 15 ++++++++++++
3 files changed, 37 insertions(+), 9 deletions(-)
diff --git a/be/src/exec/iceberg-delete-node.cc b/be/src/exec/iceberg-delete-node.cc
index 04f024ca0..668181f13 100644
--- a/be/src/exec/iceberg-delete-node.cc
+++ b/be/src/exec/iceberg-delete-node.cc
@@ -324,15 +324,17 @@ void IcebergDeleteNode::IcebergDeleteState::UpdateImpl() {
}
void IcebergDeleteNode::IcebergDeleteState::Update(
- impala::StringValue* file_path, int64_t* probe_pos) {
+ impala::StringValue* file_path, int64_t* next_probe_pos) {
DCHECK(builder_ != nullptr);
// Making sure the row ids are in ascending order inside a row batch in broadcast mode
DCHECK(builder_->IsDistributedMode() || current_probe_pos_ == INVALID_ROW_ID
- || current_probe_pos_ < *probe_pos);
- DCHECK(!builder_->IsDistributedMode() || previous_file_path_ == nullptr
- || *file_path != *previous_file_path_ || current_probe_pos_ == INVALID_ROW_ID
- || current_probe_pos_ < *probe_pos);
- current_probe_pos_ = *probe_pos;
+ || current_probe_pos_ < *next_probe_pos);
+ bool is_consecutive_pos = false;
+ if(current_probe_pos_ != INVALID_ROW_ID) {
+ const int64_t step = *next_probe_pos - current_probe_pos_;
+ is_consecutive_pos = step == 1;
+ }
+ current_probe_pos_ = *next_probe_pos;
if (previous_file_path_ != nullptr
&& (!builder_->IsDistributedMode() || *file_path == *previous_file_path_)) {
@@ -340,6 +342,19 @@ void IcebergDeleteNode::IcebergDeleteState::Update(
if (current_deleted_pos_row_id_ != INVALID_ROW_ID
&& current_probe_pos_ > (*current_delete_row_)[current_deleted_pos_row_id_]) {
UpdateImpl();
+ } else if (builder_->IsDistributedMode() && !is_consecutive_pos) {
+ // In distributed mode (which means PARTITIONED JOIN distribution mode) we cannot
+ // rely on ascending row order, not even inside row batches, not even when the
+ // previous file path is the same as the current one.
+ // This is because files with multiple blocks can be processed by multiple hosts
+ // in parallel, then the rows are getting hash-exchanged based on their file paths.
+ // Then the exchange-receiver at the LHS coalesces the row batches from multiple
+ // senders, hence the row IDs getting unordered. So we are always doing a binary
+ // search here to find the proper delete row id.
+ // This won't be a problem with the DIRECTED distribution mode (see IMPALA-12308)
+ // which will behave similarly to the BROADCAST mode in this regard.
+ DCHECK_EQ(*file_path, *previous_file_path_);
+ UpdateImpl();
}
} else {
auto it = builder_->deleted_rows().find(*file_path);
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 3a5e5b076..7a1e255a2 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3694,7 +3694,6 @@ ALTER TABLE {db_name}{db_suffix}.{table_name} SET TBLPROPERTIES('write.format.de
INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(2, 'orc', 1.5, false);
ALTER TABLE {db_name}{db_suffix}.{table_name} SET TBLPROPERTIES('write.format.default'='parquet');
INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(3, 'parquet', 2.5, false);
-
====
---- DATASET
functional
@@ -3709,8 +3708,7 @@ TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
'format-version'='2');
---- DEPENDENT_LOAD
`hadoop fs -mkdir -p /test-warehouse/iceberg_test/hadoop_catalog/ice && \
-hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock /test-warehouse/iceberg_test/hadoop_catalog/ice
-
+hadoop fs -Ddfs.block.size=524288 -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock /test-warehouse/iceberg_test/hadoop_catalog/ice
====
---- DATASET
functional
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
index c40a968ef..4742defee 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
@@ -672,6 +672,21 @@ SELECT count(*) from iceberg_lineitem_multiblock;
bigint
====
---- QUERY
+select count(*) from iceberg_lineitem_multiblock where l_linenumber%5=0;
+---- RESULTS
+0
+---- TYPES
+bigint
+====
+---- QUERY
+SET BATCH_SIZE=2;
+select count(*) from iceberg_lineitem_multiblock where l_linenumber%5=0;
+---- RESULTS
+0
+---- TYPES
+bigint
+====
+---- QUERY
SELECT * from iceberg_v2_partitioned_position_deletes;
---- RESULTS
6,'Alex','view',2020-01-01 09:00:00