You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by la...@apache.org on 2024/01/14 12:40:52 UTC

(impala) branch master updated (a2b8aed2c -> 6ddd69c60)

This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from a2b8aed2c IMPALA-12702: Show reduced cardinality estimation in ExecSummary
     new 74617537b IMPALA-12706: Fix nested struct querying for Iceberg metadata tables
     new 6ddd69c60 IMPALA-12665: Adjust complete_micro_batch_ length to new scratch_batch_->capacity after ScratchTupleBatch::Reset

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../iceberg-metadata/iceberg-metadata-scan-node.cc | 33 +++++++++-----------
 .../iceberg-metadata/iceberg-metadata-scan-node.h  |  8 ++---
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  7 +++++
 be/src/exec/scratch-tuple-batch.h                  |  8 +++++
 .../queries/QueryTest/iceberg-metadata-tables.test | 10 ++++++
 tests/query_test/test_scanners.py                  | 36 ++++++++++++++++++++++
 6 files changed, 80 insertions(+), 22 deletions(-)


(impala) 02/02: IMPALA-12665: Adjust complete_micro_batch_ length to new scratch_batch_->capacity after ScratchTupleBatch::Reset

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6ddd69c605d4c594e33fdd39a2ca888538b4b8d7
Author: Zinway Liu <zi...@gmail.com>
AuthorDate: Wed Dec 27 14:32:17 2023 +0800

    IMPALA-12665: Adjust complete_micro_batch_ length to new scratch_batch_->capacity after ScratchTupleBatch::Reset
    
    **IMPALA-12665 Description:**
    The issue occurs when scanning Parquet tables with a row size
    > 4096 bytes and a row batch size > 1024. A heap-buffer-overflow
    was detected by AddressSanitizer, indicating a write operation
    beyond the allocated buffer space.
    
    **Root Cause Analysis:**
    The AddressSanitizer error log points to a heap-buffer-overflow,
    where memory is accessed beyond the allocated region. This occurs
    in the `HdfsParquetScanner` and `ScratchTupleBatch` classes when
    handling rows larger than 4096 bytes.
    
    **Fault Reproduction:**
    The issue can be reproduced by creating a Parquet table with many
    columns, inserting data using Hive, then querying with Impala.
    Bash and Hive client scripts in IMPALA-12665 create a table and
    populate it, triggering the bug.
    
    **Technical Analysis:**
    `ScratchTupleBatch::Reset` recalculates `capacity` based on tuple
    size and fixed memory limits. When the row size is > 4096 bytes,
    `capacity` is set to a value < 1024. `HdfsParquetScanner` incorrectly
    assumes a `complete_micro_batch_` length of 1024, leading to overflow.
    
    **Proposed Solution:**
    Ensure the `complete_micro_batch_` length is updated after
    `ScratchTupleBatch::Reset`. This prevents accessing memory outside
    the allocated buffer, avoiding the heap-buffer-overflow.
    
    Change-Id: I966ff10ba734ed8b1b61325486de0dfcc7b58e4d
    Reviewed-on: http://gerrit.cloudera.org:8080/20834
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc |  7 ++++++
 be/src/exec/scratch-tuple-batch.h           |  8 +++++++
 tests/query_test/test_scanners.py           | 36 +++++++++++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index cc7c92a7b..8ff1cc312 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -2362,6 +2362,10 @@ Status HdfsParquetScanner::AssembleRows(RowBatch* row_batch, bool* skip_row_grou
     // Start a new scratch batch.
     RETURN_IF_ERROR(scratch_batch_->Reset(state_));
     InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, scratch_batch_->capacity);
+    // Adjust complete_micro_batch_ length to new scratch_batch_->capacity after
+    // ScratchTupleBatch::Reset
+    complete_micro_batch_.AdjustLength(scratch_batch_->capacity);
+
     // Late Materialization
     // 1. Filter rows only materializing the columns in 'filter_readers_'
     // 2. Transfer the surviving rows
@@ -2499,6 +2503,9 @@ Status HdfsParquetScanner::FillScratchMicroBatches(
               col_reader->schema_element().name, filename()));
         }
       }
