Posted to commits@impala.apache.org by la...@apache.org on 2024/01/14 12:40:54 UTC

(impala) 02/02: IMPALA-12665: Adjust complete_micro_batch_ length to new scratch_batch_->capacity after ScratchTupleBatch::Reset

This is an automated email from the ASF dual-hosted git repository.

laszlog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6ddd69c605d4c594e33fdd39a2ca888538b4b8d7
Author: Zinway Liu <zi...@gmail.com>
AuthorDate: Wed Dec 27 14:32:17 2023 +0800

    IMPALA-12665: Adjust complete_micro_batch_ length to new scratch_batch_->capacity after ScratchTupleBatch::Reset
    
    **IMPALA-12665 Description:**
    The issue occurs when scanning Parquet tables with a row size
    > 4096 bytes and a row batch size > 1024. A heap-buffer-overflow
    was detected by AddressSanitizer, indicating a write operation
    beyond the allocated buffer space.
    
    **Root Cause Analysis:**
    The error log produced by AddressSanitizer points to a
    heap-buffer-overflow, i.e. memory is accessed beyond the allocated
    region. It occurs in the `HdfsParquetScanner` and `ScratchTupleBatch`
    classes when handling rows larger than 4096 bytes.
    
    **Fault Reproduction:**
    The issue can be reproduced by creating a Parquet table with many
    columns, inserting data with Hive, and then querying the table with
    Impala. The Bash and Hive client scripts attached to IMPALA-12665
    create and populate such a table, triggering the bug.
    
    **Technical Analysis:**
    `ScratchTupleBatch::Reset` recalculates `capacity` from the tuple
    size and a fixed memory limit. When the row size exceeds 4096 bytes,
    `capacity` is set below 1024, but `HdfsParquetScanner` still assumes
    a `complete_micro_batch_` length of 1024, so it writes past the end
    of the scratch buffer.
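    
    For illustration, a minimal standalone sketch of the arithmetic. The
    4 MiB scratch buffer limit, the 5000-byte row, and the exact rounding
    are assumptions chosen for the example, not the literal `Reset()`
    code:
    
        // Hypothetical numbers showing why capacity drops below the row
        // batch size for wide rows (not Impala's actual Reset() code).
        #include <cstdint>
        #include <iostream>
    
        int main() {
          constexpr int64_t kScratchBufferLimit = 4 * 1024 * 1024;  // assumed limit
          constexpr int kRowBatchSize = 1024;   // length assumed by the scanner
          const int tuple_byte_size = 5000;     // a row wider than 4096 bytes
    
          // Reset()-style recalculation: capacity shrinks for wide rows.
          const int capacity =
              static_cast<int>(kScratchBufferLimit / tuple_byte_size);  // ~838
    
          std::cout << "capacity=" << capacity
                    << " assumed micro batch length=" << kRowBatchSize << "\n";
          // Writing kRowBatchSize tuples into a buffer sized for 'capacity'
          // tuples overflows the scratch buffer by (1024 - 838) tuples.
          return 0;
        }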
    
    **Proposed Solution:**
    Ensure that the length of `complete_micro_batch_` is updated to the
    new `scratch_batch_->capacity` after `ScratchTupleBatch::Reset`.
    This prevents writes outside the allocated buffer, avoiding the
    heap-buffer-overflow.
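    
    Applied to the example above (the numbers are illustrative), the fix
    clamps the micro batch right after the reset, using the new
    `ScratchMicroBatch::AdjustLength` helper added by this patch:
    
        // After RETURN_IF_ERROR(scratch_batch_->Reset(state_)):
        //   scratch_batch_->capacity     == 838  (example value)
        //   complete_micro_batch_.length == 1024, end == 1023
        complete_micro_batch_.AdjustLength(scratch_batch_->capacity);
        //   complete_micro_batch_.length == 838,  end == 837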
    
    Change-Id: I966ff10ba734ed8b1b61325486de0dfcc7b58e4d
    Reviewed-on: http://gerrit.cloudera.org:8080/20834
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/parquet/hdfs-parquet-scanner.cc |  7 ++++++
 be/src/exec/scratch-tuple-batch.h           |  8 +++++++
 tests/query_test/test_scanners.py           | 36 +++++++++++++++++++++++++++++
 3 files changed, 51 insertions(+)

diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index cc7c92a7b..8ff1cc312 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -2362,6 +2362,10 @@ Status HdfsParquetScanner::AssembleRows(RowBatch* row_batch, bool* skip_row_grou
     // Start a new scratch batch.
     RETURN_IF_ERROR(scratch_batch_->Reset(state_));
     InitTupleBuffer(template_tuple_, scratch_batch_->tuple_mem, scratch_batch_->capacity);
+    // Adjust complete_micro_batch_ length to new scratch_batch_->capacity after
+    // ScratchTupleBatch::Reset
+    complete_micro_batch_.AdjustLength(scratch_batch_->capacity);
+
     // Late Materialization
     // 1. Filter rows only materializing the columns in 'filter_readers_'
     // 2. Transfer the surviving rows
@@ -2499,6 +2503,9 @@ Status HdfsParquetScanner::FillScratchMicroBatches(
               col_reader->schema_element().name, filename()));
         }
       }
+      // Ensure that the length of the micro_batch is less than
+      // or equal to the capacity of scratch_batch_.
+      DCHECK_LE(micro_batches[r].length, scratch_batch_->capacity);
       uint8_t* next_tuple_mem = scratch_batch_->tuple_mem
           + (scratch_batch_->tuple_byte_size * micro_batches[r].start);
       if (col_reader->max_rep_level() > 0) {
diff --git a/be/src/exec/scratch-tuple-batch.h b/be/src/exec/scratch-tuple-batch.h
index 6513c5c70..ca14c255f 100644
--- a/be/src/exec/scratch-tuple-batch.h
+++ b/be/src/exec/scratch-tuple-batch.h
@@ -30,6 +30,14 @@ struct ScratchMicroBatch {
   int start;
   int end;
   int length;
+
+  // Adjusts the micro batch length to new capacity if needed.
+  void AdjustLength(int new_capacity) {
+    if (length > new_capacity) {
+      length = new_capacity;
+      end = length - 1;
+    }
+  }
 };
 
 /// Helper struct that holds a batch of tuples allocated from a mem pool, as well
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 6a5e2b8d6..88e9abf83 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1268,6 +1268,42 @@ class TestParquet(ImpalaTestSuite):
     self.run_test_case(
         "QueryTest/parquet-decimal-precision-and-scale-altering", vector, unique_database)
 
+  def test_row_size_gt_4096_queries(self, unique_database):
+    table_format = 'parquet'
+    table_name = "{0}.{1}_{2}".format(
+        unique_database, "t_row_size_gt_4096", table_format)
+
+    # create table
+    field_string = ', '.join('field{} STRING'.format(i) for i in range(1, 601))
+    create_sql = "CREATE TABLE {} (id INT, {}) STORED AS {}".format(
+        table_name, field_string, table_format)
+    self.client.execute(create_sql)
+
+    # insert data
+    id_generation_sql = """
+    WITH ten AS (
+      SELECT 0 AS n
+      UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3
+      UNION ALL SELECT 4 UNION ALL SELECT 5 UNION ALL SELECT 6
+      UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9
+    )
+    SELECT
+      row_number() OVER (ORDER BY a.n) AS id
+    FROM
+      ten a, ten b, ten c, ten d
+    LIMIT
+      2000
+    """
+    field_string = ', '.join(['CAST(RAND() AS STRING) AS field{}'.format(i)
+                                for i in range(1, 601)])
+    insert_sql = "INSERT INTO {} SELECT CAST(s.id AS INT), {} FROM ({}) s;".format(
+        table_name, field_string, id_generation_sql)
+    self.execute_query_expect_success(self.client, insert_sql)
+
+    # do a query
+    query_sql = "SELECT * FROM {} where field1 = '123'".format(table_name)
+    self.execute_query_expect_success(self.client, query_sql)
+
 
 # We use various scan range lengths to exercise corner cases in the HDFS scanner more
 # thoroughly. In particular, it will exercise: