You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/18 23:51:09 UTC

incubator-impala git commit: IMPALA-5776: Write partial tuple to the correct mempool

Repository: incubator-impala
Updated Branches:
  refs/heads/master 985698bb6 -> 082058e2c


IMPALA-5776: Write partial tuple to the correct mempool

In the text scanner, we were writing the partial tuple variable length
data to data_buffer_pool_ mempool which caused strange behavior, such
as incorrect results.

If we are scanning compressed data, the pool gets attached to the row
batch at the end of a GetNext() call and gets freed before the next
GetNext() call. This is wrong because we expect the data in the partial
tuple to survive between the GetNext() calls. If we are scanning non
compressed data, data_buffer_pool_ never gets cleared and grows over
time until the scanner finishes reading the scan range.

We fix the problem by writing the varlen partial tuple data to
boundary_pool, which is where the constant length partial tuple data is
written. We also make sure that boundary pool does not hold any tuple
data of returned batches by always deep copying it to output batches.

Testing:
- Ran some tests locally on ASAN build.
- Updated test_scanners_fuzz.py to make slightly more significant
  changes to the data files. This change was helpful for finding
  issues while developing this patch.

Change-Id: I60ba5c113aefd17f697c1888fd46a237ef396540
Reviewed-on: http://gerrit.cloudera.org:8080/7639
Reviewed-by: Taras Bobrovytsky <tb...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/082058e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/082058e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/082058e2

Branch: refs/heads/master
Commit: 082058e2c760e7b5bd84f580bed61a21aee16f25
Parents: 985698b
Author: Taras Bobrovytsky <tb...@cloudera.com>
Authored: Wed Aug 9 16:08:17 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Aug 18 23:27:38 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-text-scanner.cc       | 64 ++++++++++++++++-------------
 be/src/exec/hdfs-text-scanner.h        | 38 +++++++++--------
 tests/query_test/test_scanners_fuzz.py | 14 ++++---
 3 files changed, 65 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/082058e2/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 5962f94..761f06f 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -66,7 +66,6 @@ HdfsTextScanner::HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* stat
       batch_start_ptr_(nullptr),
       error_in_row_(false),
       partial_tuple_(nullptr),
-      partial_tuple_empty_(true),
       parse_delimiter_timer_(nullptr) {
 }
 
@@ -163,19 +162,19 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
     decompressor_->Close();
     decompressor_.reset();
   }
+  DCHECK(boundary_column_.IsEmpty());
+  boundary_pool_->FreeAll();
   if (row_batch != nullptr) {
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
     row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
-    row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false);
     context_->ReleaseCompletedResources(row_batch, true);
     if (scan_node_->HasRowBatchQueue()) {
       static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
           unique_ptr<RowBatch>(row_batch));
     }
   } else {
-    if (template_tuple_pool_ != nullptr) template_tuple_pool_->FreeAll();
-    if (data_buffer_pool_ != nullptr) data_buffer_pool_->FreeAll();
-    if (boundary_pool_ != nullptr) boundary_pool_->FreeAll();
+    template_tuple_pool_->FreeAll();
+    data_buffer_pool_->FreeAll();
     context_->ReleaseCompletedResources(nullptr, true);
   }
 
