You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/27 22:40:28 UTC

[1/2] impala git commit: IMPALA-5031: Fix undefined behavior: right-shifting too far

Repository: impala
Updated Branches:
  refs/heads/master 170956b85 -> b5608264b


IMPALA-5031: Fix undefined behavior: right-shifting too far

In expr.shift, the C++ standard says of right shifts:

    The behavior is undefined if the right operand is negative, or
    greater than or equal to the length in bits of the promoted left
    operand.

In HdfsAvroScannerTest.DecimalTest, this is triggered, and the
interesting part of the backtrace is:

exec/hdfs-avro-scanner-ir.cc:272:18: runtime error: shift exponent 32 is too large for 32-bit type 'int32_t' (aka 'int')
    #0 0x1786f65 in HdfsAvroScanner::ReadAvroDecimal(int, unsigned char**, unsigned char*, bool, void*, MemPool*) exec/hdfs-avro-scanner-ir.cc:272:18
    #1 0x1617778 in void HdfsAvroScannerTest::TestReadAvroType<DecimalValue<int>, bool (HdfsAvroScanner::*)(int, unsigned char**, unsigned char*, bool, void*, MemPool*), unsigned long>(bool (HdfsAvroScanner::*)(int, unsigned char**, unsigned char*, bool, void*, MemPool*), unsigned long, unsigned char*, long, DecimalValue<int>, int, TErrorCode::type) exec/hdfs-avro-scanner-test.cc:88:20
    #2 0x1605705 in void HdfsAvroScannerTest::TestReadAvroDecimal<int>(unsigned char*, long, DecimalValue<int>, int, TErrorCode::type) exec/hdfs-avro-scanner-test.cc:184:5

Change-Id: Ic5be92912198af2a5e00053ceb9a4fab43ca6bb8
Reviewed-on: http://gerrit.cloudera.org:8080/11047
Reviewed-by: Jim Apple <jb...@apache.org>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/154c530c
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/154c530c
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/154c530c

Branch: refs/heads/master
Commit: 154c530c296036f65f8f7879e2a0eba101c6aced
Parents: 170956b
Author: Jim Apple <jb...@apache.org>
Authored: Tue Jul 24 20:51:21 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Jul 27 21:01:35 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-avro-scanner-ir.cc | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/154c530c/be/src/exec/hdfs-avro-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc
index ffd5774..8eba43e 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -257,6 +257,16 @@ bool HdfsAvroScanner::ReadAvroDecimal(int slot_byte_size, uint8_t** data,
       SetStatusInvalidValue(TErrorCode::AVRO_INVALID_LENGTH, len.val);
       return false;
     }
+    // The len.val == 0 case is special due to undefined behavior of shifting and memcpy,
+    // so we handle it separately.
+    if (UNLIKELY(len.val == 0)) {
+      if(LIKELY(slot_byte_size == 4 || slot_byte_size == 8 || slot_byte_size == 16)) {
+        memset(slot, 0, slot_byte_size);
+      } else {
+        DCHECK(false) << "Decimal slots can't be this size: " << slot_byte_size;
+      }
+      return true;
+    }
     // Decimals are encoded as big-endian integers. Copy the decimal into the most
     // significant bytes and then shift down to the correct position to sign-extend the
     // decimal.


[2/2] impala git commit: IMPALA-7324: remove MarkNeedsDeepCopy() from sorter

Posted by ta...@apache.org.
IMPALA-7324: remove MarkNeedsDeepCopy() from sorter

Testing:
Ran exhaustive tests.

Change-Id: If86f038f1f6ca81ad5c9d40af1b7ea6115144ffc
Reviewed-on: http://gerrit.cloudera.org:8080/11048
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b5608264
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b5608264
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b5608264

Branch: refs/heads/master
Commit: b5608264b4552e44eb73ded1e232a8775c3dba6b
Parents: 154c530
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Jul 23 23:29:04 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Fri Jul 27 22:34:47 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/row-batch.cc         | 10 ++--
 be/src/runtime/row-batch.h          | 10 ++--
 be/src/runtime/sorted-run-merger.cc |  6 +--
 be/src/runtime/sorted-run-merger.h  |  9 ++--
 be/src/runtime/sorter.cc            | 83 +++++++++++++-------------------
 5 files changed, 53 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index ca2a9f3..b5c51bb 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -44,7 +44,7 @@ const int RowBatch::FIXED_LEN_BUFFER_LIMIT;
 RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_tracker)
   : num_rows_(0),
     capacity_(capacity),
