You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2023/08/02 23:29:36 UTC

[impala] 03/03: IMPALA-12327: Iceberg V2 operator wrong results in PARTITIONED mode

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 8638255e5074f1342dfc452bca39f649a76612d6
Author: Zoltan Borok-Nagy <bo...@cloudera.com>
AuthorDate: Tue Aug 1 14:57:34 2023 +0200

    IMPALA-12327: Iceberg V2 operator wrong results in PARTITIONED mode
    
    The Iceberg delete node tries to do mini merge-joins between data
    records and delete records. This works in BROADCAST mode, and most of
    the time in PARTITIONED mode as well. However, the Iceberg delete node
    wrongly assumed that if the rows in a row batch belong to the same
    file, they come in ascending order. Based on that assumption, it
    relied on the previous delete having advanced IcebergDeleteState to
    the next deleted row id, and skipped the binary search if that id was
    greater than or equal to the current probe row id.
    
    When PARTITIONED mode is used, we cannot rely on ascending row order,
    not even inside row batches, and not even when the previous file path
    is the same as the current one. This is because files with multiple
    blocks can be processed by multiple hosts in parallel, and the rows
    are then hash-exchanged based on their file paths. The
    exchange-receiver on the LHS then coalesces the row batches from
    multiple senders, so the row IDs become unordered.
    
    This patch drops that assumption and does a binary search when we are
    in PARTITIONED mode and the difference between the current row's
    position and the previous row's position is not one.
    
    Tests:
     * added e2e tests
    
    Change-Id: Ib89a53e812af8c3b8ec5bc27bca0a50dcac5d924
    Reviewed-on: http://gerrit.cloudera.org:8080/20295
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/iceberg-delete-node.cc                 | 27 +++++++++++++++++-----
 .../functional/functional_schema_template.sql      |  4 +---
 .../iceberg-v2-read-position-deletes.test          | 15 ++++++++++++
 3 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/iceberg-delete-node.cc b/be/src/exec/iceberg-delete-node.cc
index 04f024ca0..668181f13 100644
--- a/be/src/exec/iceberg-delete-node.cc
+++ b/be/src/exec/iceberg-delete-node.cc
@@ -324,15 +324,17 @@ void IcebergDeleteNode::IcebergDeleteState::UpdateImpl() {
 }
 
 void IcebergDeleteNode::IcebergDeleteState::Update(
-    impala::StringValue* file_path, int64_t* probe_pos) {
+    impala::StringValue* file_path, int64_t* next_probe_pos) {
   DCHECK(builder_ != nullptr);
   // Making sure the row ids are in ascending order inside a row batch in broadcast mode
   DCHECK(builder_->IsDistributedMode() || current_probe_pos_ == INVALID_ROW_ID
-      || current_probe_pos_ < *probe_pos);
-  DCHECK(!builder_->IsDistributedMode() || previous_file_path_ == nullptr
-      || *file_path != *previous_file_path_ || current_probe_pos_ == INVALID_ROW_ID
-      || current_probe_pos_ < *probe_pos);
-  current_probe_pos_ = *probe_pos;
+      || current_probe_pos_ < *next_probe_pos);
+  bool is_consecutive_pos = false;
+  if(current_probe_pos_ != INVALID_ROW_ID) {
+    const int64_t step = *next_probe_pos - current_probe_pos_;
+    is_consecutive_pos = step == 1;
+  }
+  current_probe_pos_ = *next_probe_pos;
 
   if (previous_file_path_ != nullptr
       && (!builder_->IsDistributedMode() || *file_path == *previous_file_path_)) {
@@ -340,6 +342,19 @@ void IcebergDeleteNode::IcebergDeleteState::Update(
     if (current_deleted_pos_row_id_ != INVALID_ROW_ID
         && current_probe_pos_ > (*current_delete_row_)[current_deleted_pos_row_id_]) {
       UpdateImpl();
+    } else if (builder_->IsDistributedMode() && !is_consecutive_pos) {
+      // In distributed mode (which means PARTITIONED JOIN distribution mode) we cannot
+      // rely on ascending row order, not even inside row batches, not even when the
+      // previous file path is the same as the current one.
+      // This is because files with multiple blocks can be processed by multiple hosts
+      // in parallel, then the rows are getting hash-exchanged based on their file paths.
+      // Then the exchange-receiver at the LHS coalesces the row batches from multiple
+      // senders, hence the row IDs getting unordered. So we are always doing a binary
+      // search here to find the proper delete row id.
+      // This won't be a problem with the DIRECTED distribution mode (see IMPALA-12308)
+      // which will behave similarly to the BROADCAST mode in this regard.
+      DCHECK_EQ(*file_path, *previous_file_path_);
+      UpdateImpl();
     }
   } else {
     auto it = builder_->deleted_rows().find(*file_path);
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 3a5e5b076..7a1e255a2 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -3694,7 +3694,6 @@ ALTER TABLE {db_name}{db_suffix}.{table_name} SET TBLPROPERTIES('write.format.de
 INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(2, 'orc', 1.5, false);
 ALTER TABLE {db_name}{db_suffix}.{table_name} SET TBLPROPERTIES('write.format.default'='parquet');
 INSERT INTO TABLE {db_name}{db_suffix}.{table_name} values(3, 'parquet', 2.5, false);
-
 ====
 ---- DATASET
 functional
@@ -3709,8 +3708,7 @@ TBLPROPERTIES('iceberg.catalog'='hadoop.catalog',
               'format-version'='2');
 ---- DEPENDENT_LOAD
 `hadoop fs -mkdir -p /test-warehouse/iceberg_test/hadoop_catalog/ice && \
-hadoop fs -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock /test-warehouse/iceberg_test/hadoop_catalog/ice
-
+hadoop fs -Ddfs.block.size=524288 -put -f ${IMPALA_HOME}/testdata/data/iceberg_test/hadoop_catalog/ice/iceberg_lineitem_multiblock /test-warehouse/iceberg_test/hadoop_catalog/ice
 ====
 ---- DATASET
 functional
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
index c40a968ef..4742defee 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
@@ -672,6 +672,21 @@ SELECT count(*) from iceberg_lineitem_multiblock;
 bigint
 ====
 ---- QUERY
+select count(*) from iceberg_lineitem_multiblock where l_linenumber%5=0;
+---- RESULTS
+0
+---- TYPES
+bigint
+====
+---- QUERY
+SET BATCH_SIZE=2;
+select count(*) from iceberg_lineitem_multiblock where l_linenumber%5=0;
+---- RESULTS
+0
+---- TYPES
+bigint
+====
+---- QUERY
 SELECT * from iceberg_v2_partitioned_position_deletes;
 ---- RESULTS
 6,'Alex','view',2020-01-01 09:00:00