+      // Ensure that the length of the micro_batch is less than
+      // or equal to the capacity of scratch_batch_.
+      DCHECK_LE(micro_batches[r].length, scratch_batch_->capacity);
       uint8_t* next_tuple_mem = scratch_batch_->tuple_mem
           + (scratch_batch_->tuple_byte_size * micro_batches[r].start);
       if (col_reader->max_rep_level() > 0) {
diff --git a/be/src/exec/scratch-tuple-batch.h b/be/src/exec/scratch-tuple-batch.h
index 6513c5c70..ca14c255f 100644
--- a/be/src/exec/scratch-tuple-batch.h
+++ b/be/src/exec/scratch-tuple-batch.h
@@ -30,6 +30,14 @@ struct ScratchMicroBatch {
   int start;
   int end;
   int length;
+
+  // Adjusts the micro batch length to new capacity if needed.
+  void AdjustLength(int new_capacity) {
+    if (length > new_capacity) {
+      length = new_capacity;
+      end = length - 1;
+    }
+  }
 };
 
 /// Helper struct that holds a batch of tuples allocated from a mem pool, as well
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 6a5e2b8d6..88e9abf83 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1268,6 +1268,42 @@ class TestParquet(ImpalaTestSuite):
     self.run_test_case(
         "QueryTest/parquet-decimal-precision-and-scale-altering", vector, unique_database)
 
+  def test_row_size_gt_4096_queries(self, unique_database):
+    table_format = 'parquet'
+    table_name = "{0}.{1}_{2}".format(
+        unique_database, "t_row_size_gt_4096", table_format)
+
+    # create table
+    field_string = ', '.join('field{} STRING'.format(i) for i in range(1, 601))
+    create_sql = "CREATE TABLE {} (id INT, {}) STORED AS {}".format(
+        table_name, field_string, table_format)
+    self.client.execute(create_sql)
+
+    # insert data
+    id_generation_sql = """
+    WITH ten AS (
+      SELECT 0 AS n
+      UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3
+      UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6
+      UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9
+    )
+    SELECT
+      row_number() OVER (ORDER BY a.n) AS id
+    FROM
+      ten a, ten b, ten c, ten d
+    LIMIT
+      2000
+    """
+    field_string = ', '.join(['CAST(RAND() AS STRING) AS field{}'.format(i)
+                                for i in range(1, 601)])
+    insert_sql = "INSERT INTO {} SELECT CAST(s.id AS INT), {} FROM ({}) s;".format(
+        table_name, field_string, id_generation_sql)
+    self.execute_query_expect_success(self.client, insert_sql)
+
+    # do a query
+    query_sql = "SELECT * FROM {} where field1 = '123'".format(table_name)
+    self.execute_query_expect_success(self.client, query_sql)
+
 
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise:


(impala) 01/02: IMPALA-12706: Fix nested struct querying for Iceberg metadata tables

Posted by la...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 74617537b5c805327349ef0ac5c79b84dc1e786d
Author: Tamas Mate <tm...@apache.org>
AuthorDate: Thu Jan 11 14:26:54 2024 +0100

    IMPALA-12706: Fix nested struct querying for Iceberg metadata tables
    
    This commit fixes a DCHECK failure when querying a struct inside a
    struct. The previous field accessor creation logic was trying to find
    the ColumnDescriptor for a struct inside a struct and hit a DCHECK
    because there are no ColumnDescriptors for struct fields. The logic
    has been reworked to only use ColumnDescriptors for top level columns.
    
    Testing:
     - Added E2E test to cover this case
    
    Change-Id: Iadd029a4edc500bd8d8fca3f958903c2dbe09e8e
    Reviewed-on: http://gerrit.cloudera.org:8080/20883
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../iceberg-metadata/iceberg-metadata-scan-node.cc | 33 ++++++++++------------
 .../iceberg-metadata/iceberg-metadata-scan-node.h  |  8 +++---
 .../queries/QueryTest/iceberg-metadata-tables.test | 10 +++++++
 3 files changed, 29 insertions(+), 22 deletions(-)

diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
index d779992fb..5210b7cd1 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.cc
@@ -91,30 +91,27 @@ Status IcebergMetadataScanNode::CreateFieldAccessors() {
   JNIEnv* env = JniUtil::GetJNIEnv();
   if (env == nullptr) return Status("Failed to get/create JVM");
   for (SlotDescriptor* slot_desc: tuple_desc_->slots()) {
-    if (slot_desc->type().IsStructType()) {
-      // Get the top level struct's field id from the ColumnDescriptor then recursively
-      // get the field ids for struct fields
-      int field_id = tuple_desc_->table_desc()->GetColumnDesc(slot_desc).field_id();
-      RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
-      RETURN_IF_ERROR(CreateFieldAccessors(env, slot_desc));
-    } else if (slot_desc->col_path().size() > 1) {
-      DCHECK(!slot_desc->type().IsComplexType());
-      // Slot that is child of a struct without tuple, can occur when a struct member is
-      // in the select list. ColumnType has a tree structure, and this loop finds the
-      // STRUCT node that stores the primitive type. Because, that struct node has the
-      // field id list of its childs.
+    int field_id = -1;
+    if (slot_desc->col_path().size() == 1) {
+      // Top level slots have ColumnDescriptors that store the field ids.
+      field_id = tuple_desc_->table_desc()->GetColumnDesc(slot_desc).field_id();
+    } else {
+      // Non top level slots are fields of a nested type. This code path is to handle
+      // slots that do not have their nested type's tuple available.
+      // This loop finds the struct ColumnType node that stores the slot as it has the
+      // field id list of its children.
       int root_type_index = slot_desc->col_path()[0];
       ColumnType* current_type = &const_cast<ColumnType&>(
           tuple_desc_->table_desc()->col_descs()[root_type_index].type());
       for (int i = 1; i < slot_desc->col_path().size() - 1; ++i) {
         current_type = &current_type->children[slot_desc->col_path()[i]];
       }
-      int field_id = current_type->field_ids[slot_desc->col_path().back()];
-      RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
-    } else {
-      // For primitives in the top level tuple, use the ColumnDescriptor
-      int field_id = tuple_desc_->table_desc()->GetColumnDesc(slot_desc).field_id();
-      RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
+      field_id = current_type->field_ids[slot_desc->col_path().back()];
+    }
+    DCHECK_NE(field_id, -1);
+    RETURN_IF_ERROR(AddAccessorForFieldId(env, field_id, slot_desc->id()));
+    if (slot_desc->type().IsStructType()) {
+      RETURN_IF_ERROR(CreateFieldAccessors(env, slot_desc));
     }
   }
   return Status::OK();
diff --git a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
index 64f1d3aa5..ae15efbd4 100644
--- a/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
+++ b/be/src/exec/iceberg-metadata/iceberg-metadata-scan-node.h
@@ -121,10 +121,10 @@ class IcebergMetadataScanNode : public ScanNode {
   Status GetCatalogTable(jobject* jtable);
 
   /// Populates the jaccessors_ map by creating the accessors for the columns in the JVM.
-  /// To create a field accessor for a column the Iceberg field id is needed. For
-  /// primitive type columns that are not a field of a struct, this can be found in the
-  /// ColumnDescriptor. However, ColumnDescriptors are not available for struct fields,
-  /// in this case the SlotDescriptor can be used.
+  /// To create a field accessor for a column the Iceberg field id is needed. For columns
+  /// that are not a field of a struct, this can be found in the ColumnDescriptor.
+  /// However, ColumnDescriptors are not available for struct fields; in this case the
+  /// ColumnType of the SlotDescriptor can be used.
   Status CreateFieldAccessors();
 
   /// Recursive part of the Accessor collection, when there is a struct in the tuple.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
index 0f58dae99..a559e13b6 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-metadata-tables.test
@@ -470,6 +470,16 @@ select readable_metrics from functional_parquet.iceberg_query_metadata.entries;
 STRING
 ====
 ---- QUERY
+select readable_metrics.i from functional_parquet.iceberg_query_metadata.entries;
+---- RESULTS
+'{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":3,"upper_bound":3}'
+'{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":2,"upper_bound":2}'
+'{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":1,"upper_bound":1}'
+'{"column_size":null,"value_count":null,"null_value_count":null,"nan_value_count":null,"lower_bound":null,"upper_bound":null}'
+---- TYPES
+STRING
+====
+---- QUERY
 select snapshot_id, readable_metrics from functional_parquet.iceberg_query_metadata.entries;
 ---- RESULTS
 row_regex:[1-9]\d*|0,'{"i":{"column_size":47,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":3,"upper_bound":3}}'