-    flush_(FlushMode::NO_FLUSH_RESOURCES),
+    flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(row_desc->tuple_descriptors().size()),
     tuple_ptrs_size_(capacity * num_tuples_per_row_ * sizeof(Tuple*)),
@@ -71,7 +71,7 @@ RowBatch::RowBatch(
     const RowDescriptor* row_desc, const TRowBatch& input_batch, MemTracker* mem_tracker)
   : num_rows_(input_batch.num_rows),
     capacity_(input_batch.num_rows),
-    flush_(FlushMode::NO_FLUSH_RESOURCES),
+    flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(input_batch.row_tuples.size()),
     tuple_ptrs_size_(capacity_ * num_tuples_per_row_ * sizeof(Tuple*)),
@@ -108,7 +108,7 @@ RowBatch::RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header
     MemTracker* mem_tracker)
   : num_rows_(0),
     capacity_(0),
-    flush_(FlushMode::NO_FLUSH_RESOURCES),
+    flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(header.num_tuples_per_row()),
     tuple_ptrs_size_(header.num_rows() * num_tuples_per_row_ * sizeof(Tuple*)),
@@ -436,7 +436,7 @@ void RowBatch::Reset() {
   tuple_data_pool_.FreeAll();
   FreeBuffers();
   attached_buffer_bytes_ = 0;
-  flush_ = FlushMode::NO_FLUSH_RESOURCES;
+  flush_mode_ = FlushMode::NO_FLUSH_RESOURCES;
   needs_deep_copy_ = false;
 }
 
@@ -449,7 +449,7 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
   buffers_.clear();
   if (needs_deep_copy_) {
     dest->MarkNeedsDeepCopy();
-  } else if (flush_ == FlushMode::FLUSH_RESOURCES) {
+  } else if (flush_mode_ == FlushMode::FLUSH_RESOURCES) {
     dest->MarkFlushResources();
   }
   Reset();

