You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2018/11/19 10:45:32 UTC

[25/33] impala git commit: IMPALA-3652: Fix resource transfer in subplans with limits

IMPALA-3652: Fix resource transfer in subplans with limits

Impala assumes that when Reset() is called on an ExecNode, all of the
memory returned from that node by GetNext() has been attached to the
output RowBatch. In a query with a LIMIT on the subplan, such that
some nodes don't reach 'eos', this may not be the case.

The solution is to have Reset() take a RowBatch that any such memory
can be attached to. I examined all ExecNodes for resources being
transferred on 'eos' and added transferring of those resources in
Reset().

Testing:
- Added e2e tests that repro the issue for hash and nested loop joins.

Change-Id: I3968a379fcbb5d30fcec304995d3e44933dbbc77
Reviewed-on: http://gerrit.cloudera.org:8080/11852
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ccabc491
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ccabc491
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ccabc491

Branch: refs/heads/branch-3.1.0
Commit: ccabc491e20e566144f0b07739a65092c48a4953
Parents: ffdb8a1
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Wed Oct 24 22:16:21 2018 +0000
Committer: Zoltan Borok-Nagy <bo...@cloudera.com>
Committed: Tue Nov 13 12:51:40 2018 +0100

----------------------------------------------------------------------
 be/src/exec/aggregation-node-base.cc            |  6 ++--
 be/src/exec/aggregation-node-base.h             |  2 +-
 be/src/exec/aggregator.h                        |  2 +-
 be/src/exec/analytic-eval-node.cc               | 16 +++++------
 be/src/exec/analytic-eval-node.h                |  2 +-
 be/src/exec/blocking-join-node.cc               |  5 ++++
 be/src/exec/blocking-join-node.h                |  3 ++
 be/src/exec/cardinality-check-node.cc           |  4 +--
 be/src/exec/cardinality-check-node.h            |  2 +-
 be/src/exec/data-source-scan-node.cc            |  2 +-
 be/src/exec/data-source-scan-node.h             |  2 +-
 be/src/exec/exchange-node.cc                    |  2 +-
 be/src/exec/exchange-node.h                     |  2 +-
 be/src/exec/exec-node.cc                        |  4 +--
 be/src/exec/exec-node.h                         |  5 ++--
 be/src/exec/grouping-aggregator.cc              |  7 ++++-
 be/src/exec/grouping-aggregator.h               |  2 +-
 be/src/exec/hbase-scan-node.cc                  |  2 +-
 be/src/exec/hbase-scan-node.h                   |  2 +-
 be/src/exec/hdfs-scan-node-base.cc              |  2 +-
 be/src/exec/hdfs-scan-node-base.h               |  3 +-
 be/src/exec/nested-loop-join-node.cc            |  4 +--
 be/src/exec/nested-loop-join-node.h             |  2 +-
 be/src/exec/non-grouping-aggregator.h           |  4 ++-
 be/src/exec/partial-sort-node.cc                |  4 +--
 be/src/exec/partial-sort-node.h                 |  2 +-
 be/src/exec/partitioned-hash-join-builder.cc    | 12 ++++----
 be/src/exec/partitioned-hash-join-builder.h     |  7 +++--
 be/src/exec/partitioned-hash-join-node.cc       | 28 +++++++++++-------
 be/src/exec/partitioned-hash-join-node.h        |  7 +++--
 be/src/exec/select-node.cc                      |  6 ++--
 be/src/exec/select-node.h                       |  2 +-
 be/src/exec/sort-node.cc                        |  4 +--
 be/src/exec/sort-node.h                         |  2 +-
 be/src/exec/streaming-aggregation-node.cc       |  2 +-
 be/src/exec/streaming-aggregation-node.h        |  2 +-
 be/src/exec/subplan-node.cc                     | 13 +++++----
 be/src/exec/subplan-node.h                      |  2 +-
 be/src/exec/topn-node.cc                        |  6 ++--
 be/src/exec/topn-node.h                         |  2 +-
 be/src/exec/union-node.cc                       |  4 +--
 be/src/exec/union-node.h                        |  2 +-
 be/src/exec/unnest-node.cc                      |  4 +--
 be/src/exec/unnest-node.h                       |  2 +-
 .../QueryTest/nested-types-tpch-limit.test      | 30 ++++++++++++++++++++
 tests/query_test/test_nested_types.py           |  5 ++++
 46 files changed, 149 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/aggregation-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node-base.cc b/be/src/exec/aggregation-node-base.cc