@@ -236,8 +235,7 @@ Status HdfsTextScanner::ResetScanner() {
   boundary_row_.Clear();
   delimited_text_parser_->ParserReset();
   byte_buffer_ptr_ = byte_buffer_end_ = nullptr;
-  partial_tuple_ = Tuple::Create(tuple_byte_size_, boundary_pool_.get());
-  partial_tuple_empty_ = true;
+  partial_tuple_ = nullptr;
 
   // Initialize codegen fn
   RETURN_IF_ERROR(InitializeWriteTuplesFn(
@@ -256,7 +254,7 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) {
     // of the next scan range. Nothing to do since we already fully parsed the previous
     // tuple.
     DCHECK(!delimited_text_parser_->HasUnfinishedTuple());
-    DCHECK(partial_tuple_empty_);
+    DCHECK(partial_tuple_ == nullptr);
     DCHECK(boundary_column_.IsEmpty());
     DCHECK(boundary_row_.IsEmpty());
     scan_state_ = DONE;
@@ -286,15 +284,20 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) {
            << ":" << stream_->file_offset() << endl << status.GetDetail();
         RETURN_IF_ERROR(state_->LogOrReturnError(
             ErrorMsg(TErrorCode::GENERAL, ss.str())));
-      } else if (!partial_tuple_empty_ || !boundary_column_.IsEmpty() ||
+      } else if (partial_tuple_ != nullptr || !boundary_column_.IsEmpty() ||
           !boundary_row_.IsEmpty() ||
           (delimited_text_parser_->HasUnfinishedTuple() &&
               (!scan_node_->materialized_slots().empty() ||
                   scan_node_->num_materialized_partition_keys() > 0))) {
-        // Missing columns or row delimiter at end of the file is ok, fill the row in.
-        char* col = boundary_column_.buffer();
+        // There is data in the partial column because there is a missing row delimiter
+        // at the end of the file. Copy the data into a new string buffer that gets
+        // memory from the row batch pool, so that the boundary pool could be freed.
+        StringBuffer sb(row_batch->tuple_data_pool());
+        RETURN_IF_ERROR(sb.Append(boundary_column_.buffer(), boundary_column_.len()));
+        boundary_column_.Clear();
+        char* col = sb.buffer();
         int num_fields = 0;
-        RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(boundary_column_.len(),
+        RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(sb.len(),
             &col, &num_fields, field_locations_.data()));
 
         TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow());
@@ -312,9 +315,9 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) {
       } else if (delimited_text_parser_->HasUnfinishedTuple()) {
         DCHECK(scan_node_->materialized_slots().empty());
         DCHECK_EQ(scan_node_->num_materialized_partition_keys(), 0);
-        // If no fields are materialized we do not update partial_tuple_empty_,
-        // boundary_column_, or boundary_row_. However, we still need to handle the case
-        // of partial tuple due to missing tuple delimiter at the end of file.
+        // If no fields are materialized we do not update boundary_column_, or
+        // boundary_row_. However, we still need to handle the case of partial tuple due
+        // to missing tuple delimiter at the end of file.
         RETURN_IF_ERROR(CommitRows(1, row_batch));
       }
       break;
@@ -762,7 +765,7 @@ Status HdfsTextScanner::Open(ScannerContext* context) {
   return Status::OK();
 }
 
-// This function deals with tuples that straddle batches. There are two cases:
+// This function deals with tuples that straddle blocks. There are two cases:
 // 1. There is already a partial tuple in flight from the previous time around.
 //    This tuple can either be fully materialized (all the materialized columns have
 //    been processed but we haven't seen the tuple delimiter yet) or only partially
@@ -773,6 +776,7 @@ Status HdfsTextScanner::Open(ScannerContext* context) {
 int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
     TupleRow* row) {
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
+  DCHECK(boundary_column_.IsEmpty());
 
   FieldLocation* fields = field_locations_.data();
 
@@ -782,11 +786,8 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
   if (slot_idx_ != 0) {
     DCHECK(tuple_ != nullptr);
     int num_partial_fields = scan_node_->materialized_slots().size() - slot_idx_;
-    // Corner case where there will be no materialized tuples but at least one col
-    // worth of string data.  In this case, make a deep copy and reuse the byte buffer.
-    bool copy_strings = num_partial_fields > num_fields;
     num_partial_fields = min(num_partial_fields, num_fields);
-    WritePartialTuple(fields, num_partial_fields, copy_strings);
+    WritePartialTuple(fields, num_partial_fields);
 
     // This handles case 1.  If the tuple is complete and we've found a tuple delimiter
     // this time around (i.e. num_tuples > 0), add it to the row batch.  Otherwise,
@@ -801,10 +802,9 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
         if (!parse_status_.ok()) return 0;
         error_in_row_ = false;
       }
-      boundary_row_.Clear();
 
-      memcpy(tuple_, partial_tuple_, scan_node_->tuple_desc()->byte_size());
-      partial_tuple_empty_ = true;
+      CopyAndClearPartialTuple(pool);
+
       row->SetTuple(scan_node_->tuple_idx(), tuple_);
 
       slot_idx_ = 0;
@@ -855,12 +855,12 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
   // Write out the remaining slots (resulting in a partially materialized tuple)
   if (num_fields != 0) {
     DCHECK(tuple_ != nullptr);
+    partial_tuple_ = Tuple::Create(tuple_byte_size_, boundary_pool_.get());
     InitTuple(template_tuple_, partial_tuple_);
     // If there have been no materialized tuples at this point, copy string data
     // out of byte_buffer and reuse the byte_buffer.  The copied data can be at
     // most one tuple's worth.
-    WritePartialTuple(fields, num_fields, num_tuples_materialized == 0);
-    partial_tuple_empty_ = false;
+    WritePartialTuple(fields, num_fields);
   }
   DCHECK_LE(slot_idx_, scan_node_->materialized_slots().size());
   return num_tuples_materialized;
@@ -883,8 +883,7 @@ Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) {
   return Status::OK();
 }
 
-void HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
-    int num_fields, bool copy_strings) {
+void HdfsTextScanner::WritePartialTuple(FieldLocation* fields, int num_fields) {
   for (int i = 0; i < num_fields; ++i) {
     bool need_escape = false;
     int len = fields[i].len;
@@ -895,10 +894,19 @@ void HdfsTextScanner::WritePartialTuple(FieldLocation* fields,
 
     const SlotDescriptor* desc = scan_node_->materialized_slots()[slot_idx_];
     if (!text_converter_->WriteSlot(desc, partial_tuple_,
-        fields[i].start, len, true, need_escape, data_buffer_pool_.get())) {
+        fields[i].start, len, true, need_escape, boundary_pool_.get())) {
       ReportColumnParseError(desc, fields[i].start, len);
       error_in_row_ = true;
     }
     ++slot_idx_;
   }
 }
+
+void HdfsTextScanner::CopyAndClearPartialTuple(MemPool* pool) {
+  DCHECK(tuple_ != nullptr);
+  partial_tuple_->DeepCopy(tuple_, *scan_node_->tuple_desc(), pool);
+  boundary_row_.Reset();
+  boundary_column_.Reset();
+  boundary_pool_->Clear();
+  partial_tuple_ = nullptr;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/082058e2/be/src/exec/hdfs-text-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h
index ceeda8d..069ca7d 100644
--- a/be/src/exec/hdfs-text-scanner.h
+++ b/be/src/exec/hdfs-text-scanner.h
@@ -183,23 +183,31 @@ class HdfsTextScanner : public HdfsScanner {
   /// Returns the number of rows added to the row batch.
   int WriteFields(int num_fields, int num_tuples, MemPool* pool, TupleRow* row);
 
-  /// Utility function to write out 'num_fields' to 'tuple_'.  This is used to parse
-  /// partial tuples. If copy_strings is true, strings from fields will be copied into
-  /// the boundary pool.
-  void WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings);
+  /// Utility function to parse 'num_fields' and materialize the resulting slots into
+  /// 'partial_tuple_'.  The data of var-len fields is copied into 'boundary_pool_'.
+  void WritePartialTuple(FieldLocation*, int num_fields);
 
-  /// Current state of this scanner. Advances through the states exactly in order.
+  /// Deep copies the partial tuple into 'tuple_'.  The deep copy is done to simplify
+  /// memory ownership.  Also clears the boundary pool to prevent the accumulation of
+  /// variable length data in it.
+  void CopyAndClearPartialTuple(MemPool* pool);
+
+  /// Current state of this scanner.  Advances through the states exactly in order.
   TextScanState scan_state_;
 
-  /// Mem pool for boundary_row_ and boundary_column_.
+  /// Mem pool for boundary_row_, boundary_column_, partial_tuple_ and any variable length
+  /// data that is pointed at by the partial tuple.  Does not hold any tuple data
+  /// of returned batches, because the data is always deep-copied into the output batch.
   boost::scoped_ptr<MemPool> boundary_pool_;
 
-  /// Helper string for dealing with input rows that span file blocks.
-  /// We keep track of a whole line that spans file blocks to be able to report
-  /// the line as erroneous in case of parsing errors.
+  /// Helper string for dealing with input rows that span file blocks.  We keep track of
+  /// a whole line that spans file blocks to be able to report the line as erroneous in
+  /// case of parsing errors.  Does not hold any tuple data of returned batches.
   StringBuffer boundary_row_;
 
-  /// Helper string for dealing with columns that span file blocks.
+  /// Helper string for dealing with columns that span file blocks.  Does not hold any
+  /// tuple data of returned batches, because the data is always deep-copied into the
+  /// output batch.  Memory comes from boundary_pool_.
   StringBuffer boundary_column_;
 
   /// Index into materialized_slots_ for the next slot to output for the current tuple.
@@ -225,15 +233,11 @@ class HdfsTextScanner : public HdfsScanner {
   /// logged.
   bool error_in_row_;
 
-  /// Memory to store partial tuples split across buffers.  Memory comes from
-  /// boundary_pool_.  There is only one tuple allocated for this object and reused
-  /// for boundary tuples.
+  /// Memory to store partial tuples split across buffers.  Does not hold any tuple data
+  /// of returned batches, because the data is always deep-copied into the output batch.
+  /// Memory comes from boundary_pool_.
   Tuple* partial_tuple_;
 
-  /// If false, there is a tuple that is partially materialized (i.e. partial_tuple_
-  /// contains data)
-  bool partial_tuple_empty_;
-
   /// Time parsing text files
   RuntimeProfile::Counter* parse_delimiter_timer_;
 };

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/082058e2/tests/query_test/test_scanners_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index 8e521d8..41c8d6b 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -17,6 +17,7 @@
 
 from copy import copy
 import itertools
+import math
 import os
 import pytest
 import random
@@ -213,17 +214,18 @@ class TestScannersFuzzing(ImpalaTestSuite):
     with open(path, "rb") as f:
       data = bytearray(f.read())
 
-    if rng.random() < 0.5:
+    num_corruptions = rng.randint(0, int(math.log(len(data))))
+    for _ in xrange(num_corruptions):
       flip_offset = rng.randint(0, len(data) - 1)
       flip_val = rng.randint(0, 255)
-      LOG.info("corrupt_file: Flip byte in %s at %d from %d to %d", path, flip_offset,
-          data[flip_offset], flip_val)
+      LOG.info("corrupt file: Flip byte in {} at {} from {} to {}".format(
+          path, flip_offset, data[flip_offset], flip_val))
       data[flip_offset] = flip_val
-    else:
+
+    if rng.random() < 0.4:
       truncation = rng.randint(0, len(data))
-      LOG.info("corrupt_file: Truncate %s to %d", path, truncation)
+      LOG.info("corrupt file: Truncate {} to {}".format(path, truncation))
       data = data[:truncation]
 
     with open(path, "wb") as f:
       f.write(data)
-