http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 08df8a8..67adb9b 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -198,7 +198,7 @@ class RowBatch {
     DCHECK_LE(num_rows_, capacity_);
     // Check AtCapacity() condition enforced in MarkNeedsDeepCopy() and
     // MarkFlushResources().
-    DCHECK((!needs_deep_copy_ && flush_ == FlushMode::NO_FLUSH_RESOURCES)
+    DCHECK((!needs_deep_copy_ && flush_mode_ == FlushMode::NO_FLUSH_RESOURCES)
         || num_rows_ == capacity_);
     int64_t mem_usage = attached_buffer_bytes_ + tuple_data_pool_.total_allocated_bytes();
     return num_rows_ == capacity_ || mem_usage >= AT_CAPACITY_MEM_USAGE;
@@ -293,9 +293,11 @@ class RowBatch {
   void MarkFlushResources() {
     DCHECK_LE(num_rows_, capacity_);
     capacity_ = num_rows_;
-    flush_ = FlushMode::FLUSH_RESOURCES;
+    flush_mode_ = FlushMode::FLUSH_RESOURCES;
   }
 
+  FlushMode flush_mode() const { return flush_mode_; }
+
   /// Called to indicate that some resources backing this batch were not attached and
   /// will be cleaned up after the next GetNext() call. This means that the batch must
   /// be returned up the operator tree. Blocking operators must deep-copy any rows from
@@ -499,10 +501,10 @@ class RowBatch {
   /// If FLUSH_RESOURCES, the resources attached to this batch should be freed or
   /// acquired by a new owner as soon as possible. See MarkFlushResources(). If
   /// FLUSH_RESOURCES, AtCapacity() is also true.
-  FlushMode flush_;
+  FlushMode flush_mode_;
 
   /// If true, this batch references unowned memory that will be cleaned up soon.
-  /// See MarkNeedsDeepCopy(). If true, 'flush_' is FLUSH_RESOURCES and
+  /// See MarkNeedsDeepCopy(). If true, 'flush_mode_' is FLUSH_RESOURCES and
   /// AtCapacity() is true.
   bool needs_deep_copy_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index 8f5ede9..64feeb7 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -65,9 +65,9 @@ class SortedRunMerger::SortedRunWrapper {
     do {
       // Make sure to transfer resources from every batch received from 'sorted_run_'.
       if (transfer_batch != NULL) {
-        DCHECK(!input_row_batch_->needs_deep_copy())
-            << "Run batch suppliers that set the "
-               "needs_deep_copy flag must use a deep-copying merger";
+        DCHECK_ENUM_EQ(
+            RowBatch::FlushMode::NO_FLUSH_RESOURCES, input_row_batch_->flush_mode())
+            << "Run batch suppliers that flush resources must use a deep-copying merger";
         input_row_batch_->TransferResourceOwnership(transfer_batch);
       }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/sorted-run-merger.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.h b/be/src/runtime/sorted-run-merger.h
index 99ffbe9..cfa07cb 100644
--- a/be/src/runtime/sorted-run-merger.h
+++ b/be/src/runtime/sorted-run-merger.h
@@ -43,10 +43,11 @@ class RuntimeProfile;
 /// and transfers resource ownership from the input batches to the output batch when
 /// an input batch is processed.
 ///
-/// SortedRunMerger cannot handle the 'needs_deep_copy' resource-transfer model so
-/// if the RunBatchSupplierFn can return batches with the 'needs_deep_copy' flag set,
-/// the merger must have 'deep_copy_input' set. TODO: once 'needs_deep_copy' is removed
-/// this is no longer a problem.
+/// SortedRunMerger cannot handle "flushing resources" so if the RunBatchSupplierFn
+/// can return batches with FLUSH_RESOURCES set, the merger must have 'deep_copy_input'
+/// set. This is because AdvanceMinRow() gets the next batch before freeing resources
+/// from the previous batch.
+/// TODO: it would be nice to fix this to avoid unnecessary copies.
 class SortedRunMerger {
  public:
   /// Function that returns the next batch of rows from an input sorted run. The batch

http://git-wip-us.apache.org/repos/asf/impala/blob/b5608264/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index a8bab16..806a0b4 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -334,8 +334,8 @@ class Sorter::Run {
   Status AddPage(vector<Page>* page_sequence) WARN_UNUSED_RESULT;
 
   /// Advance to the next read page. If the run is pinned, has no effect. If the run
-  /// is unpinned, atomically pin the page at 'page_index' + 1 in 'pages' and delete
-  /// the page at 'page_index'.
+  /// is unpinned, the pin at 'page_index' was already attached to an output batch and
+  /// this function will pin the page at 'page_index' + 1 in 'pages'.
   Status PinNextReadPage(vector<Page>* pages, int page_index) WARN_UNUSED_RESULT;
 
   /// Copy the StringValues in 'var_values' to 'dest' in order and update the StringValue
@@ -918,22 +918,11 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
 
   if (end_of_fixed_len_page_
       && fixed_len_pages_index_ >= static_cast<int>(fixed_len_pages_.size()) - 1) {
-    if (is_pinned_) {
-      // All pages were previously attached to output batches. GetNextBatch() assumes
-      // that we don't attach resources to the batch on eos.
-      DCHECK_EQ(NumOpenPages(fixed_len_pages_), 0);
-      DCHECK_EQ(NumOpenPages(var_len_pages_), 0);
-
-      // Flush resources in case we are in a subplan and need to allocate more pages
-      // when the node is reopened.
-      output_batch->MarkFlushResources();
-    } else {
-      // We held onto the last fixed or var len blocks without transferring them to the
-      // caller. We signalled MarkNeedsDeepCopy() to the caller, so we can safely delete
-      // them now to free memory.
-      if (!fixed_len_pages_.empty()) DCHECK_EQ(NumOpenPages(fixed_len_pages_), 1);
-      if (!var_len_pages_.empty()) DCHECK_EQ(NumOpenPages(var_len_pages_), 1);
-    }
+    // All pages were previously attached to output batches. GetNextBatch() assumes
+    // that we don't attach resources to the batch on eos.
+    DCHECK_EQ(NumOpenPages(fixed_len_pages_), 0);
+    DCHECK_EQ(NumOpenPages(var_len_pages_), 0);
+
     CloseAllPages();
     *eos = true;
     DCHECK_EQ(num_tuples_returned_, num_tuples_);
@@ -976,12 +965,16 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
     if (CONVERT_OFFSET_TO_PTR && !ConvertOffsetsToPtrs(input_tuple)) {
       DCHECK(!is_pinned_);
       // The var-len data is in the next page. We are done with the current page, so
-      // return rows we've accumulated so far and advance to the next page in the next
-      // GetNext() call. This is needed for the unpinned case where we will exchange
-      // this page for the next in the next GetNext() call. So therefore we must hold
-      // onto the current var-len page and signal to the caller that the page is going
-      // to be deleted.
-      output_batch->MarkNeedsDeepCopy();
+      // return rows we've accumulated so far along with the page's buffer and advance
+      // to the next page in the next GetNext() call. We need the page's reservation to
+      // pin the next page, so flush resources.
+      DCHECK_GE(var_len_pages_index_, 0);
+      DCHECK_LT(var_len_pages_index_, var_len_pages_.size());
+      BufferPool::BufferHandle buffer =
+          var_len_pages_[var_len_pages_index_].ExtractBuffer(
+              sorter_->buffer_pool_client_);
+      output_batch->AddBuffer(sorter_->buffer_pool_client_, move(buffer),
+          RowBatch::FlushMode::FLUSH_RESOURCES);
       end_of_var_len_page_ = true;
       break;
     }
@@ -993,27 +986,23 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
 
   if (fixed_len_page_offset_ >= fixed_len_page->valid_data_len()) {
     // Reached the page boundary, need to move to the next page.
-    if (is_pinned_) {
-      BufferPool::ClientHandle* client = sorter_->buffer_pool_client_;
-      // Attach page to batch. Caller can delete the page when it wants to.
-      output_batch->AddBuffer(client,
-          fixed_len_pages_[fixed_len_pages_index_].ExtractBuffer(client),
-          RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-
-      // Attach the var-len pages at eos once no more rows will reference the pages.
-      if (fixed_len_pages_index_ == fixed_len_pages_.size() - 1) {
-        for (Page& var_len_page : var_len_pages_) {
-          DCHECK(var_len_page.is_open());
+    BufferPool::ClientHandle* client = sorter_->buffer_pool_client_;
+    // Attach page to batch. For unpinned pages we need to flush resource so we can pin
+    // the next page on the next call to GetNext().
+    RowBatch::FlushMode flush = is_pinned_ ? RowBatch::FlushMode::NO_FLUSH_RESOURCES :
+                                             RowBatch::FlushMode::FLUSH_RESOURCES;
+    output_batch->AddBuffer(
+        client, fixed_len_pages_[fixed_len_pages_index_].ExtractBuffer(client), flush);
+    // Attach remaining var-len pages at eos once no more rows will reference the pages.
+    if (fixed_len_pages_index_ == fixed_len_pages_.size() - 1) {
+      for (Page& var_len_page : var_len_pages_) {
+        DCHECK(!is_pinned_ || var_len_page.is_open());
+        if (var_len_page.is_open()) {
           output_batch->AddBuffer(client, var_len_page.ExtractBuffer(client),
               RowBatch::FlushMode::NO_FLUSH_RESOURCES);
         }
-        var_len_pages_.clear();
       }
-    } else {
-      // To iterate over unpinned runs, we need to exchange this page for the next
-      // in the next GetNext() call, so we need to hold onto the page and signal to
-      // the caller that the page is going to be deleted.
-      output_batch->MarkNeedsDeepCopy();
+      var_len_pages_.clear();
     }
     end_of_fixed_len_page_ = true;
   }
@@ -1027,14 +1016,10 @@ Status Sorter::Run::PinNextReadPage(vector<Page>* pages, int page_index) {
   Page* curr_page = &(*pages)[page_index];
   Page* next_page = &(*pages)[page_index + 1];
   DCHECK_EQ(is_pinned_, next_page->is_pinned());
-  if (is_pinned_) {
-    // The current page was attached to a batch and 'next_page' is already pinned.
-    DCHECK(!curr_page->is_open());
-    return Status::OK();
-  }
-  // Close the previous page to free memory and pin the next page. Should always succeed
-  // since the pages are the same size.
-  curr_page->Close(sorter_->buffer_pool_client_);
+  // The current page was attached to a batch.
+  DCHECK(!curr_page->is_open());
+  // 'next_page' is already pinned if the whole stream is pinned.
+  if (is_pinned_) return Status::OK();
   RETURN_IF_ERROR(next_page->Pin(sorter_->buffer_pool_client_));
   return Status::OK();
 }