index 5de54be..cd6cfe3 100644
--- a/be/src/exec/aggregation-node-base.cc
+++ b/be/src/exec/aggregation-node-base.cc
@@ -73,10 +73,10 @@ void AggregationNodeBase::Codegen(RuntimeState* state) {
   for (auto& agg : aggs_) agg->Codegen(state);
 }
 
-Status AggregationNodeBase::Reset(RuntimeState* state) {
-  for (auto& agg : aggs_) RETURN_IF_ERROR(agg->Reset(state));
+Status AggregationNodeBase::Reset(RuntimeState* state, RowBatch* row_batch) {
+  for (auto& agg : aggs_) RETURN_IF_ERROR(agg->Reset(state, row_batch));
   curr_output_agg_idx_ = 0;
-  return ExecNode::Reset(state);
+  return ExecNode::Reset(state, row_batch);
 }
 
 Status AggregationNodeBase::SplitMiniBatches(

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/aggregation-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node-base.h b/be/src/exec/aggregation-node-base.h
index 7dfab34..89508e0 100644
--- a/be/src/exec/aggregation-node-base.h
+++ b/be/src/exec/aggregation-node-base.h
@@ -35,7 +35,7 @@ class AggregationNodeBase : public ExecNode {
   virtual Status Init(const TPlanNode& tnode, RuntimeState* state) override;
   virtual Status Prepare(RuntimeState* state) override;
   virtual void Codegen(RuntimeState* state) override;
-  virtual Status Reset(RuntimeState* state) override;
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
 
  protected:
   /// If true, the input to this node should be passed into each Aggregator in 'aggs_'.

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregator.h b/be/src/exec/aggregator.h
index bd2f10e..7cb47c3 100644
--- a/be/src/exec/aggregator.h
+++ b/be/src/exec/aggregator.h
@@ -74,7 +74,7 @@ class Aggregator {
   virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
   virtual Status GetNext(
       RuntimeState* state, RowBatch* row_batch, bool* eos) WARN_UNUSED_RESULT = 0;
-  virtual Status Reset(RuntimeState* state) WARN_UNUSED_RESULT = 0;
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT = 0;
   virtual void Close(RuntimeState* state);
 
   /// Adds all of the rows in 'batch' to the aggregation.

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index c7bcbda..0a4d601 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -810,7 +810,7 @@ Status AnalyticEvalNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
   return Status::OK();
 }
 
-Status AnalyticEvalNode::Reset(RuntimeState* state) {
+Status AnalyticEvalNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   result_tuples_.clear();
   window_tuples_.clear();
   last_result_idx_ = -1;
@@ -818,11 +818,9 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) {
   prev_pool_last_result_idx_ = -1;
   prev_pool_last_window_idx_ = -1;
   input_eos_ = false;
-  // TODO: The Reset() contract allows calling Reset() even if eos has not been reached,
-  // but the analytic eval node currently does not support that. In practice, we only
-  // call Reset() after eos.
-  DCHECK_EQ(curr_tuple_pool_->total_allocated_bytes(), 0);
-  DCHECK_EQ(prev_tuple_pool_->total_allocated_bytes(), 0);
+  // Transfer the ownership of all row-backing resources.
+  row_batch->tuple_data_pool()->AcquireData(prev_tuple_pool_.get(), false);
+  row_batch->tuple_data_pool()->AcquireData(curr_tuple_pool_.get(), false);
   // Call Finalize() to clear evaluator allocations, but do not Close() them,
   // so we can keep evaluating them.
   if (curr_tuple_init_) {
@@ -831,12 +829,14 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) {
   }
   // The following members will be re-created in Open().
   // input_stream_ should have been closed by last GetNext() call.
-  DCHECK(input_stream_ == nullptr || input_stream_->is_closed());
+  if (input_stream_ != nullptr && !input_stream_->is_closed()) {
+    input_stream_->Close(row_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  }
   input_stream_.reset();
   prev_input_tuple_ = nullptr;
   prev_input_tuple_pool_->Clear();
   curr_child_batch_->Reset();
-  return ExecNode::Reset(state);
+  return ExecNode::Reset(state, row_batch);
 }
 
 void AnalyticEvalNode::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 0c185ae..3d295aa 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -70,7 +70,7 @@ class AnalyticEvalNode : public ExecNode {
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
   virtual void Close(RuntimeState* state);
 
  protected:

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 88ca007..e9281c6 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -137,6 +137,11 @@ Status BlockingJoinNode::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
+Status BlockingJoinNode::Reset(RuntimeState* state, RowBatch* row_batch) {
+  probe_batch_->TransferResourceOwnership(row_batch);
+  return ExecNode::Reset(state, row_batch);
+}
+
 void BlockingJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   build_batch_.reset();

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/blocking-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.h b/be/src/exec/blocking-join-node.h
index 8198ad0..2f1c161 100644
--- a/be/src/exec/blocking-join-node.h
+++ b/be/src/exec/blocking-join-node.h
@@ -57,6 +57,9 @@ class BlockingJoinNode : public ExecNode {
   /// Calls ExecNode::Open() and initializes 'eos_' and 'probe_side_eos_'.
   virtual Status Open(RuntimeState* state);
 
+  /// Transfers resources from 'probe_batch_' to 'row_batch'.
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
+
   /// Subclasses should close any other structures and then call
   /// BlockingJoinNode::Close().
   virtual void Close(RuntimeState* state);

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/cardinality-check-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/cardinality-check-node.cc b/be/src/exec/cardinality-check-node.cc
index 76579dd..016b0ef 100644
--- a/be/src/exec/cardinality-check-node.cc
+++ b/be/src/exec/cardinality-check-node.cc
@@ -95,9 +95,9 @@ Status CardinalityCheckNode::GetNext(RuntimeState* state, RowBatch* output_row_b
   return Status::OK();
 }
 
-Status CardinalityCheckNode::Reset(RuntimeState* state) {
+Status CardinalityCheckNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   row_batch_->Reset();
-  return ExecNode::Reset(state);
+  return ExecNode::Reset(state, row_batch);
 }
 
 void CardinalityCheckNode::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/cardinality-check-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/cardinality-check-node.h b/be/src/exec/cardinality-check-node.h
index c71bd2b..d44efd1 100644
--- a/be/src/exec/cardinality-check-node.h
+++ b/be/src/exec/cardinality-check-node.h
@@ -40,7 +40,7 @@ class CardinalityCheckNode : public ExecNode {
   virtual Status Prepare(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
-  virtual Status Reset(RuntimeState* state) override;
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
   virtual void Close(RuntimeState* state) override;
  private:
   /////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/data-source-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc
index 10f0c7d..1f2a003 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -367,7 +367,7 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo
   }
 }
 
-Status DataSourceScanNode::Reset(RuntimeState* state) {
+Status DataSourceScanNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   DCHECK(false) << "NYI";
   return Status("NYI");
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/data-source-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-source-scan-node.h b/be/src/exec/data-source-scan-node.h
index 065fcf3..e8450fe 100644
--- a/be/src/exec/data-source-scan-node.h
+++ b/be/src/exec/data-source-scan-node.h
@@ -55,7 +55,7 @@ class DataSourceScanNode : public ScanNode {
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
 
   /// NYI
-  virtual Status Reset(RuntimeState* state) override;
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
 
   /// Close the scanner, and report errors.
   virtual void Close(RuntimeState* state) override;

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index ca8b973..affd93c 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -127,7 +127,7 @@ Status ExchangeNode::Open(RuntimeState* state) {
   return Status::OK();
 }
 
-Status ExchangeNode::Reset(RuntimeState* state) {
+Status ExchangeNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   DCHECK(false) << "NYI";
   return Status("NYI");
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/exchange-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index bdc52c6..e9f1d00 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -52,7 +52,7 @@ class ExchangeNode : public ExecNode {
   /// Blocks until the first batch is available for consumption via GetNext().
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
   virtual void Close(RuntimeState* state);
 
   /// the number of senders needs to be set after the c'tor, because it's not

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 3a606f5..d8495ac 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -145,10 +145,10 @@ Status ExecNode::Open(RuntimeState* state) {
   return ScalarExprEvaluator::Open(conjunct_evals_, state);
 }
 
-Status ExecNode::Reset(RuntimeState* state) {
+Status ExecNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   num_rows_returned_ = 0;
   for (int i = 0; i < children_.size(); ++i) {
-    RETURN_IF_ERROR(children_[i]->Reset(state));
+    RETURN_IF_ERROR(children_[i]->Reset(state, row_batch));
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index a62ed6c..1bbb095 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -111,7 +111,8 @@ class ExecNode {
   /// Resets the stream of row batches to be retrieved by subsequent GetNext() calls.
   /// Clears all internal state, returning this node to the state it was in after calling
   /// Prepare() and before calling Open(). This function must not clear memory
-  /// still owned by this node that is backing rows returned in GetNext().
+  /// still owned by this node that is backing rows returned in GetNext(). 'row_batch' can
+  /// be used to transfer ownership of any such memory.
   /// Prepare() and Open() must have already been called before calling Reset().
   /// GetNext() may have optionally been called (not necessarily until eos).
   /// Close() must not have been called.
@@ -121,7 +122,7 @@ class ExecNode {
   /// implementation calls Reset() on children.
   /// Note that this function may be called many times (proportional to the input data),
   /// so should be fast.
-  virtual Status Reset(RuntimeState* state) WARN_UNUSED_RESULT;
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT;
 
   /// Close() will get called for every exec node, regardless of what else is called and
   /// the status of these calls (i.e. Prepare() may never have been called, or

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 9b38a0c..382a6ae 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -364,12 +364,17 @@ void GroupingAggregator::CleanupHashTbl(
   }
 }
 
-Status GroupingAggregator::Reset(RuntimeState* state) {
+Status GroupingAggregator::Reset(RuntimeState* state, RowBatch* row_batch) {
   DCHECK(!is_streaming_preagg_) << "Cannot reset preaggregation";
   partition_eos_ = false;
   streaming_idx_ = 0;
   // Reset the HT and the partitions for this grouping agg.
   ht_ctx_->set_level(0);
+  if (output_partition_ != nullptr) {
+    // Attach all buffers referenced by previously-returned rows.
+    output_partition_->aggregated_row_stream->Close(
+        row_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  }
   ClosePartitions();
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/grouping-aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index 6f630d4..405d4ab 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -124,7 +124,7 @@ class GroupingAggregator : public Aggregator {
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
-  virtual Status Reset(RuntimeState* state) override;
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
   virtual void Close(RuntimeState* state) override;
 
   virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override;

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/hbase-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.cc b/be/src/exec/hbase-scan-node.cc
index 148be66..f379823 100644
--- a/be/src/exec/hbase-scan-node.cc
+++ b/be/src/exec/hbase-scan-node.cc
@@ -266,7 +266,7 @@ Status HBaseScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eo
   return Status::OK();
 }
 
-Status HBaseScanNode::Reset(RuntimeState* state) {
+Status HBaseScanNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   DCHECK(false) << "NYI";
   return Status("NYI");
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/hbase-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-scan-node.h b/be/src/exec/hbase-scan-node.h
index dbe7ff1..bd9502e 100644
--- a/be/src/exec/hbase-scan-node.h
+++ b/be/src/exec/hbase-scan-node.h
@@ -51,7 +51,7 @@ class HBaseScanNode : public ScanNode {
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
 
   /// NYI
-  virtual Status Reset(RuntimeState* state) override;
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
 
   /// Close the hbase_scanner_, and report errors.
   virtual void Close(RuntimeState* state) override;

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 4a39124..a78bdb2 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -403,7 +403,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   return Status::OK();
 }
 
-Status HdfsScanNodeBase::Reset(RuntimeState* state) {
+Status HdfsScanNodeBase::Reset(RuntimeState* state, RowBatch* row_batch) {
   DCHECK(false) << "Internal error: Scan nodes should not appear in subplans.";
   return Status("Internal error: Scan nodes should not appear in subplans.");
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 0a5a328..4f94b11 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -152,7 +152,8 @@ class HdfsScanNodeBase : public ScanNode {
   virtual Status Prepare(RuntimeState* state) override WARN_UNUSED_RESULT;
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override WARN_UNUSED_RESULT;
-  virtual Status Reset(RuntimeState* state) override WARN_UNUSED_RESULT;
+  virtual Status Reset(
+      RuntimeState* state, RowBatch* row_batch) override WARN_UNUSED_RESULT;
   virtual void Close(RuntimeState* state) override;
 
   /// Returns true if this node uses separate threads for scanners that append RowBatches

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index e0a375b..1e1face 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -124,14 +124,14 @@ Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
   return Status::OK();
 }
 
-Status NestedLoopJoinNode::Reset(RuntimeState* state) {
+Status NestedLoopJoinNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   builder_->Reset();
   build_batches_ = NULL;
   matched_probe_ = false;
   current_probe_row_ = NULL;
   probe_batch_pos_ = 0;
   process_unmatched_build_rows_ = false;
-  return BlockingJoinNode::Reset(state);
+  return BlockingJoinNode::Reset(state, row_batch);
 }
 
 void NestedLoopJoinNode::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/nested-loop-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.h b/be/src/exec/nested-loop-join-node.h
index c94abbf..6fc09dc 100644
--- a/be/src/exec/nested-loop-join-node.h
+++ b/be/src/exec/nested-loop-join-node.h
@@ -48,7 +48,7 @@ class NestedLoopJoinNode : public BlockingJoinNode {
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
   virtual void Close(RuntimeState* state);
 
  private:

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/non-grouping-aggregator.h
----------------------------------------------------------------------
diff --git a/be/src/exec/non-grouping-aggregator.h b/be/src/exec/non-grouping-aggregator.h
index 49a663d..3f18a71 100644
--- a/be/src/exec/non-grouping-aggregator.h
+++ b/be/src/exec/non-grouping-aggregator.h
@@ -48,7 +48,9 @@ class NonGroupingAggregator : public Aggregator {
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
-  virtual Status Reset(RuntimeState* state) override { return Status::OK(); }
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override {
+    return Status::OK();
+  }
   virtual void Close(RuntimeState* state) override;
 
   virtual Status AddBatch(RuntimeState* state, RowBatch* batch) override;

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/partial-sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 7e3c4d7..81445de 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -141,9 +141,9 @@ Status PartialSortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
   return Status::OK();
 }
 
-Status PartialSortNode::Reset(RuntimeState* state) {
+Status PartialSortNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   DCHECK(false) << "PartialSortNode cannot be part of a subplan.";
-  return ExecNode::Reset(state);
+  return Status("Cannot reset partial sort");
 }
 
 void PartialSortNode::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/partial-sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.h b/be/src/exec/partial-sort-node.h
index 421df31..156d574 100644
--- a/be/src/exec/partial-sort-node.h
+++ b/be/src/exec/partial-sort-node.h
@@ -50,7 +50,7 @@ class PartialSortNode : public ExecNode {
   virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
   virtual void Close(RuntimeState* state);
 
  protected:

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 7754c99..2232108 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -236,7 +236,7 @@ Status PhjBuilder::FlushFinal(RuntimeState* state) {
 
 void PhjBuilder::Close(RuntimeState* state) {
   if (closed_) return;
-  CloseAndDeletePartitions();
+  CloseAndDeletePartitions(nullptr);
   if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
   ht_ctx_.reset();
   for (const FilterContext& ctx : filter_ctxs_) {
@@ -249,10 +249,10 @@ void PhjBuilder::Close(RuntimeState* state) {
   closed_ = true;
 }
 
-void PhjBuilder::Reset() {
+void PhjBuilder::Reset(RowBatch* row_batch) {
   expr_results_pool_->Clear();
   non_empty_build_ = false;
-  CloseAndDeletePartitions();
+  CloseAndDeletePartitions(row_batch);
 }
 
 Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) {
@@ -441,14 +441,14 @@ vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() {
   return std::move(spilled_partition_probe_streams_);
 }
 
-void PhjBuilder::CloseAndDeletePartitions() {
+void PhjBuilder::CloseAndDeletePartitions(RowBatch* row_batch) {
   // Close all the partitions and clean up all references to them.
-  for (unique_ptr<Partition>& partition : all_partitions_) partition->Close(NULL);
+  for (unique_ptr<Partition>& partition : all_partitions_) partition->Close(row_batch);
   all_partitions_.clear();
   hash_partitions_.clear();
   null_aware_partition_ = NULL;
   for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) {
-    stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+    stream->Close(row_batch, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
   spilled_partition_probe_streams_.clear();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index f6105fc..147504b 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -98,7 +98,7 @@ class PhjBuilder : public DataSink {
   /////////////////////////////////////////
 
   /// Reset the builder to the same state as it was in after calling Open().
-  void Reset();
+  void Reset(RowBatch* row_batch);
 
   /// Transfer ownership of the probe streams to the caller. One stream was allocated per
   /// spilled partition in FlushFinal(). The probe streams are empty but prepared for
@@ -333,8 +333,9 @@ class PhjBuilder : public DataSink {
   Status InitSpilledPartitionProbeStreams() WARN_UNUSED_RESULT;
 
   /// Calls Close() on every Partition, deletes them, and cleans up any pointers that
-  /// may reference them. Also cleans up 'spilled_partition_probe_streams_'.
-  void CloseAndDeletePartitions();
+  /// may reference them. Also cleans up 'spilled_partition_probe_streams_'. If
+  /// 'row_batch' is not NULL, transfers the ownership of all row-backing resources to it.
+  void CloseAndDeletePartitions(RowBatch* row_batch);
 
   /// For each filter in filters_, allocate a bloom_filter from the fragment-local
   /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 2b36990..2ef5c08 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -197,7 +197,7 @@ Status PartitionedHashJoinNode::AcquireResourcesForBuild(RuntimeState* state) {
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
+Status PartitionedHashJoinNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_probe_output_idx_ = -1;
     matched_null_probe_.clear();
@@ -205,35 +205,41 @@ Status PartitionedHashJoinNode::Reset(RuntimeState* state) {
   }
   state_ = PARTITIONING_BUILD;
   ht_ctx_->set_level(0);
-  CloseAndDeletePartitions();
-  builder_->Reset();
+  CloseAndDeletePartitions(row_batch);
+  builder_->Reset(row_batch);
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
+  if (output_unmatched_batch_ != nullptr) {
+    output_unmatched_batch_->TransferResourceOwnership(row_batch);
+  }
   output_unmatched_batch_.reset();
   output_unmatched_batch_iter_.reset();
-  return ExecNode::Reset(state);
+  return BlockingJoinNode::Reset(state, row_batch);
 }
 
-void PartitionedHashJoinNode::CloseAndDeletePartitions() {
+void PartitionedHashJoinNode::CloseAndDeletePartitions(RowBatch* row_batch) {
   // Close all the partitions and clean up all references to them.
   for (unique_ptr<ProbePartition>& partition : probe_hash_partitions_) {
-    if (partition != NULL) partition->Close(NULL);
+    if (partition != NULL) partition->Close(row_batch);
   }
   probe_hash_partitions_.clear();
   for (unique_ptr<ProbePartition>& partition : spilled_partitions_) {
-    partition->Close(NULL);
+    partition->Close(row_batch);
   }
   spilled_partitions_.clear();
   if (input_partition_ != NULL) {
-    input_partition_->Close(NULL);
+    input_partition_->Close(row_batch);
     input_partition_.reset();
   }
   if (null_aware_probe_partition_ != NULL) {
-    null_aware_probe_partition_->Close(NULL);
+    null_aware_probe_partition_->Close(row_batch);
     null_aware_probe_partition_.reset();
   }
+  for (PhjBuilder::Partition* partition : output_build_partitions_) {
+    partition->Close(row_batch);
+  }
   output_build_partitions_.clear();
   if (null_probe_rows_ != NULL) {
-    null_probe_rows_->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+    null_probe_rows_->Close(row_batch, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
     null_probe_rows_.reset();
   }
 }
@@ -245,7 +251,7 @@ void PartitionedHashJoinNode::Close(RuntimeState* state) {
   output_null_aware_probe_rows_running_ = false;
   output_unmatched_batch_.reset();
   output_unmatched_batch_iter_.reset();
-  CloseAndDeletePartitions();
+  CloseAndDeletePartitions(nullptr);
   if (builder_ != nullptr) builder_->Close(state);
   ScalarExprEvaluator::Close(other_join_conjunct_evals_, state);
   ScalarExpr::Close(build_exprs_);

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/partitioned-hash-join-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index a0c03ef..8ad4d4a 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -107,7 +107,7 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
-  virtual Status Reset(RuntimeState* state) override;
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
   virtual void Close(RuntimeState* state) override;
 
  protected:
@@ -383,8 +383,9 @@ class PartitionedHashJoinNode : public BlockingJoinNode {
   Status NullAwareAntiJoinError(BufferedTupleStream* rows);
 
   /// Calls Close() on every probe partition, destroys the partitions and cleans up any
-  /// references to the partitions. Also closes and destroys 'null_probe_rows_'.
-  void CloseAndDeletePartitions();
+  /// references to the partitions. Also closes and destroys 'null_probe_rows_'. If
+  /// 'row_batch' is not NULL, transfers ownership of all row-backing resources to it.
+  void CloseAndDeletePartitions(RowBatch* row_batch);
 
   /// Prepares for probing the next batch.
   void ResetForProbe();

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/select-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc
index 21cfa4b..d5cb7f0 100644
--- a/be/src/exec/select-node.cc
+++ b/be/src/exec/select-node.cc
@@ -111,11 +111,11 @@ Status SelectNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
   return Status::OK();
 }
 
-Status SelectNode::Reset(RuntimeState* state) {
-  child_row_batch_->Reset();
+Status SelectNode::Reset(RuntimeState* state, RowBatch* row_batch) {
+  child_row_batch_->TransferResourceOwnership(row_batch);
   child_row_idx_ = 0;
   child_eos_ = false;
-  return ExecNode::Reset(state);
+  return ExecNode::Reset(state, row_batch);
 }
 
 void SelectNode::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/select-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/select-node.h b/be/src/exec/select-node.h
index ce85cfb..c4d8098 100644
--- a/be/src/exec/select-node.h
+++ b/be/src/exec/select-node.h
@@ -40,7 +40,7 @@ class SelectNode : public ExecNode {
   virtual void Codegen(RuntimeState* state) override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
-  virtual Status Reset(RuntimeState* state) override;
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
   virtual void Close(RuntimeState* state) override;
 
  private:

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index f3e3ffc..3a70c29 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -142,10 +142,10 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   return Status::OK();
 }
 
-Status SortNode::Reset(RuntimeState* state) {
+Status SortNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   num_rows_skipped_ = 0;
   if (sorter_.get() != NULL) sorter_->Reset();
-  return ExecNode::Reset(state);
+  return ExecNode::Reset(state, row_batch);
 }
 
 void SortNode::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/sort-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index 8075b8e..ce7b188 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -42,7 +42,7 @@ class SortNode : public ExecNode {
   virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
   virtual void Close(RuntimeState* state);
 
  protected:

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/streaming-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/streaming-aggregation-node.cc b/be/src/exec/streaming-aggregation-node.cc
index 42f3e9f..d2608ad 100644
--- a/be/src/exec/streaming-aggregation-node.cc
+++ b/be/src/exec/streaming-aggregation-node.cc
@@ -180,7 +180,7 @@ Status StreamingAggregationNode::GetRowsStreaming(
   return Status::OK();
 }
 
-Status StreamingAggregationNode::Reset(RuntimeState* state) {
+Status StreamingAggregationNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   DCHECK(false) << "Cannot reset preaggregation";
   return Status("Cannot reset preaggregation");
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/streaming-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/streaming-aggregation-node.h b/be/src/exec/streaming-aggregation-node.h
index 71193b9..4b1ca74 100644
--- a/be/src/exec/streaming-aggregation-node.h
+++ b/be/src/exec/streaming-aggregation-node.h
@@ -50,7 +50,7 @@ class StreamingAggregationNode : public AggregationNodeBase {
 
   virtual Status Open(RuntimeState* state) override;
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) override;
-  virtual Status Reset(RuntimeState* state) override;
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch) override;
   virtual void Close(RuntimeState* state) override;
 
   virtual void DebugString(int indentation_level, std::stringstream* out) const override;

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/subplan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/subplan-node.cc b/be/src/exec/subplan-node.cc
index 98ef13a..fdd9660 100644
--- a/be/src/exec/subplan-node.cc
+++ b/be/src/exec/subplan-node.cc
@@ -87,9 +87,9 @@ Status SubplanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
   while (true) {
     if (subplan_is_open_) {
       if (subplan_eos_) {
-        // Reset the subplan before opening it again. At this point, all resources from
-        // the subplan are assumed to have been transferred to the output row_batch.
-        RETURN_IF_ERROR(child(1)->Reset(state));
+        // Reset the subplan before opening it again. 'row_batch' is passed in to allow
+        // any remaining resources to be transferred to it.
+        RETURN_IF_ERROR(child(1)->Reset(state, row_batch));
         subplan_is_open_ = false;
       } else {
         // Continue fetching rows from the open subplan into the output row_batch.
@@ -140,16 +140,17 @@ Status SubplanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
   return Status::OK();
 }
 
-Status SubplanNode::Reset(RuntimeState* state) {
+Status SubplanNode::Reset(RuntimeState* state, RowBatch* row_batch) {
+  input_batch_->TransferResourceOwnership(row_batch);
   input_eos_ = false;
   input_row_idx_ = 0;
   subplan_eos_ = false;
   num_rows_returned_ = 0;
-  RETURN_IF_ERROR(child(0)->Reset(state));
+  RETURN_IF_ERROR(child(0)->Reset(state, row_batch));
   // If child(1) is not open it means that we have just Reset() it and returned from
   // GetNext() without opening it again. It is not safe to call Reset() on the same
   // exec node twice in a row.
-  if (subplan_is_open_) RETURN_IF_ERROR(child(1)->Reset(state));
+  if (subplan_is_open_) RETURN_IF_ERROR(child(1)->Reset(state, row_batch));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/subplan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/subplan-node.h b/be/src/exec/subplan-node.h
index bf13ca1..ecf7c71 100644
--- a/be/src/exec/subplan-node.h
+++ b/be/src/exec/subplan-node.h
@@ -54,7 +54,7 @@ class SubplanNode : public ExecNode {
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
   virtual void Close(RuntimeState* state);
 
  private:

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/topn-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index c616a89..961416f 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -212,12 +212,14 @@ Status TopNNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   return Status::OK();
 }
 
-Status TopNNode::Reset(RuntimeState* state) {
+Status TopNNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   priority_queue_.clear();
   num_rows_skipped_ = 0;
+  // Transfer ownership of tuple data to output batch.
+  row_batch->tuple_data_pool()->AcquireData(tuple_pool_.get(), false);
   // We deliberately do not free the tuple_pool_ here to allow selective transferring
   // of resources in the future.
-  return ExecNode::Reset(state);
+  return ExecNode::Reset(state, row_batch);
 }
 
 void TopNNode::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/topn-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.h b/be/src/exec/topn-node.h
index e12cfc3..a80d766 100644
--- a/be/src/exec/topn-node.h
+++ b/be/src/exec/topn-node.h
@@ -47,7 +47,7 @@ class TopNNode : public ExecNode {
   virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
   virtual void Close(RuntimeState* state);
 
  protected:

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index 73e7a29..1caa2a6 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -313,7 +313,7 @@ Status UnionNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
   return Status::OK();
 }
 
-Status UnionNode::Reset(RuntimeState* state) {
+Status UnionNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   child_idx_ = 0;
   child_batch_.reset();
   child_row_idx_ = 0;
@@ -322,7 +322,7 @@ Status UnionNode::Reset(RuntimeState* state) {
   // Since passthrough is disabled in subplans, verify that there is no passthrough child
   // that needs to be closed.
   DCHECK_EQ(to_close_child_idx_, -1);
-  return ExecNode::Reset(state);
+  return ExecNode::Reset(state, row_batch);
 }
 
 void UnionNode::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/union-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.h b/be/src/exec/union-node.h
index 9c83276..b459392 100644
--- a/be/src/exec/union-node.h
+++ b/be/src/exec/union-node.h
@@ -52,7 +52,7 @@ class UnionNode : public ExecNode {
   virtual void Codegen(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
   virtual void Close(RuntimeState* state);
 
  private:

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/unnest-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index ef09731..52dd94d 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -175,9 +175,9 @@ Status UnnestNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos)
   return Status::OK();
 }
 
-Status UnnestNode::Reset(RuntimeState* state) {
+Status UnnestNode::Reset(RuntimeState* state, RowBatch* row_batch) {
   item_idx_ = 0;
-  return ExecNode::Reset(state);
+  return ExecNode::Reset(state, row_batch);
 }
 
 void UnnestNode::Close(RuntimeState* state) {

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/be/src/exec/unnest-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/unnest-node.h b/be/src/exec/unnest-node.h
index 247dd76..ff4d661 100644
--- a/be/src/exec/unnest-node.h
+++ b/be/src/exec/unnest-node.h
@@ -59,7 +59,7 @@ class UnnestNode : public ExecNode {
   virtual Status Prepare(RuntimeState* state);
   virtual Status Open(RuntimeState* state);
   virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
-  virtual Status Reset(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state, RowBatch* row_batch);
   virtual void Close(RuntimeState* state);
 
   /// Initializes the expression which produces the collection to be unnested.

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch-limit.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch-limit.test b/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch-limit.test
new file mode 100644
index 0000000..4ee5f59
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/nested-types-tpch-limit.test
@@ -0,0 +1,30 @@
+====
+---- QUERY
+use tpch_nested_parquet
+====
+---- QUERY
+# IMPALA-3652: test limit on a hash join in a subplan where resources need to be
+# transferred in Reset()
+select count(*)
+from customer c,
+  (select o1.o_orderkey, o2.o_orderdate
+   from c.c_orders o1, c.c_orders o2, c.c_orders o3
+   where o1.o_orderkey = o2.o_orderkey and o1.o_orderkey = o3.o_orderkey
+   limit 1) v
+where c_custkey = 113644;
+---- RESULTS
+1
+====
+---- QUERY
+# IMPALA-3652: test limit on a nested loop join in a subplan where resources need to be
+# transferred in Reset()
+select count(*)
+from customer c,
+  (select o1.o_orderkey, o2.o_orderdate
+   from c.c_orders o1 join /* +broadcast */ c.c_orders o2, c.c_orders o3
+   where o1.o_orderkey < o2.o_orderkey and o1.o_orderkey < o3.o_orderkey
+   limit 1) v
+where c_custkey = 113644;
+---- RESULTS
+1
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/ccabc491/tests/query_test/test_nested_types.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_nested_types.py b/tests/query_test/test_nested_types.py
index 4fccd9d..38a2582 100644
--- a/tests/query_test/test_nested_types.py
+++ b/tests/query_test/test_nested_types.py
@@ -83,6 +83,11 @@ class TestNestedTypes(ImpalaTestSuite):
     """Queries over the larger nested TPCH dataset."""
     self.run_test_case('QueryTest/nested-types-tpch', vector)
 
+  def test_tpch_limit(self, vector):
+    """Queries over the larger nested TPCH dataset with limits in their subplan."""
+    vector.get_value('exec_option')['batch_size'] = 10
+    self.run_test_case('QueryTest/nested-types-tpch-limit', vector)
+
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   def test_tpch_mem_limit(self, vector):
     """Queries over the larger nested TPCH dataset with memory limits tuned for