Posted to commits@impala.apache.org by ta...@apache.org on 2019/05/10 22:57:54 UTC

[impala] branch master updated: IMPALA-4658: Potential race if compiler reorders ReachedLimit() usage.

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 832c9de  IMPALA-4658: Potential race if compiler reorders ReachedLimit() usage.
832c9de is described below

commit 832c9de7810b47b5f782bccb761e07264e7548e5
Author: Abhishek Rawat <ar...@cloudera.com>
AuthorDate: Thu Apr 18 14:18:46 2019 -0700

    IMPALA-4658: Potential race if compiler reorders ReachedLimit() usage.
    
    The ExecNode::num_rows_returned_ member is shared by the scan node
    thread and the related scanner threads. There is a potential race here
    since the compiler is free to reorder loads and stores of the
    num_rows_returned_ member. Also, num_rows_returned_ is not accessed in
    a thread-safe manner.
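    As a minimal sketch of the pre-patch pattern (simplified and
    hypothetical names; not the exact Impala code):

      // Shared by the scan node thread and scanner threads, with no
      // synchronization:
      int64_t num_rows_returned_ = 0;

      // Scanner thread:
      num_rows_returned_ += batch_num_rows;   // plain load + store

      // Scan node thread:
      if (limit_ != -1 && num_rows_returned_ >= limit_) { /* stop scan */ }

      // Without atomics or barriers, the compiler may cache, reorder, or
      // split these plain accesses, so the limit check can observe a
      // stale value.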
    
    This change introduces regular and thread-safe functions for accessing
    and updating the num_rows_returned_ member, which is now private to the
    ExecNode class. The getters/modifiers for single-threaded code paths
    are:
      rows_returned()
      ReachedLimit()
      SetNumRowsReturned()
      IncrementNumRowsReturned()
      DecrementNumRowsReturned()
      CheckLimitAndTruncateRowBatchIfNeeded()
    Thread-safe counterparts are:
      rows_returned_shared()
      ReachedLimitShared()
      IncrementNumRowsReturnedShared()
      DecrementNumRowsReturnedShared()
      CheckLimitAndTruncateRowBatchIfNeededShared()
    Debug checks are added to ensure that the regular and thread-safe
    functions are used properly in single-threaded and multi-threaded code
    paths, respectively.
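    A usage sketch of the two families (call sites simplified from the
    diff below):

      // Single-threaded operator, e.g. in a GetNext() implementation:
      IncrementNumRowsReturned(row_batch->num_rows());
      if (ReachedLimit()) *eos = true;

      // Legacy multi-threaded scan node, where scanner threads share the
      // counter:
      IncrementNumRowsReturnedShared(row_batch->num_rows());
      if (ReachedLimitShared()) SetDone();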
    
    The initial attempt to address this issue by making num_rows_returned_
    atomic caused performance regressions in single-threaded code paths
    which update the variable inside a loop, such as
    ExchangeNode::GetNext(). The regression could be attributed to the
    extra path length introduced by atomic operations such as
    read-modify-write. And since the GetNext() function could be called
    multiple times, with each call updating the atomic multiple times, the
    regression could compound. Serious thought was given to other
    approaches, such as updating the atomic num_rows_returned_ variable
    outside of a loop, but such a change would have been very extensive and
    error prone, and nothing would stop someone from later reintroducing
    atomic updates inside a tight loop. After much deliberation it was
    decided that the single-threaded code path should not be penalised, and
    as such it probably makes sense to use separate functions for
    single-threaded and multi-threaded code paths, provided the interfaces
    are simple, easy to maintain, and backed by checks that ensure proper
    use.
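    A hypothetical illustration (not Impala code) of why a per-row atomic
    penalises a tight single-threaded loop, versus the batched update that
    was considered:

      #include <atomic>
      #include <cstdint>

      std::atomic<int64_t> rows_returned{0};

      void PerRowAtomic(int64_t n) {
        // One atomic read-modify-write per row in the hot loop.
        for (int64_t i = 0; i < n; ++i) rows_returned.fetch_add(1);
      }

      void BatchedUpdate(int64_t n) {
        int64_t local = 0;
        for (int64_t i = 0; i < n; ++i) ++local;  // plain register increment
        rows_returned.fetch_add(local);           // one atomic op per batch
      }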
    
    The scan nodes with task-based multi-threading (MT) also use the new
    thread-safe functions in the code paths shared with the non-MT scan
    nodes, even though the variable is not shared amongst multiple threads.
    This is because both the task-based MT code path and the regular scan
    node code path call some common scanner functions, and it was getting
    very tricky and ugly to differentiate between the two. The overhead of
    using atomics is minimal for the MT code path since the atomic is not
    accessed for every row. The only exceptions are HBaseScanNode and
    DataSourceScanNode, which use the regular functions.
    
    Testing:
    - Performance tests were run on the tpch and tpcds workloads. No real
      regressions were found, but both baseline and test runs showed a high
      degree of variability for some queries. Multiple runs were done and
      it was concluded that the variability was not introduced by this
      patch.
    
    Change-Id: I4cbbfad80f7ab87dd6f192a24e2c68f7c66b047e
    Reviewed-on: http://gerrit.cloudera.org:8080/13178
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/aggregation-node.cc             |   4 +-
 be/src/exec/analytic-eval-node.cc           |   4 +-
 be/src/exec/cardinality-check-node.cc       |   4 +-
 be/src/exec/data-source-scan-node.cc        |   8 +--
 be/src/exec/exchange-node.cc                |  13 ++--
 be/src/exec/exec-node.cc                    |  33 ++++++++-
 be/src/exec/exec-node.h                     | 106 +++++++++++++++++++++++++++-
 be/src/exec/hbase-scan-node.cc              |   6 +-
 be/src/exec/hdfs-avro-scanner.cc            |   4 +-
 be/src/exec/hdfs-orc-scanner.cc             |   8 +--
 be/src/exec/hdfs-rcfile-scanner.cc          |   2 +-
 be/src/exec/hdfs-scan-node-base.cc          |   2 +-
 be/src/exec/hdfs-scan-node-mt.cc            |   9 +--
 be/src/exec/hdfs-scan-node-mt.h             |   1 +
 be/src/exec/hdfs-scan-node.cc               |  22 ++----
 be/src/exec/hdfs-scan-node.h                |   4 ++
 be/src/exec/hdfs-scanner.cc                 |   2 +-
 be/src/exec/hdfs-sequence-scanner.cc        |   8 +--
 be/src/exec/hdfs-text-scanner.cc            |   7 +-
 be/src/exec/kudu-scan-node-mt.cc            |   9 +--
 be/src/exec/kudu-scan-node-mt.h             |   7 +-
 be/src/exec/kudu-scan-node.cc               |  14 +---
 be/src/exec/kudu-scan-node.h                |   3 +
 be/src/exec/kudu-scanner.cc                 |   4 +-
 be/src/exec/nested-loop-join-node.cc        |  16 ++---
 be/src/exec/parquet/hdfs-parquet-scanner.cc |   6 +-
 be/src/exec/partial-sort-node.cc            |   8 +--
 be/src/exec/partitioned-hash-join-node.cc   |   8 +--
 be/src/exec/select-node-ir.cc               |   2 +-
 be/src/exec/select-node.cc                  |   2 +-
 be/src/exec/sort-node.cc                    |   8 +--
 be/src/exec/streaming-aggregation-node.cc   |   4 +-
 be/src/exec/subplan-node.cc                 |  12 ++--
 be/src/exec/topn-node.cc                    |   4 +-
 be/src/exec/union-node.cc                   |   8 +--
 be/src/exec/unnest-node.cc                  |  10 +--
 36 files changed, 235 insertions(+), 137 deletions(-)

diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 00157f0..9ce87d1 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -126,8 +126,8 @@ Status AggregationNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
   if (pagg_eos) ++curr_output_agg_idx_;
 
   *eos = ReachedLimit() || (pagg_eos && curr_output_agg_idx_ >= aggs_.size());
-  num_rows_returned_ += row_batch->num_rows();
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  IncrementNumRowsReturned(row_batch->num_rows());
+  COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }
 
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index a4e5e06..0ede1a6 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -725,7 +725,7 @@ Status AnalyticEvalNode::GetNextOutputBatch(
     input_batch.CopyRow(input_batch.GetRow(i), dest);
     dest->SetTuple(num_child_tuples, result_tuples_.front().second);
     output_batch->CommitLastRow();
-    ++num_rows_returned_;
+    IncrementNumRowsReturned(1);
 
     // Remove the head of result_tuples_ if all rows using that evaluated tuple
     // have been returned.
@@ -809,7 +809,7 @@ Status AnalyticEvalNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
     prev_pool_last_window_idx_ = -1;
   }
 
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }
 
diff --git a/be/src/exec/cardinality-check-node.cc b/be/src/exec/cardinality-check-node.cc
index 16ec669..454db79 100644
--- a/be/src/exec/cardinality-check-node.cc
+++ b/be/src/exec/cardinality-check-node.cc
@@ -89,8 +89,8 @@ Status CardinalityCheckNode::GetNext(
     output_row_batch->CopyRow(src_row, dst_row);
     output_row_batch->CommitLastRow();
     row_batch_->TransferResourceOwnership(output_row_batch);
-    num_rows_returned_ = 1;
-    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+    SetNumRowsReturned(1);
+    COUNTER_SET(rows_returned_counter_, rows_returned());
   }
   *eos = true;
   row_batch_->Reset();
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 72423da..2273a50 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -359,13 +359,13 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo
           row_batch->CommitLastRow();
           tuple = reinterpret_cast<Tuple*>(
               reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
-          ++num_rows_returned_;
+          IncrementNumRowsReturned(1);
         }
         ++next_row_idx_;
       }
-      if (ReachedLimit() || row_batch->AtCapacity() || input_batch_->eos) {
-        *eos = ReachedLimit() || input_batch_->eos;
-        COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+      if (row_batch->AtCapacity() || input_batch_->eos || ReachedLimit()) {
+        *eos = input_batch_->eos || ReachedLimit();
+        COUNTER_SET(rows_returned_counter_, rows_returned());
         COUNTER_ADD(rows_read_counter_, rows_read);
         return Status::OK();
       }
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 04e7de0..961cfdf 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -190,9 +190,9 @@ Status ExchangeNode::GetNext(RuntimeState* state, RowBatch* output_batch, bool*
         // rows in output_batch
         input_batch_->CopyRow(src, dest);
         output_batch->CommitLastRow();
-        ++num_rows_returned_;
+        IncrementNumRowsReturned(1);
       }
-      COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+      COUNTER_SET(rows_returned_counter_, rows_returned());
 
       if (ReachedLimit()) {
         stream_recvr_->TransferAllResources(output_batch);
@@ -235,18 +235,13 @@ Status ExchangeNode::GetNextMerging(RuntimeState* state, RowBatch* output_batch,
     RETURN_IF_ERROR(stream_recvr_->GetNext(output_batch, eos));
   }
 
-  num_rows_returned_ += output_batch->num_rows();
-  if (ReachedLimit()) {
-    output_batch->set_num_rows(output_batch->num_rows() - (num_rows_returned_ - limit_));
-    num_rows_returned_ = limit_;
-    *eos = true;
-  }
+  CheckLimitAndTruncateRowBatchIfNeeded(output_batch, eos);
 
   // On eos, transfer all remaining resources from the input batches maintained
   // by the merger to the output batch.
   if (*eos) stream_recvr_->TransferAllResources(output_batch);
 
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }
 
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 16f074a..1dda4f2 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -81,13 +81,13 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl
     row_descriptor_(descs, tnode.row_tuples, tnode.nullable_tuples),
     resource_profile_(tnode.resource_profile),
     limit_(tnode.limit),
-    num_rows_returned_(0),
     runtime_profile_(RuntimeProfile::Create(
         pool_, Substitute("$0 (id=$1)", PrintThriftEnum(tnode.node_type), id_))),
     rows_returned_counter_(NULL),
     rows_returned_rate_(NULL),
     containing_subplan_(NULL),
     disable_codegen_(tnode.disable_codegen),
+    num_rows_returned_(0),
     is_closed_(false) {
   runtime_profile_->SetPlanNodeId(id_);
   debug_options_.phase = TExecNodePhase::INVALID;
@@ -410,6 +410,37 @@ Status ExecNode::ExecDebugActionImpl(TExecNodePhase::type phase, RuntimeState* s
   return Status::OK();
 }
 
+bool ExecNode::CheckLimitAndTruncateRowBatchIfNeeded(RowBatch* row_batch, bool* eos) {
+  DCHECK(limit_ != 0);
+  const int row_batch_size = row_batch->num_rows();
+  const bool reached_limit =
+      !(limit_ == -1 || (rows_returned() + row_batch_size) < limit_);
+  const int num_rows_to_consume =
+      !reached_limit ? row_batch_size : limit_ - rows_returned();
+  IncrementNumRowsReturned(num_rows_to_consume);
+  if (reached_limit) {
+    row_batch->set_num_rows(num_rows_to_consume);
+    *eos = true;
+  }
+  return reached_limit;
+}
+
+bool ExecNode::CheckLimitAndTruncateRowBatchIfNeededShared(
+    RowBatch* row_batch, bool* eos) {
+  DCHECK(limit_ != 0);
+  const int row_batch_size = row_batch->num_rows();
+  const bool reached_limit =
+      !(limit_ == -1 || (rows_returned_shared() + row_batch_size) < limit_);
+  const int num_rows_to_consume =
+      !reached_limit ? row_batch_size : limit_ - rows_returned_shared();
+  IncrementNumRowsReturnedShared(num_rows_to_consume);
+  if (reached_limit) {
+    row_batch->set_num_rows(num_rows_to_consume);
+    *eos = true;
+  }
+  return reached_limit;
+}
+
 bool ExecNode::EvalConjuncts(
     ScalarExprEvaluator* const* evals, int num_conjuncts, TupleRow* row) {
   for (int i = 0; i < num_conjuncts; ++i) {
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 28a0baa..8ec9f3d 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -203,9 +203,45 @@ class ExecNode {
     DCHECK(containing_subplan_ == NULL);
     containing_subplan_ = sp;
   }
-  int64_t rows_returned() const { return num_rows_returned_; }
+
   int64_t limit() const { return limit_; }
-  bool ReachedLimit() { return limit_ != -1 && num_rows_returned_ >= limit_; }
+
+  /// Returns the number of rows returned by this Node.
+  int64_t rows_returned() const {
+    DCHECK(getExecutionModel() != NON_TASK_BASED_SYNC);
+    return num_rows_returned_;
+  }
+
+  /// Returns the number of rows returned by this Node. Thread safe version of
+  /// rows_returned().
+  /// TODO: The thread safe versions can be removed if we remove the legacy multi-threaded
+  /// scan nodes.
+  int64_t rows_returned_shared() const {
+    // Ideally this function should only be called when num_rows_returned_ is shared by
+    // multiple threads. Both the HdfsScanNodeMt and KuduScanNodeMt call this function
+    // from the scanner code-path for simplicity.
+    DCHECK(
+        getExecutionModel() == NON_TASK_BASED_SYNC || getExecutionModel() == TASK_BASED);
+    return base::subtle::Acquire_Load(&num_rows_returned_);
+  }
+
+  /// Returns true if a valid limit is set and the number of rows returned by this node
+  /// has reached or exceeded the limit.
+  bool ReachedLimit() {
+    DCHECK(getExecutionModel() != NON_TASK_BASED_SYNC);
+    return limit_ != -1 && num_rows_returned_ >= limit_;
+  }
+
+  /// Returns true if a valid limit is set and the number of rows returned by this node
+  /// has reached or exceeded the limit. Thread safe version of ReachedLimit().
+  bool ReachedLimitShared() {
+    // Ideally this function should only be called when num_rows_returned_ is shared by
+    // multiple threads. Both the HdfsScanNodeMt and KuduScanNodeMt call this function
+    // from the scanner code-path for simplicity.
+    DCHECK(
+        getExecutionModel() == NON_TASK_BASED_SYNC || getExecutionModel() == TASK_BASED);
+    return limit_ != -1 && base::subtle::Acquire_Load(&num_rows_returned_) >= limit_;
+  }
 
   RuntimeProfile* runtime_profile() { return runtime_profile_; }
   MemTracker* mem_tracker() { return mem_tracker_.get(); }
@@ -231,6 +267,22 @@ class ExecNode {
   friend class ScopedGetNextEventAdder;
   friend class ScopedOpenEventAdder;
 
+  enum ExecutionModel {
+    /// Exec nodes with single-threaded execution. This is the default execution model
+    /// for the majority of the nodes. BlockingJoin node is an exception since it could spawn
+    /// a build thread in some cases. Due to the blocking nature of such join operators,
+    /// it can be considered as not requiring explicit synchronization.
+    NON_TASK_BASED_NO_SYNC,
+    /// Exec nodes which spawn multiple worker threads. Examples are HdfsScanNode
+    /// and KuduScanNode. Requires synchronization if the main thread and worker threads
+    /// share resources.
+    NON_TASK_BASED_SYNC,
+    /// Task-based multi-threading. Examples are HdfsScanNodeMt and KuduScanNodeMt.
+    TASK_BASED
+  };
+
+  virtual ExecutionModel getExecutionModel() const { return NON_TASK_BASED_NO_SYNC; }
+
   BufferPool::ClientHandle* buffer_pool_client() {
     return reservation_manager_.buffer_pool_client();
   }
@@ -275,7 +327,6 @@ class ExecNode {
   TDebugOptions debug_options_;
 
   int64_t limit_;  // -1: no limit
-  int64_t num_rows_returned_;
 
   /// Runtime profile for this node. Owned by the QueryState's ObjectPool.
   RuntimeProfile* const runtime_profile_;
@@ -333,7 +384,56 @@ class ExecNode {
   /// TODO: IMPALA-2399: replace QueryMaintenance() - see JIRA for more details.
   Status QueryMaintenance(RuntimeState* state) WARN_UNUSED_RESULT;
 
+  /// Sets the number of rows returned.
+  void SetNumRowsReturned(int64_t value) {
+    DCHECK(getExecutionModel() != NON_TASK_BASED_SYNC);
+    num_rows_returned_ = value;
+    DFAKE_SCOPED_LOCK_THREAD_LOCKED(single_thread_check_);
+  }
+
+  /// Increment the number of rows returned.
+  void IncrementNumRowsReturned(int64_t value) {
+    DCHECK(getExecutionModel() != NON_TASK_BASED_SYNC);
+    num_rows_returned_ += value;
+    DFAKE_SCOPED_LOCK_THREAD_LOCKED(single_thread_check_);
+  }
+
+  /// Increment the number of rows returned. Thread safe version of
+  /// IncrementNumRowsReturned().
+  void IncrementNumRowsReturnedShared(int64_t value) {
+    DCHECK(getExecutionModel() == NON_TASK_BASED_SYNC);
+    base::subtle::Barrier_AtomicIncrement(&num_rows_returned_, value);
+  }
+
+  /// Decrement the number of rows returned.
+  void DecrementNumRowsReturned(int64_t value) { IncrementNumRowsReturned(-1 * value); }
+
+  /// Decrement the number of rows returned. Thread safe version of
+  /// DecrementNumRowsReturned().
+  void DecrementNumRowsReturnedShared(int64_t value) {
+    IncrementNumRowsReturnedShared(-1 * value);
+  }
+
+  /// Caps the input row batch to ensure that the limit is not exceeded.
+  /// Sets *eos and returns true if the limit is reached.
+  bool CheckLimitAndTruncateRowBatchIfNeeded(RowBatch* row_batch, bool* eos);
+
+  /// Caps the input row batch to ensure that the limit is not exceeded.
+  /// Sets *eos and returns true if the limit is reached.
+  /// Uses thread-safe functions.
+  bool CheckLimitAndTruncateRowBatchIfNeededShared(RowBatch* row_batch, bool* eos);
+
  private:
+  /// Keeps track of number of rows returned by an exec node. If this variable is shared
+  /// by multiple threads, it should be accessed using thread-safe functions defined
+  /// above. The single-threaded code-paths should use non-atomic functions defined
+  /// above. The only exceptions are HdfsScanNodeMt and KuduScanNodeMt, which are single
+  /// threaded (task based multi-threading support), but use the thread-safe functions
+  /// in the scanner code-path for code simplicity. This is because both the task based
+  /// MT scan nodes (HdfsScanNodeMt/KuduScanNodeMt) and regular scan nodes
+  /// (HdfsScanNode/KuduScanNode) call common scanner functions.
+  int64_t num_rows_returned_;
+  DFAKE_MUTEX(single_thread_check_);
   /// Implementation of ExecDebugAction(). This is the slow path we take when there is
   /// actually a debug action enabled for 'phase'.
   Status ExecDebugActionImpl(
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index 1b99c8d..a794f45 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -179,7 +179,7 @@ Status HBaseScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eo
   while (true) {
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
-    if (ReachedLimit() || row_batch->AtCapacity()) {
+    if (row_batch->AtCapacity() || ReachedLimit()) {
       // hang on to last allocated chunk in pool, we'll keep writing into it in the
       // next GetNext() call
       *eos = ReachedLimit();
@@ -251,8 +251,8 @@ Status HBaseScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eo
     DCHECK_EQ(conjunct_evals_.size(), conjuncts_.size());
     if (EvalConjuncts(conjunct_evals_.data(), conjuncts_.size(), row)) {
       row_batch->CommitLastRow();
-      ++num_rows_returned_;
-      COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+      IncrementNumRowsReturned(1);
+      COUNTER_SET(rows_returned_counter_, rows_returned());
       tuple = reinterpret_cast<Tuple*>(
           reinterpret_cast<uint8_t*>(tuple) + tuple_desc_->byte_size());
     } else {
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index bad2b3b..967227c 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -498,7 +498,7 @@ Status HdfsAvroScanner::ProcessRange(RowBatch* row_batch) {
   // so that we make progress even if the batch starts off with AtCapacity() == true,
   // which can happen if the tuple buffer is > 8MB.
   DCHECK_GT(row_batch->capacity(), row_batch->num_rows());
-  while (!eos_ && !scan_node_->ReachedLimit()) {
+  while (!eos_ && !scan_node_->ReachedLimitShared()) {
     if (record_pos_ == num_records_in_block_) {
       // Read new data block
       RETURN_IF_FALSE(stream_->ReadZLong(&num_records_in_block_, &parse_status_));
@@ -560,7 +560,7 @@ Status HdfsAvroScanner::ProcessRange(RowBatch* row_batch) {
       RETURN_IF_ERROR(CommitRows(num_to_commit, row_batch));
       record_pos_ += max_tuples;
       COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
-      if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
+      if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break;
     }
 
     if (record_pos_ == num_records_in_block_) {
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 6bdeeeb..a3abc2f 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -432,7 +432,7 @@ Status HdfsOrcScanner::ProcessSplit() {
     if ((row_batches_produced_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0) {
       CheckFiltersEffectiveness();
     }
-  } while (!eos_ && !scan_node_->ReachedLimit());
+  } while (!eos_ && !scan_node_->ReachedLimitShared());
   return Status::OK();
 }
 
@@ -593,7 +593,7 @@ Status HdfsOrcScanner::NextStripe() {
 }
 
 Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
-  bool continue_execution = !scan_node_->ReachedLimit() && !context_->cancelled();
+  bool continue_execution = !scan_node_->ReachedLimitShared() && !context_->cancelled();
   if (!continue_execution) return Status::CancelledInternal("ORC scanner");
 
   // We're going to free the previous batch. Clear the reference first.
@@ -628,7 +628,7 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
 
     RETURN_IF_ERROR(TransferTuples(orc_root_reader_, row_batch));
     if (row_batch->AtCapacity()) break;
-    continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
+    continue_execution &= !scan_node_->ReachedLimitShared() && !context_->cancelled();
   }
   stripe_rows_read_ += num_rows_read;
   COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
@@ -721,7 +721,7 @@ Status HdfsOrcScanner::AssembleCollection(
       conjunct_evals_map_[tuple_desc->id()];
 
   int tuple_idx = 0;
-  while (!scan_node_->ReachedLimit() && !context_->cancelled()
+  while (!scan_node_->ReachedLimitShared() && !context_->cancelled()
       && tuple_idx < total_tuples) {
     MemPool* pool;
     Tuple* tuple;
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index 37376c6..bf3d99c 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -612,7 +612,7 @@ Status HdfsRCFileScanner::ProcessRange(RowBatch* row_batch) {
     }
     COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
     RETURN_IF_ERROR(CommitRows(num_to_commit, row_batch));
-    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break;
   }
 
   if (row_pos_ == num_rows_) {
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 9f453fc..20cf908 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -470,7 +470,7 @@ Status HdfsScanNodeBase::IssueInitialScanRanges(RuntimeState* state) {
     MakeScopeExitTrigger([&](){ UpdateRemainingScanRangeSubmissions(-1); });
 
   // No need to issue ranges with limit 0.
-  if (ReachedLimit()) {
+  if (ReachedLimitShared()) {
     DCHECK_EQ(limit_, 0);
     return Status::OK();
   }
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 4e59e0a..7a42e3f 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -111,17 +111,12 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
   }
   InitNullCollectionValues(row_batch);
 
-  num_rows_returned_ += row_batch->num_rows();
-  if (ReachedLimit()) {
-    int num_rows_over = num_rows_returned_ - limit_;
-    row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
-    num_rows_returned_ -= num_rows_over;
+  if (CheckLimitAndTruncateRowBatchIfNeeded(row_batch, eos)) {
     scan_range_ = NULL;
     scanner_->Close(row_batch);
     scanner_.reset();
-    *eos = true;
   }
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
 
   if (*eos) StopAndFinalizeCounters();
   return Status::OK();
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 22c3d46..2dc0ac3 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -47,6 +47,7 @@ class HdfsScanNodeMt : public HdfsScanNodeBase {
   virtual void Close(RuntimeState* state) override;
 
   virtual bool HasRowBatchQueue() const override { return false; }
+  virtual ExecutionModel getExecutionModel() const override { return TASK_BASED; }
 
  private:
   /// Create and open new scanner for this partition type.
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 545d88d..2c64f97 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -121,7 +121,7 @@ Status HdfsScanNode::GetNextInternal(
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
 
-  if (ReachedLimit()) {
+  if (ReachedLimitShared()) {
     // LIMIT 0 case.  Other limit values handled below.
     DCHECK_EQ(limit_, 0);
     *eos = true;
@@ -131,22 +131,10 @@ Status HdfsScanNode::GetNextInternal(
   unique_ptr<RowBatch> materialized_batch = thread_state_.batch_queue()->GetBatch();
   if (materialized_batch != NULL) {
     row_batch->AcquireState(materialized_batch.get());
-    // Update the number of materialized rows now instead of when they are materialized.
-    // This means that scanners might process and queue up more rows than are necessary
-    // for the limit case but we want to avoid the synchronized writes to
-    // num_rows_returned_.
-    num_rows_returned_ += row_batch->num_rows();
-    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-
-    if (ReachedLimit()) {
-      int num_rows_over = num_rows_returned_ - limit_;
-      row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
-      num_rows_returned_ -= num_rows_over;
-      COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-
-      *eos = true;
-      SetDone();
-    }
+    // Note that the scanner threads may have processed and queued up extra rows before
+    // this thread incremented the rows returned.
+    if (CheckLimitAndTruncateRowBatchIfNeededShared(row_batch, eos)) SetDone();
+    COUNTER_SET(rows_returned_counter_, rows_returned_shared());
     materialized_batch.reset();
     return Status::OK();
   }
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 3f2e56d..eec58b2 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -106,6 +106,10 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// Transfers all memory from 'pool' to 'scan_node_pool_'.
   virtual void TransferToScanNodePool(MemPool* pool) override;
 
+  virtual ExecutionModel getExecutionModel() const override {
+    return NON_TASK_BASED_SYNC;
+  }
+
  private:
   ScannerThreadState thread_state_;
 
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index cdb1de5..ae04cec 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -138,7 +138,7 @@ Status HdfsScanner::ProcessSplit() {
     // data referenced by previously appended batches.
     if (returned_rows) scan_node->AddMaterializedRowBatch(move(batch));
     RETURN_IF_ERROR(status);
-  } while (!eos_ && !scan_node_->ReachedLimit());
+  } while (!eos_ && !scan_node_->ReachedLimitShared());
   return Status::OK();
 }
 
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 2183655..610325b 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -187,7 +187,7 @@ Status HdfsSequenceScanner::ProcessBlockCompressedScanRange(RowBatch* row_batch)
   // Step 2
   while (num_buffered_records_in_compressed_block_ > 0) {
     RETURN_IF_ERROR(ProcessDecompressedBlock(row_batch));
-    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break;
   }
 
   if (num_buffered_records_in_compressed_block_ == 0) {
@@ -291,8 +291,8 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {
   }
 
   int max_added_tuples = (scan_node_->limit() == -1) ?
-                         num_to_process :
-                         scan_node_->limit() - scan_node_->rows_returned();
+      num_to_process :
+      scan_node_->limit() - scan_node_->rows_returned_shared();
 
   // Materialize parsed cols to tuples
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
@@ -393,7 +393,7 @@ Status HdfsSequenceScanner::ProcessRange(RowBatch* row_batch) {
 
     // These checks must come after advancing past the next sync such that the stream is
     // at the start of the next data block when this function is called again.
-    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break;
   }
 
   COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index 7bd41d7..a911bae 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -415,7 +415,7 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) {
       break;
     }
 
-    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break;
   }
   return Status::OK();
 }
@@ -446,7 +446,7 @@ Status HdfsTextScanner::GetNextInternal(RowBatch* row_batch) {
     int num_tuples;
     RETURN_IF_ERROR(ProcessRange(row_batch, &num_tuples));
   }
-  if (scan_node_->ReachedLimit()) {
+  if (scan_node_->ReachedLimitShared()) {
     eos_ = true;
     scan_state_ = DONE;
     return Status::OK();
@@ -845,7 +845,8 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
     const bool copy_strings = !string_slot_offsets_.empty() &&
         stream_->file_desc()->file_compression == THdfsCompression::NONE;
     int max_added_tuples = (scan_node_->limit() == -1) ?
-        num_tuples : scan_node_->limit() - scan_node_->rows_returned();
+        num_tuples :
+        scan_node_->limit() - scan_node_->rows_returned_shared();
     int tuples_returned = 0;
     // Call jitted function if possible
     if (write_tuples_fn_ != nullptr) {
diff --git a/be/src/exec/kudu-scan-node-mt.cc b/be/src/exec/kudu-scan-node-mt.cc
index 5dfa5b1..5d444d8 100644
--- a/be/src/exec/kudu-scan-node-mt.cc
+++ b/be/src/exec/kudu-scan-node-mt.cc
@@ -80,18 +80,13 @@ Status KuduScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
   }
   scanner_->KeepKuduScannerAlive();
 
-  num_rows_returned_ += row_batch->num_rows();
-  if (ReachedLimit()) {
-    int num_rows_over = num_rows_returned_ - limit_;
-    row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
-    num_rows_returned_ -= num_rows_over;
+  if (CheckLimitAndTruncateRowBatchIfNeeded(row_batch, eos)) {
     scan_token_ = nullptr;
     runtime_profile_->StopPeriodicCounters();
     scanner_->Close();
     scanner_.reset();
-    *eos = true;
   }
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
 
   return Status::OK();
 }
diff --git a/be/src/exec/kudu-scan-node-mt.h b/be/src/exec/kudu-scan-node-mt.h
index 4812e46..94a238e 100644
--- a/be/src/exec/kudu-scan-node-mt.h
+++ b/be/src/exec/kudu-scan-node-mt.h
@@ -35,9 +35,10 @@ class KuduScanNodeMt : public KuduScanNodeBase {
 
   ~KuduScanNodeMt();
 
-  virtual Status Open(RuntimeState* state);
-  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual void Close(RuntimeState* state);
+  virtual Status Open(RuntimeState* state) override;
+  virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
+  virtual void Close(RuntimeState* state) override;
+  virtual ExecutionModel getExecutionModel() const override { return TASK_BASED; }
 
  private:
   /// Current scan token and corresponding scanner.
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index c6c2052..12b5365 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -95,7 +95,7 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
 
   // If there are no scan tokens, nothing is ever placed in the materialized
   // row batch, so exit early for this case.
-  if (ReachedLimit() || NumScanTokens() == 0) {
+  if (NumScanTokens() == 0 || ReachedLimitShared()) {
     *eos = true;
     return Status::OK();
   }
@@ -104,18 +104,10 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos
   unique_ptr<RowBatch> materialized_batch = thread_state_.batch_queue()->GetBatch();
   if (materialized_batch != NULL) {
     row_batch->AcquireState(materialized_batch.get());
-    num_rows_returned_ += row_batch->num_rows();
-    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-
-    if (ReachedLimit()) {
-      int num_rows_over = num_rows_returned_ - limit_;
-      row_batch->set_num_rows(row_batch->num_rows() - num_rows_over);
-      num_rows_returned_ -= num_rows_over;
-      COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-      *eos = true;
-
+    if (CheckLimitAndTruncateRowBatchIfNeededShared(row_batch, eos)) {
       SetDone();
     }
+    COUNTER_SET(rows_returned_counter_, rows_returned_shared());
     materialized_batch.reset();
   } else {
     *eos = true;
diff --git a/be/src/exec/kudu-scan-node.h b/be/src/exec/kudu-scan-node.h
index a04c162..59977a1 100644
--- a/be/src/exec/kudu-scan-node.h
+++ b/be/src/exec/kudu-scan-node.h
@@ -44,6 +44,9 @@ class KuduScanNode : public KuduScanNodeBase {
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
   virtual void Close(RuntimeState* state) override;
+  virtual ExecutionModel getExecutionModel() const override {
+    return NON_TASK_BASED_SYNC;
+  }
 
  private:
   friend class KuduScanner;
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index f2b15e6..f5e0729 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -125,7 +125,7 @@ Status KuduScanner::GetNext(RowBatch* row_batch, bool* eos) {
       if (row_batch->AtCapacity()) break;
     }
 
-    if (scanner_->HasMoreRows() && !scan_node_->ReachedLimit()) {
+    if (scanner_->HasMoreRows() && !scan_node_->ReachedLimitShared()) {
       RETURN_IF_ERROR(GetNextScannerBatch());
       continue;
     }
@@ -333,7 +333,7 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* row_batch, Tuple** tuple_me
     row->SetTuple(0, *tuple_mem);
     row_batch->CommitLastRow();
     // If we've reached the capacity, or the LIMIT for the scan, return.
-    if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break;
+    if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break;
     // Move to the next tuple in the tuple buffer.
     *tuple_mem = next_tuple(*tuple_mem);
   }
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 5eb5a13..8545348 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -246,11 +246,11 @@ Status NestedLoopJoinNode::GetNext(
 
 end:
   if (ReachedLimit()) {
-    int64_t extra_rows = num_rows_returned_ - limit_;
+    int64_t extra_rows = rows_returned() - limit_;
     DCHECK_GE(extra_rows, 0);
     DCHECK_LE(extra_rows, output_batch->num_rows());
     output_batch->set_num_rows(output_batch->num_rows() - extra_rows);
-    num_rows_returned_ = limit_;
+    SetNumRowsReturned(limit_);
     eos_ = true;
   }
   if (eos_) {
@@ -258,7 +258,7 @@ end:
     probe_batch_->TransferResourceOwnership(output_batch);
     build_batches_->TransferResourceOwnership(output_batch);
   }
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }
 
@@ -321,7 +321,7 @@ Status NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
       output_batch->CopyRow(current_probe_row_, output_row);
       VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
       output_batch->CommitLastRow();
-      ++num_rows_returned_;
+      IncrementNumRowsReturned(1);
       if (ReachedLimit()) {
         eos_ = true;
         return Status::OK();
@@ -435,7 +435,7 @@ Status NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
       ++current_build_row_idx_;
       VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
       output_batch->CommitLastRow();
-      ++num_rows_returned_;
+      IncrementNumRowsReturned(1);
       if (output_batch->AtCapacity()) return Status::OK();
     }
     RETURN_IF_ERROR(NextProbeRow(state, output_batch));
@@ -519,7 +519,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedProbeRow(RuntimeState* state,
   if (EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) {
     VLOG_ROW << "match row:" << PrintRow(output_row, *row_desc());
     output_batch->CommitLastRow();
-    ++num_rows_returned_;
+    IncrementNumRowsReturned(1);
     if (ReachedLimit()) eos_ = true;
   }
   return Status::OK();
@@ -572,7 +572,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
     if (EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) {
       VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
       output_batch->CommitLastRow();
-      ++num_rows_returned_;
+      IncrementNumRowsReturned(1);
       if (output_batch->AtCapacity()) return Status::OK();
     }
   }
@@ -619,7 +619,7 @@ Status NestedLoopJoinNode::FindBuildMatches(
     if (!EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) continue;
     VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
     output_batch->CommitLastRow();
-    ++num_rows_returned_;
+    IncrementNumRowsReturned(1);
     if (output_batch->AtCapacity()) {
       *return_output_batch = true;
       return Status::OK();
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 65cf788..27e5ca4 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -357,7 +357,7 @@ Status HdfsParquetScanner::ProcessSplit() {
     if ((row_batches_produced_ & (BATCHES_PER_FILTER_SELECTIVITY_CHECK - 1)) == 0) {
       CheckFiltersEffectiveness();
     }
-  } while (!eos_ && !scan_node_->ReachedLimit());
+  } while (!eos_ && !scan_node_->ReachedLimitShared());
   return Status::OK();
 }
 
@@ -1255,7 +1255,7 @@ bool HdfsParquetScanner::AssembleCollection(
       conjunct_evals_map_[tuple_desc->id()];
 
   int64_t rows_read = 0;
-  bool continue_execution = !scan_node_->ReachedLimit() && !context_->cancelled();
+  bool continue_execution = !scan_node_->ReachedLimitShared() && !context_->cancelled();
   // Note that this will be set to true at the end of the row group or the end of the
   // current collection (if applicable).
   bool end_of_collection = column_readers[0]->rep_level() == -1;
@@ -1309,7 +1309,7 @@ bool HdfsParquetScanner::AssembleCollection(
 
     rows_read += row_idx;
     coll_value_builder->CommitTuples(num_to_commit);
-    continue_execution &= !scan_node_->ReachedLimit() && !context_->cancelled();
+    continue_execution &= !scan_node_->ReachedLimitShared() && !context_->cancelled();
   }
   coll_items_read_counter_ += rows_read;
   if (end_of_collection) {
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 048831d..ad0ed03 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -105,8 +105,8 @@ Status PartialSortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
       sorter_->Reset();
       *eos = input_eos_;
     }
-    num_rows_returned_ += row_batch->num_rows();
-    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+    IncrementNumRowsReturned(row_batch->num_rows());
+    COUNTER_SET(rows_returned_counter_, rows_returned());
     return Status::OK();
   }
 
@@ -139,8 +139,8 @@ Status PartialSortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
     *eos = input_eos_;
   }
 
-  num_rows_returned_ += row_batch->num_rows();
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  IncrementNumRowsReturned(row_batch->num_rows());
+  COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }
 
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 2bad4ff..1b00f05 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -637,17 +637,17 @@ Status PartitionedHashJoinNode::GetNext(
   int num_rows_added = out_batch->num_rows() - num_rows_before;
   DCHECK_GE(num_rows_added, 0);
 
-  if (limit_ != -1 && num_rows_returned_ + num_rows_added > limit_) {
+  if (limit_ != -1 && rows_returned() + num_rows_added > limit_) {
     // Truncate the row batch if we went over the limit.
-    num_rows_added = limit_ - num_rows_returned_;
+    num_rows_added = limit_ - rows_returned();
     DCHECK_GE(num_rows_added, 0);
     out_batch->set_num_rows(num_rows_before + num_rows_added);
     probe_batch_->TransferResourceOwnership(out_batch);
     *eos = true;
   }
 
-  num_rows_returned_ += num_rows_added;
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  IncrementNumRowsReturned(num_rows_added);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }
 
diff --git a/be/src/exec/select-node-ir.cc b/be/src/exec/select-node-ir.cc
index 35cea13..5cd5e48 100644
--- a/be/src/exec/select-node-ir.cc
+++ b/be/src/exec/select-node-ir.cc
@@ -34,7 +34,7 @@ void SelectNode::CopyRows(RowBatch* output_batch) {
     if (EvalConjuncts(conjunct_evals, num_conjuncts, src_row)) {
       output_batch->CopyRow(src_row, dst_row);
       output_batch->CommitLastRow();
-      ++num_rows_returned_;
+      IncrementNumRowsReturned(1);
       if (ReachedLimit() || output_batch->AtCapacity()) return;
     }
   }
diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc
index f2235c4..a8708f3 100644
--- a/be/src/exec/select-node.cc
+++ b/be/src/exec/select-node.cc
@@ -103,7 +103,7 @@ Status SelectNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
     } else {
       CopyRows(row_batch);
     }
-    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+    COUNTER_SET(rows_returned_counter_, rows_returned());
     *eos = ReachedLimit()
         || (child_row_idx_ == child_row_batch_->num_rows() && child_eos_);
     if (*eos || child_row_idx_ == child_row_batch_->num_rows()) {
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index d5949af..b0b5b04 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -135,13 +135,9 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   }
 
   returned_buffer_ = row_batch->num_buffers() > 0;
-  num_rows_returned_ += row_batch->num_rows();
-  if (ReachedLimit()) {
-    row_batch->set_num_rows(row_batch->num_rows() - (num_rows_returned_ - limit_));
-    *eos = true;
-  }
+  CheckLimitAndTruncateRowBatchIfNeeded(row_batch, eos);
 
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
 
   return Status::OK();
 }
diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc
index 90c3d85..1a00070 100644
--- a/be/src/exec/streaming-aggregation-node.cc
+++ b/be/src/exec/streaming-aggregation-node.cc
@@ -83,8 +83,8 @@ Status StreamingAggregationNode::GetNext(
     *eos = curr_output_agg_idx_ >= aggs_.size();
   }
 
-  num_rows_returned_ += row_batch->num_rows();
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  IncrementNumRowsReturned(row_batch->num_rows());
+  COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }
 
diff --git a/be/src/exec/subplan-node.cc b/be/src/exec/subplan-node.cc
index 4124a71..d19d77c 100644
--- a/be/src/exec/subplan-node.cc
+++ b/be/src/exec/subplan-node.cc
@@ -100,14 +100,14 @@ Status SubplanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
         DCHECK(!row_batch->AtCapacity());
         RETURN_IF_ERROR(child(1)->GetNext(state, row_batch, &subplan_eos_));
         // Apply limit and check whether the output batch is at capacity.
-        if (limit_ != -1 && num_rows_returned_ + row_batch->num_rows() >= limit_) {
-          row_batch->set_num_rows(limit_ - num_rows_returned_);
-          num_rows_returned_ += row_batch->num_rows();
+        if (limit_ != -1 && rows_returned() + row_batch->num_rows() >= limit_) {
+          row_batch->set_num_rows(limit_ - rows_returned());
+          IncrementNumRowsReturned(row_batch->num_rows());
           *eos = true;
           break;
         }
         if (row_batch->AtCapacity()) {
-          num_rows_returned_ += row_batch->num_rows();
+          IncrementNumRowsReturned(row_batch->num_rows());
           return Status::OK();
         }
         // Check subplan_eos_ and repeat fetching until the output batch is at capacity
@@ -140,7 +140,7 @@ Status SubplanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
     subplan_eos_ = false;
   }
 
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }
 
@@ -149,7 +149,7 @@ Status SubplanNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   input_eos_ = false;
   input_row_idx_ = 0;
   subplan_eos_ = false;
-  num_rows_returned_ = 0;
+  SetNumRowsReturned(0);
   RETURN_IF_ERROR(child(0)->Reset(state, row_batch));
   // If child(1) is not open it means that we have just Reset() it and returned from
   // GetNext() without opening it again. It is not safe to call Reset() on the same
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 66c647e..6b32b94 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -202,8 +202,8 @@ Status TopNNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     row_batch->CopyRow(src_row, dst_row);
     ++get_next_iter_;
     row_batch->CommitLastRow();
-    ++num_rows_returned_;
-    COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+    IncrementNumRowsReturned(1);
+    COUNTER_SET(rows_returned_counter_, rows_returned());
   }
   *eos = get_next_iter_ == sorted_top_n_.end();
 
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index 3a39626..cd331f2 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -302,18 +302,18 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
 
   int num_rows_added = row_batch->num_rows() - num_rows_before;
   DCHECK_GE(num_rows_added, 0);
-  if (limit_ != -1 && num_rows_returned_ + num_rows_added > limit_) {
+  if (limit_ != -1 && rows_returned() + num_rows_added > limit_) {
     // Truncate the row batch if we went over the limit.
-    num_rows_added = limit_ - num_rows_returned_;
+    num_rows_added = limit_ - rows_returned();
     row_batch->set_num_rows(num_rows_before + num_rows_added);
     DCHECK_GE(num_rows_added, 0);
   }
-  num_rows_returned_ += num_rows_added;
+  IncrementNumRowsReturned(num_rows_added);
 
   *eos = ReachedLimit() ||
       (!HasMorePassthrough() && !HasMoreMaterialized() && !HasMoreConst(state));
 
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }
 
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index 336c7ff..1dde338 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -165,17 +165,13 @@ Status UnnestNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
       if (row_batch->AtCapacity()) break;
     }
   }
-  num_rows_returned_ += row_batch->num_rows();
 
   // Checking the limit here is simpler/cheaper than doing it in the loop above.
-  if (ReachedLimit()) {
-    *eos = true;
-    row_batch->set_num_rows(row_batch->num_rows() - (num_rows_returned_ - limit_));
-    num_rows_returned_ = limit_;
-  } else if (item_idx_ == coll_value_->num_tuples) {
+  const bool reached_limit = CheckLimitAndTruncateRowBatchIfNeeded(row_batch, eos);
+  if (!reached_limit && item_idx_ == coll_value_->num_tuples) {
     *eos = true;
   }
-  COUNTER_SET(rows_returned_counter_, num_rows_returned_);
+  COUNTER_SET(rows_returned_counter_, rows_returned());
   return Status::OK();
 }