Posted to commits@impala.apache.org by he...@apache.org on 2016/10/26 05:38:05 UTC

[3/4] incubator-impala git commit: IMPALA-4023: don't attach buffered tuple streams to batches

IMPALA-4023: don't attach buffered tuple streams to batches

This simplifies the memory transfer model by eliminating one category of
resources that can be attached.

This patch also separates the concepts of attaching resources and
flushing resources. Previously RowBatch::AddTupleStream() implicitly
flushed resources from the ExecNode pipeline, which various ExecNodes
relied on to free up memory reservations for subsequent processing. In a
follow-up patch I want the FlushResources() API to become stronger: it
will force streaming ExecNodes to flush their batches or force
blocking ExecNodes to acquire ownership of the memory resources.
We can't do this right now since we don't have a way to transfer
ownership of BufferedBlockMgr Blocks.
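
To sketch the new call pattern (illustrative only; the real call sites are
in the diff below), closing a stream into an output batch replaces attaching
the stream itself:

    // Old model: the batch took ownership of the whole stream, and
    // AddTupleStream() implicitly marked the batch at capacity:
    //   row_batch->AddTupleStream(input_stream_);
    //   input_stream_ = NULL;
    // New model: Close() attaches any pinned blocks to the batch, and the
    // caller states explicitly whether resources must be flushed:
    input_stream_->Close(row_batch, RowBatch::FlushMode::FLUSH_RESOURCES);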

Change-Id: I6471422f86ce71e4c6ab277a276000051bc2e8ff
Reviewed-on: http://gerrit.cloudera.org:8080/4448
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ba026f27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ba026f27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ba026f27

Branch: refs/heads/master
Commit: ba026f27267204610cf238f24cd219fe297dbe96
Parents: d363b4d
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Sep 13 23:43:32 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Oct 26 04:49:30 2016 +0000

----------------------------------------------------------------------
 be/src/exec/analytic-eval-node.cc            | 32 ++++----
 be/src/exec/analytic-eval-node.h             |  6 +-
 be/src/exec/nested-loop-join-builder.cc      |  2 +-
 be/src/exec/nested-loop-join-node.cc         |  2 +-
 be/src/exec/partitioned-aggregation-node.cc  | 31 ++++----
 be/src/exec/partitioned-hash-join-builder.cc | 12 +--
 be/src/exec/partitioned-hash-join-node.cc    | 15 ++--
 be/src/runtime/buffered-tuple-stream-test.cc | 81 +++++++++++++++++---
 be/src/runtime/buffered-tuple-stream.cc      | 19 +++--
 be/src/runtime/buffered-tuple-stream.h       | 12 ++-
 be/src/runtime/row-batch-test.cc             | 18 +++--
 be/src/runtime/row-batch.cc                  | 53 +++++--------
 be/src/runtime/row-batch.h                   | 92 ++++++++++++++---------
 be/src/runtime/sorted-run-merger.cc          |  5 +-
 be/src/runtime/sorted-run-merger.h           |  6 +-
 be/src/runtime/sorter.cc                     | 15 ++--
 16 files changed, 241 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 06b0467..16dce50 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -105,7 +105,7 @@ AnalyticEvalNode::AnalyticEvalNode(ObjectPool* pool, const TPlanNode& tnode,
 
 AnalyticEvalNode::~AnalyticEvalNode() {
   // Check that we didn't leak any memory.
-  DCHECK(input_stream_ == NULL);
+  DCHECK(input_stream_ == NULL || input_stream_->is_closed());
 }
 
 Status AnalyticEvalNode::Init(const TPlanNode& tnode, RuntimeState* state) {
@@ -191,9 +191,9 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
   RETURN_IF_ERROR(child(0)->Open(state));
   DCHECK(client_ != NULL);
   DCHECK(input_stream_ == NULL);
-  input_stream_ = new BufferedTupleStream(state, child(0)->row_desc(),
-      state->block_mgr(), client_, false /* use_initial_small_buffers */,
-      true /* read_write */);
+  input_stream_.reset(
+      new BufferedTupleStream(state, child(0)->row_desc(), state->block_mgr(), client_,
+          false /* use_initial_small_buffers */, true /* read_write */));
   RETURN_IF_ERROR(input_stream_->Init(id(), runtime_profile(), true));
   bool got_write_buffer;
   RETURN_IF_ERROR(input_stream_->PrepareForWrite(&got_write_buffer));
@@ -769,7 +769,7 @@ Status AnalyticEvalNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
   RETURN_IF_CANCELLED(state);
   RETURN_IF_ERROR(QueryMaintenance(state));
   VLOG_FILE << id() << " GetNext: " << DebugStateString();
-  DCHECK(input_stream_ != NULL); // input_stream_ is NULL if we already hit eos.
+  DCHECK(!input_stream_->is_closed()); // input_stream_ is closed if we already hit eos.
 
   if (ReachedLimit()) {
     // TODO: This transfer is simple and correct, but not necessarily efficient. We
@@ -777,9 +777,10 @@ Status AnalyticEvalNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
     // over multiple Reset()/Open()/GetNext()* cycles.
     row_batch->tuple_data_pool()->AcquireData(prev_tuple_pool_.get(), false);
     row_batch->tuple_data_pool()->AcquireData(curr_tuple_pool_.get(), false);
-    DCHECK(input_stream_ != NULL);
-    row_batch->AddTupleStream(input_stream_);
-    input_stream_ = NULL;
+    DCHECK(!input_stream_->is_closed());
+    // Flush resources in case we are in a subplan and need to allocate more blocks
+    // when the node is reopened.
+    input_stream_->Close(row_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
     *eos = true;
     return Status::OK();
   } else {
@@ -797,8 +798,9 @@ Status AnalyticEvalNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool*
     // over multiple Reset()/Open()/GetNext()* cycles.
     row_batch->tuple_data_pool()->AcquireData(prev_tuple_pool_.get(), false);
     row_batch->tuple_data_pool()->AcquireData(curr_tuple_pool_.get(), false);
-    row_batch->AddTupleStream(input_stream_);
-    input_stream_ = NULL;
+    // Flush resources in case we are in a subplan and need to allocate more blocks
+    // when the node is reopened.
+    input_stream_->Close(row_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
     *eos = true;
   } else if (prev_pool_last_result_idx_ != -1 &&
       prev_pool_last_result_idx_ < input_stream_->rows_returned() &&
@@ -840,7 +842,9 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) {
   }
   mem_pool_->Clear();
   // The following members will be re-created in Open().
-  DCHECK(input_stream_ == NULL); // input_stream_ should have been attached to last batch.
+  // input_stream_ should have been closed by last GetNext() call.
+  DCHECK(input_stream_ == NULL || input_stream_->is_closed());
+  input_stream_.reset();
   curr_tuple_ = NULL;
   child_tuple_cmp_row_ = NULL;
   dummy_result_tuple_ = NULL;
@@ -853,11 +857,9 @@ Status AnalyticEvalNode::Reset(RuntimeState* state) {
 void AnalyticEvalNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (client_ != NULL) state->block_mgr()->ClearReservations(client_);
+  // We may need to clean up input_stream_ if an error occurred at some point.
   if (input_stream_ != NULL) {
-    // We may need to clean up input_stream_ if an error occurred at some point.
-    input_stream_->Close();
-    delete input_stream_;
-    input_stream_ = NULL;
+    input_stream_->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
 
   // Close all evaluators and fn ctxs. If an error occurred in Init or Prepare there may

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/exec/analytic-eval-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.h b/be/src/exec/analytic-eval-node.h
index 579ad5b..0203175 100644
--- a/be/src/exec/analytic-eval-node.h
+++ b/be/src/exec/analytic-eval-node.h
@@ -333,10 +333,10 @@ class AnalyticEvalNode : public ExecNode {
   /// no order by clause) to a single row (e.g. ROWS windows). When the amount of
   /// buffered data exceeds the available memory in the underlying BufferedBlockMgr,
   /// input_stream_ is unpinned (i.e., possibly spilled to disk if necessary).
-  /// The input stream owns tuple data backing rows returned in GetNext(), and is
-  /// attached to an output row batch on eos or ReachedLimit().
+  /// The input stream owns tuple data backing rows returned in GetNext(). The blocks
+  /// with tuple data are attached to an output row batch on eos or ReachedLimit().
   /// TODO: Consider re-pinning unpinned streams when possible.
-  BufferedTupleStream* input_stream_;
+  boost::scoped_ptr<BufferedTupleStream> input_stream_;
 
   /// Pool used for O(1) allocations that live until Close() or Reset().
   /// Does not own data backing tuples returned in GetNext(), so it does not

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/exec/nested-loop-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.cc b/be/src/exec/nested-loop-join-builder.cc
index 58610e7..0fca3b5 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -47,7 +47,7 @@ Status NljBuilder::Send(RuntimeState* state, RowBatch* batch) {
   build_batch->AcquireState(batch);
 
   AddBuildBatch(build_batch);
-  if (build_batch->need_to_return()) {
+  if (build_batch->needs_deep_copy()) {
     // This batch and earlier batches may refer to resources passed from the child
     // that aren't owned by the row batch itself. Deep copying ensures that the row
     // batches are backed by memory owned by this node that is safe to hold on to.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc b/be/src/exec/nested-loop-join-node.cc
index 6c75797..b640e58 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -161,7 +161,7 @@ Status NestedLoopJoinNode::ConstructSingularBuildSide(RuntimeState* state) {
   RETURN_IF_ERROR(child(1)->GetNext(state, batch, &eos));
   DCHECK_EQ(batch->num_rows(), 1);
   DCHECK(eos);
-  DCHECK(!batch->need_to_return());
+  DCHECK(!batch->needs_deep_copy());
   builder_->AddBuildBatch(batch);
   if (matching_build_rows_ != NULL) {
     DCHECK_EQ(matching_build_rows_->num_bits(), 1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 6862bb3..6e587a5 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -382,19 +382,20 @@ Status PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
   if (!needs_finalize_ && !needs_serialize_) return Status::OK();
   // String data returned by Serialize() or Finalize() is from local expr allocations in
   // the agg function contexts, and will be freed on the next GetNext() call by
-  // FreeLocalAllocations(). The data either needs to be copied out or sent up the plan
-  // tree via MarkNeedToReturn(). (See IMPALA-3311)
+  // FreeLocalAllocations(). The data either needs to be copied out now or sent up the
+  // plan tree and copied out by a blocking ancestor. (See IMPALA-3311)
   for (int i = 0; i < aggregate_evaluators_.size(); ++i) {
     const SlotDescriptor* slot_desc = aggregate_evaluators_[i]->output_slot_desc();
     DCHECK(!slot_desc->type().IsCollectionType()) << "producing collections NYI";
     if (!slot_desc->type().IsVarLenStringType()) continue;
     if (IsInSubplan()) {
       // Copy string data to the row batch's pool. This is more efficient than
-      // MarkNeedToReturn() in a subplan since we are likely producing many small batches.
-      RETURN_IF_ERROR(CopyStringData(slot_desc, row_batch, first_row_idx,
-              row_batch->tuple_data_pool()));
+      // MarkNeedsDeepCopy() in a subplan since we are likely producing many small
+      // batches.
+      RETURN_IF_ERROR(CopyStringData(
+          slot_desc, row_batch, first_row_idx, row_batch->tuple_data_pool()));
     } else {
-      row_batch->MarkNeedToReturn();
+      row_batch->MarkNeedsDeepCopy();
       break;
     }
   }
@@ -524,7 +525,7 @@ Status PartitionedAggregationNode::GetRowsFromPartition(RuntimeState* state,
 
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   partition_eos_ = ReachedLimit();
-  if (output_iterator_.AtEnd()) row_batch->MarkNeedToReturn();
+  if (output_iterator_.AtEnd()) row_batch->MarkNeedsDeepCopy();
 
   return Status::OK();
 }
@@ -718,7 +719,9 @@ void PartitionedAggregationNode::Close(RuntimeState* state) {
   if (agg_fn_pool_.get() != NULL) agg_fn_pool_->FreeAll();
   if (mem_pool_.get() != NULL) mem_pool_->FreeAll();
   if (ht_ctx_.get() != NULL) ht_ctx_->Close();
-  if (serialize_stream_.get() != NULL) serialize_stream_->Close();
+  if (serialize_stream_.get() != NULL) {
+    serialize_stream_->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
 
   if (block_mgr_client_ != NULL) {
     state->block_mgr()->ClearReservations(block_mgr_client_);
@@ -840,14 +843,14 @@ Status PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
       parent->CleanupHashTbl(agg_fn_ctxs, it);
       hash_tbl->Close();
       hash_tbl.reset();
-      aggregated_row_stream->Close();
+      aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
       RETURN_IF_ERROR(status);
       return parent->state_->block_mgr()->MemLimitTooLowError(parent->block_mgr_client_,
           parent->id());
     }
     DCHECK(status.ok());
 
-    aggregated_row_stream->Close();
+    aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
     aggregated_row_stream.swap(parent->serialize_stream_);
     // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for
     // when we need to spill again. We need to have this available before we need
@@ -943,10 +946,12 @@ void PartitionedAggregationNode::Partition::Close(bool finalize_rows) {
       // should have been finalized/serialized in Spill().
       parent->CleanupHashTbl(agg_fn_ctxs, hash_tbl->Begin(parent->ht_ctx_.get()));
     }
-    aggregated_row_stream->Close();
+    aggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
   if (hash_tbl.get() != NULL) hash_tbl->Close();
-  if (unaggregated_row_stream.get() != NULL) unaggregated_row_stream->Close();
+  if (unaggregated_row_stream.get() != NULL) {
+    unaggregated_row_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
 
   for (int i = 0; i < agg_fn_ctxs.size(); ++i) {
     agg_fn_ctxs[i]->impl()->Close();
@@ -1333,7 +1338,7 @@ Status PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stre
       batch.Reset();
     } while (!eos);
   }
-  input_stream->Close();
+  input_stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 0f15876..845adbe 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -435,7 +435,7 @@ void PhjBuilder::CloseAndDeletePartitions() {
   hash_partitions_.clear();
   null_aware_partition_ = NULL;
   for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) {
-    stream->Close();
+    stream->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
   spilled_partition_probe_streams_.clear();
 }
@@ -578,14 +578,10 @@ void PhjBuilder::Partition::Close(RowBatch* batch) {
     hash_tbl_->Close();
   }
 
-  // Transfer ownership of build_rows_ to batch if batch is not NULL.
-  // Otherwise, close the stream here.
+  // Transfer ownership of 'build_rows_' memory to 'batch' if 'batch' is not NULL.
+  // Flush out the resources to free up memory for subsequent partitions.
   if (build_rows_ != NULL) {
-    if (batch == NULL) {
-      build_rows_->Close();
-    } else {
-      batch->AddTupleStream(build_rows_.release());
-    }
+    build_rows_->Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES);
     build_rows_.reset();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index b4f8b50..116ac02 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -230,7 +230,7 @@ void PartitionedHashJoinNode::CloseAndDeletePartitions() {
   }
   output_build_partitions_.clear();
   if (null_probe_rows_ != NULL) {
-    null_probe_rows_->Close();
+    null_probe_rows_->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
     null_probe_rows_.reset();
   }
 }
@@ -275,11 +275,8 @@ Status PartitionedHashJoinNode::ProbePartition::PrepareForRead() {
 void PartitionedHashJoinNode::ProbePartition::Close(RowBatch* batch) {
   if (IsClosed()) return;
   if (probe_rows_ != NULL) {
-    if (batch == NULL) {
-      probe_rows_->Close();
-    } else {
-      batch->AddTupleStream(probe_rows_.release());
-    }
+    // Flush out the resources to free up memory for subsequent partitions.
+    probe_rows_->Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES);
     probe_rows_.reset();
   }
 }
@@ -699,7 +696,9 @@ Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
       builder_->CloseNullAwarePartition(out_batch);
       null_aware_probe_partition_->Close(out_batch);
       null_aware_probe_partition_.reset();
-      out_batch->AddTupleStream(null_probe_rows_.release());
+      // Flush out the resources to free up memory.
+      null_probe_rows_->Close(out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+      null_probe_rows_.reset();
       return Status::OK();
     }
   }
@@ -747,7 +746,7 @@ Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
 error:
   DCHECK(!status.ok());
   // Ensure the temporary 'probe_rows' stream is closed correctly on error.
-  probe_rows->Close();
+  probe_rows->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   return status;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 76b1bff..9063a7d 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -331,7 +331,7 @@ class SimpleTupleStreamTest : public testing::Test {
     // Verify result
     VerifyResults<T>(*desc, results, num_rows * num_batches, gen_null);
 
-    stream.Close();
+    stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
 
   void TestIntValuesInterleaved(int num_batches, int num_batches_before_read,
@@ -372,12 +372,14 @@ class SimpleTupleStreamTest : public testing::Test {
 
       VerifyResults<int>(*int_desc_, results, BATCH_SIZE * num_batches, false);
 
-      stream.Close();
+      stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
     }
   }
 
   void TestUnpinPin(bool varlen_data);
 
+  void TestTransferMemory(bool pin_stream, bool read_write);
+
   scoped_ptr<TestEnv> test_env_;
   RuntimeState* runtime_state_;
   BufferedBlockMgr::Client* client_;
@@ -614,7 +616,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) {
   // until the stream is closed.
   ASSERT_EQ(stream.bytes_in_mem(false), buffer_size);
 
-  stream.Close();
+  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 
   ASSERT_EQ(stream.bytes_in_mem(false), 0);
 }
@@ -672,7 +674,64 @@ TEST_F(SimpleTupleStreamTest, SmallBuffers) {
 
   // TODO: Test for IMPALA-2330. In case SwitchToIoBuffers() fails to get buffer then
   // using_small_buffers() should still return true.
-  stream.Close();
+  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
+void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write) {
+  // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
+  // mark the batch at capacity.
+  int buffer_size = 4 * 1024;
+  InitBlockMgr(100 * buffer_size, buffer_size);
+
+  BufferedTupleStream stream(runtime_state_, *int_desc_, runtime_state_->block_mgr(),
+      client_, false, read_write);
+  ASSERT_OK(stream.Init(-1, NULL, pin_stream));
+  bool got_write_buffer;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
+  ASSERT_TRUE(got_write_buffer);
+  RowBatch* batch = CreateIntBatch(0, 1024, false);
+
+  // Construct a stream with 4 blocks.
+  const int total_num_blocks = 4;
+  while (stream.byte_size() < total_num_blocks * buffer_size) {
+    Status status;
+    for (int i = 0; i < batch->num_rows(); ++i) {
+      bool ret = stream.AddRow(batch->GetRow(i), &status);
+      EXPECT_TRUE(ret);
+      ASSERT_OK(status);
+    }
+  }
+
+  bool got_read_buffer;
+  ASSERT_OK(stream.PrepareForRead(true, &got_read_buffer));
+  ASSERT_TRUE(got_read_buffer);
+
+  batch->Reset();
+  stream.Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  if (pin_stream) {
+    DCHECK_EQ(total_num_blocks, batch->num_blocks());
+  } else if (read_write) {
+    // Read and write block should be attached.
+    DCHECK_EQ(2, batch->num_blocks());
+  } else {
+    // Read block should be attached.
+    DCHECK_EQ(1, batch->num_blocks());
+  }
+  DCHECK(batch->AtCapacity()); // Flush resources flag should have been set.
+  batch->Reset();
+  DCHECK_EQ(0, batch->num_blocks());
+}
+
+/// Test attaching memory to a row batch from a pinned stream.
+TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStream) {
+  TestTransferMemory(true, true);
+  TestTransferMemory(true, false);
+}
+
+/// Test attaching memory to a row batch from an unpinned stream.
+TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStream) {
+  TestTransferMemory(false, true);
+  TestTransferMemory(false, false);
 }
 
 // Test that tuple stream functions if it references strings outside stream. The
@@ -720,7 +779,7 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
   }
 
-  stream.Close();
+  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
// Construct a big row by stitching together many tuples so the total row size
@@ -761,7 +820,7 @@ TEST_F(SimpleTupleStreamTest, BigRow) {
       runtime_state_->block_mgr(), client_, false, false);
   Status status = nullable_stream.Init(-1, NULL, true);
   ASSERT_FALSE(status.ok());
-  nullable_stream.Close();
+  nullable_stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
 // Test for IMPALA-3923: overflow of 32-bit int in GetRows().
@@ -779,7 +838,7 @@ TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) {
   bool got_rows;
   scoped_ptr<RowBatch> overflow_batch;
   ASSERT_FALSE(stream.GetRows(&overflow_batch, &got_rows).ok());
-  stream.Close();
+  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
 // Basic API test. No data should be going to disk.
@@ -913,7 +972,7 @@ TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
   }
 
-  stream.Close();
+  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
 // Test with rows with multiple nullable tuples.
@@ -1003,7 +1062,7 @@ TEST_F(MultiNullableTupleStreamTest, TestComputeRowSize) {
   sv->len = 1234;
   EXPECT_EQ(expected_len, stream.ComputeRowSize(row.get()));
 
-  stream.Close();
+  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
 /// Test that deep copy works with arrays by copying into a BufferedTupleStream, freeing
@@ -1112,7 +1171,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
     rows_read += batch.num_rows();
   } while (!eos);
   ASSERT_EQ(NUM_ROWS, rows_read);
-  stream.Close();
+  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
 /// Test that ComputeRowSize handles nulls
@@ -1180,7 +1239,7 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
   tuple0->SetNull(array_slot->null_indicator_offset());
   EXPECT_EQ(array_desc_->GetRowSize(), stream.ComputeRowSize(row.get()));
 
-  stream.Close();
+  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 }
 
 // TODO: more tests.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index e3b3b4a..53e1a69 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -22,7 +22,6 @@
 
 #include "runtime/collection-value.h"
 #include "runtime/descriptors.h"
-#include "runtime/row-batch.h"
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
 #include "util/bit-util.h"
@@ -191,9 +190,13 @@ Status BufferedTupleStream::SwitchToIoBuffers(bool* got_buffer) {
   return status;
 }
 
-void BufferedTupleStream::Close() {
+void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) {
   for (BufferedBlockMgr::Block* block : blocks_) {
-    block->Delete();
+    if (batch != NULL && block->is_pinned()) {
+      batch->AddBlock(block, flush);
+    } else {
+      block->Delete();
+    }
   }
   blocks_.clear();
   num_pinned_ = 0;
@@ -645,11 +648,11 @@ Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
   batch->CommitRows(rows_to_fill);
   rows_returned_ += rows_to_fill;
   *eos = (rows_returned_ == num_rows_);
-  if ((!pinned_ || delete_on_read_) &&
-      rows_returned_curr_block + rows_to_fill == (*read_block_)->num_rows()) {
-    // No more data in this block. Mark this batch as needing to return so
-    // the caller can pass the rows up the operator tree.
-    batch->MarkNeedToReturn();
+  if ((!pinned_ || delete_on_read_)
+      && rows_returned_curr_block + rows_to_fill == (*read_block_)->num_rows()) {
+    // No more data in this block. The batch must be immediately returned up the operator
+    // tree and deep copied so that NextReadBlock() can reuse the read block's buffer.
+    batch->MarkNeedsDeepCopy();
   }
   if (FILL_INDICES) DCHECK_EQ(indices->size(), rows_to_fill);
   DCHECK_LE(read_ptr_, read_end_ptr_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index d138150..f741eda 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -23,13 +23,13 @@
 
 #include "common/status.h"
 #include "runtime/buffered-block-mgr.h"
+#include "runtime/row-batch.h"
 
 namespace impala {
 
 class BufferedBlockMgr;
 class RuntimeProfile;
 class RuntimeState;
-class RowBatch;
 class RowDescriptor;
 class SlotDescriptor;
 class TupleRow;
@@ -128,7 +128,7 @@ class TupleRow;
 /// Memory lifetime of rows read from stream:
 /// If the stream is pinned, it is valid to access any tuples returned via
 /// GetNext() or GetRows() until the stream is unpinned. If the stream is unpinned, and
-/// the batch returned from GetNext() has the need_to_return flag set, any tuple memory
+/// the batch returned from GetNext() has the needs_deep_copy flag set, any tuple memory
 /// returned so far from the stream may be freed on the next call to GetNext().
 ///
 /// Manual construction of rows with AllocateRow():
@@ -305,8 +305,11 @@ class BufferedTupleStream {
   /// *got_rows is false if the stream could not be pinned.
   Status GetRows(boost::scoped_ptr<RowBatch>* batch, bool* got_rows);
 
-  /// Must be called once at the end to cleanup all resources. Idempotent.
-  void Close();
+  /// Must be called once at the end to clean up all resources. If 'batch' is non-NULL,
+  /// attaches any pinned blocks to the batch and deletes unpinned blocks. Otherwise
+  /// deletes all blocks. Does nothing if the stream was already closed. The 'flush'
+  /// mode is forwarded to RowBatch::AddBlock() when attaching blocks.
+  void Close(RowBatch* batch, RowBatch::FlushMode flush);
 
   /// Number of rows in the stream.
   int64_t num_rows() const { return num_rows_; }
@@ -321,6 +324,7 @@ class BufferedTupleStream {
   /// If ignore_current is true, the write_block_ memory is not included.
   int64_t bytes_in_mem(bool ignore_current) const;
 
+  bool is_closed() const { return closed_; }
   bool is_pinned() const { return pinned_; }
   int blocks_pinned() const { return num_pinned_; }
   int blocks_unpinned() const { return blocks_.size() - num_pinned_ - num_small_blocks_; }
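
As a usage sketch of the new Close() contract above (assuming an initialized
stream; illustrative, not part of the patch):

    // Attach pinned blocks to an output batch flowing up the operator tree;
    // FLUSH_RESOURCES marks the batch at capacity so that ancestors free or
    // acquire the memory promptly.
    stream.Close(output_batch, RowBatch::FlushMode::FLUSH_RESOURCES);

    // Tear down with no consumer (e.g. in ExecNode::Close() or on an error
    // path); all blocks are deleted immediately.
    stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);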

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/runtime/row-batch-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-test.cc b/be/src/runtime/row-batch-test.cc
index 041c3c2..bbdcc2d 100644
--- a/be/src/runtime/row-batch-test.cc
+++ b/be/src/runtime/row-batch-test.cc
@@ -34,8 +34,8 @@ using namespace impala;
 // For computing tuple mem layouts.
 static scoped_ptr<Frontend> fe;
 
-TEST(RowBatchTest, AcquireStateWithMarkAtCapacity) {
-  // Test that AcquireState() can be correctly called with MarkAtCapacity() on the
+TEST(RowBatchTest, AcquireStateWithMarkFlushResources) {
+  // Test that AcquireState() can be correctly called with MarkFlushResources() on the
   // source batch.
   ObjectPool pool;
   DescriptorTblBuilder builder(fe.get(), &pool);
@@ -48,9 +48,10 @@ TEST(RowBatchTest, AcquireStateWithMarkAtCapacity) {
   MemTracker tracker;
   {
     RowBatch src(row_desc, 1024, &tracker);
-    src.AddRow(); src.CommitLastRow();
-    // Calls MarkAtCapacity().
-    src.MarkNeedToReturn();
+    src.AddRow();
+    src.CommitLastRow();
+    // Calls MarkFlushResources().
+    src.MarkNeedsDeepCopy();
 
     // Note InitialCapacity(), not capacity(). Latter will DCHECK.
     RowBatch dest(row_desc, src.InitialCapacity(), &tracker);
@@ -60,9 +61,10 @@ TEST(RowBatchTest, AcquireStateWithMarkAtCapacity) {
   // Confirm the bad pattern causes an error.
   {
     RowBatch src(row_desc, 1024, &tracker);
-    src.AddRow(); src.CommitLastRow();
-    // Calls MarkAtCapacity().
-    src.MarkNeedToReturn();
+    src.AddRow();
+    src.CommitLastRow();
+    // Calls MarkFlushResources().
+    src.MarkNeedsDeepCopy();
     RowBatch bad_dest(row_desc, src.capacity(), &tracker);
     IMPALA_ASSERT_DEBUG_DEATH(bad_dest.AcquireState(&src), "");
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index daac913..6daa02f 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -20,7 +20,6 @@
 #include <stdint.h>  // for intptr_t
 #include <boost/scoped_ptr.hpp>
 
-#include "runtime/buffered-tuple-stream.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
@@ -41,11 +40,11 @@ namespace impala {
 const int RowBatch::AT_CAPACITY_MEM_USAGE;
 const int RowBatch::FIXED_LEN_BUFFER_LIMIT;
 
-RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity,
-    MemTracker* mem_tracker)
+RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_tracker)
   : num_rows_(0),
     capacity_(capacity),
-    need_to_return_(false),
+    flush_(FlushMode::NO_FLUSH_RESOURCES),
+    needs_deep_copy_(false),
     num_tuples_per_row_(row_desc.tuple_descriptors().size()),
     auxiliary_mem_usage_(0),
     tuple_data_pool_(mem_tracker),
@@ -71,11 +70,12 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity,
 //              xfer += iprot->readString(this->tuple_data[_i9]);
 // to allocated string data in special mempool
 // (change via python script that runs over Data_types.cc)
-RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
-    MemTracker* mem_tracker)
+RowBatch::RowBatch(
+    const RowDescriptor& row_desc, const TRowBatch& input_batch, MemTracker* mem_tracker)
   : num_rows_(input_batch.num_rows),
     capacity_(input_batch.num_rows),
-    need_to_return_(false),
+    flush_(FlushMode::NO_FLUSH_RESOURCES),
+    needs_deep_copy_(false),
     num_tuples_per_row_(input_batch.row_tuples.size()),
     auxiliary_mem_usage_(0),
     tuple_data_pool_(mem_tracker),
@@ -154,7 +154,6 @@ RowBatch::~RowBatch() {
   for (int i = 0; i < io_buffers_.size(); ++i) {
     io_buffers_[i]->Return();
   }
-  CloseTupleStreams();
   for (int i = 0; i < blocks_.size(); ++i) {
     blocks_[i]->Delete();
   }
@@ -298,18 +297,12 @@ void RowBatch::AddIoBuffer(DiskIoMgr::BufferDescriptor* buffer) {
   buffer->TransferOwnership(mem_tracker_);
 }
 
-void RowBatch::AddTupleStream(BufferedTupleStream* stream) {
-  DCHECK(stream != NULL);
-  tuple_streams_.push_back(stream);
-  auxiliary_mem_usage_ += stream->byte_size();
-  // Batches with attached streams are always considered at capacity.
-  MarkAtCapacity();
-}
-
-void RowBatch::AddBlock(BufferedBlockMgr::Block* block) {
+void RowBatch::AddBlock(BufferedBlockMgr::Block* block, FlushMode flush) {
   DCHECK(block != NULL);
+  DCHECK(block->is_pinned());
   blocks_.push_back(block);
   auxiliary_mem_usage_ += block->buffer_len();
+  if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources();
 }
 
 void RowBatch::Reset() {
@@ -321,7 +314,6 @@ void RowBatch::Reset() {
     io_buffers_[i]->Return();
   }
   io_buffers_.clear();
-  CloseTupleStreams();
   for (int i = 0; i < blocks_.size(); ++i) {
     blocks_[i]->Delete();
   }
@@ -330,15 +322,8 @@ void RowBatch::Reset() {
   if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) {
     tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_));
   }
-  need_to_return_ = false;
-}
-
-void RowBatch::CloseTupleStreams() {
-  for (int i = 0; i < tuple_streams_.size(); ++i) {
-    tuple_streams_[i]->Close();
-    delete tuple_streams_[i];
-  }
-  tuple_streams_.clear();
+  flush_ = FlushMode::NO_FLUSH_RESOURCES;
+  needs_deep_copy_ = false;
 }
 
 void RowBatch::TransferResourceOwnership(RowBatch* dest) {
@@ -347,15 +332,15 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
     dest->AddIoBuffer(io_buffers_[i]);
   }
   io_buffers_.clear();
-  for (int i = 0; i < tuple_streams_.size(); ++i) {
-    dest->AddTupleStream(tuple_streams_[i]);
-  }
-  tuple_streams_.clear();
   for (int i = 0; i < blocks_.size(); ++i) {
-    dest->AddBlock(blocks_[i]);
+    dest->AddBlock(blocks_[i], FlushMode::NO_FLUSH_RESOURCES);
   }
   blocks_.clear();
-  if (need_to_return_) dest->MarkNeedToReturn();
+  if (needs_deep_copy_) {
+    dest->MarkNeedsDeepCopy();
+  } else if (flush_ == FlushMode::FLUSH_RESOURCES) {
+    dest->MarkFlushResources();
+  }
   if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) {
     // Tuple pointers were allocated from tuple_data_pool_ so are transferred.
     tuple_ptrs_ = NULL;
@@ -376,7 +361,7 @@ void RowBatch::AcquireState(RowBatch* src) {
   DCHECK_EQ(tuple_ptrs_size_, src->tuple_ptrs_size_);
 
   // The destination row batch should be empty.
-  DCHECK(!need_to_return_);
+  DCHECK(!needs_deep_copy_);
   DCHECK_EQ(num_rows_, 0);
   DCHECK_EQ(auxiliary_mem_usage_, 0);
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 75c1f44..91433c4 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -33,7 +33,6 @@
 
 namespace impala {
 
-class BufferedTupleStream;
 template <typename K, typename V> class FixedSizeHashTable;
 class MemTracker;
 class RowBatchSerializeTest;
@@ -43,7 +42,6 @@ class Tuple;
 class TupleRow;
 class TupleDescriptor;
 
-
 /// A RowBatch encapsulates a batch of rows, each composed of a number of tuples.
 /// The maximum number of rows is fixed at the time of construction.
 /// The row batch references a few different sources of memory.
@@ -76,6 +74,14 @@ class TupleDescriptor;
 /// auxiliary memory up to a soft cap. (See at_capacity_mem_usage_ comment).
 class RowBatch {
  public:
+  /// Flag indicating whether the resources attached to a RowBatch need to be flushed.
+  /// Defined here as a convenience for other modules that need to communicate flushing
+  /// modes.
+  enum class FlushMode {
+    FLUSH_RESOURCES,
+    NO_FLUSH_RESOURCES,
+  };
+
   /// Create RowBatch for a maximum of 'capacity' rows of tuples specified
   /// by 'row_desc'.
   /// tracker cannot be NULL.
@@ -129,10 +135,10 @@ class RowBatch {
   /// auxiliary memory attached to the row batch.
   bool ALWAYS_INLINE AtCapacity() {
     DCHECK_LE(num_rows_, capacity_);
-    // Check AtCapacity() conditions enforced in MarkNeedToReturn() and AddTupleStream()
-    if (need_to_return_) DCHECK_EQ(num_rows_, capacity_);
-    if (num_tuple_streams() > 0) DCHECK_EQ(num_rows_, capacity_);
-    DCHECK((num_tuple_streams() == 0 && !need_to_return_) || num_rows_ == capacity_);
+    // Check AtCapacity() condition enforced in MarkNeedsDeepCopy() and
+    // MarkFlushResources().
+    DCHECK((!needs_deep_copy_ && flush_ == FlushMode::NO_FLUSH_RESOURCES)
+        || num_rows_ == capacity_);
     int64_t mem_usage = auxiliary_mem_usage_ + tuple_data_pool_.total_allocated_bytes();
     return num_rows_ == capacity_ || mem_usage >= AT_CAPACITY_MEM_USAGE;
   }
@@ -202,7 +208,6 @@ class RowBatch {
   MemPool* tuple_data_pool() { return &tuple_data_pool_; }
   int num_io_buffers() const { return io_buffers_.size(); }
   int num_blocks() const { return blocks_.size(); }
-  int num_tuple_streams() const { return tuple_streams_.size(); }
 
   /// Resets the row batch, returning all resources it has accumulated.
   void Reset();
@@ -210,23 +215,45 @@ class RowBatch {
   /// Add io buffer to this row batch.
   void AddIoBuffer(DiskIoMgr::BufferDescriptor* buffer);
 
-  /// Add tuple stream to this row batch. The row batch takes ownership of the stream
-  /// and will call Close() on the stream and delete it when freeing resources.
-  void AddTupleStream(BufferedTupleStream* stream);
-
   /// Adds a block to this row batch. The block must be pinned. The blocks must be
-  /// deleted when freeing resources.
-  void AddBlock(BufferedBlockMgr::Block* block);
-
-  /// Called to indicate that resources backing this row batch will be cleaned up after
-  /// the next GetNext() call and the batch must be returned up the operator tree.
-  /// This is used to control memory management for streaming rows.
-  void MarkNeedToReturn() {
-    need_to_return_ = true;
-    MarkAtCapacity();
+  /// deleted when freeing resources. The block's memory remains accounted against
+  /// the original owner, even when the ownership of batches is transferred. If the
+  /// original owner wants the memory to be released, it should call this with 'mode'
+  /// FLUSH_RESOURCES (see MarkFlushResources() for further explanation).
+  /// TODO: after IMPALA-3200, make the ownership transfer model consistent between
+  /// Blocks and I/O buffers.
+  void AddBlock(BufferedBlockMgr::Block* block, FlushMode flush);
+
+  /// Used by an operator to indicate that it cannot produce more rows until the
+  /// resources that it has attached to the row batch are freed or acquired by an
+  /// ancestor operator. After this is called, the batch is at capacity and no more rows
+  /// can be added. The "flush" mark is transferred by TransferResourceOwnership(). This
+  /// ensures that batches are flushed by streaming operators all the way up the operator
+  /// tree. Blocking operators can still accumulate batches with this flag.
+  /// TODO: IMPALA-3200: blocking operators should acquire all memory resources including
+  /// attached blocks/buffers, so that MarkFlushResources() can guarantee that the
+  /// resources will not be accounted against the original operator (this is currently
+  /// not true for Blocks, which can't be transferred).
+  void MarkFlushResources() {
+    DCHECK_LE(num_rows_, capacity_);
+    capacity_ = num_rows_;
+    flush_ = FlushMode::FLUSH_RESOURCES;
+  }
+
+  /// Called to indicate that some resources backing this batch were not attached and
+  /// will be cleaned up after the next GetNext() call. This means that the batch must
+  /// be returned up the operator tree. Blocking operators must deep-copy any rows from
+  /// this batch or preceding batches.
+  ///
+  /// This is a stronger version of MarkFlushResources(), because blocking operators
+  /// are not allowed to accumulate batches with the 'needs_deep_copy' flag.
+  /// TODO: IMPALA-4179: always attach backing resources and remove this flag.
+  void MarkNeedsDeepCopy() {
+    MarkFlushResources(); // No more rows should be added to the batch.
+    needs_deep_copy_ = true;
   }
 
-  bool need_to_return() { return need_to_return_; }
+  bool needs_deep_copy() { return needs_deep_copy_; }
 
   /// Transfer ownership of resources to dest.  This includes tuple data in mem
   /// pool and io buffers.
@@ -334,15 +361,6 @@ class RowBatch {
   void SerializeInternal(int64_t size, DedupMap* distinct_tuples,
       TRowBatch* output_batch);
 
-  /// Close owned tuple streams and delete if needed.
-  void CloseTupleStreams();
-
-  /// Mark that no more rows should be added to the batch.
-  void MarkAtCapacity() {
-    DCHECK_LE(num_rows_, capacity_);
-    capacity_ = num_rows_;
-  }
-
   /// All members below need to be handled in RowBatch::AcquireState()
 
   // Class members that are accessed on performance-critical paths should appear
@@ -351,9 +369,15 @@ class RowBatch {
   int num_rows_;  // # of committed rows
   int capacity_;  // the value of num_rows_ at which batch is considered full.
 
+  /// If FLUSH_RESOURCES, the resources attached to this batch should be freed or
+  /// acquired by a new owner as soon as possible. See MarkFlushResources(). If
+  /// FLUSH_RESOURCES, AtCapacity() is also true.
+  FlushMode flush_;
+
   /// If true, this batch references unowned memory that will be cleaned up soon.
-  /// See MarkNeedToReturn().
-  bool need_to_return_;
+  /// See MarkNeedsDeepCopy(). If true, 'flush_' is FLUSH_RESOURCES and
+  /// AtCapacity() is true.
+  bool needs_deep_copy_;
 
   const int num_tuples_per_row_;
 
@@ -396,9 +420,6 @@ class RowBatch {
   /// (i.e. they are not ref counted) so most row batches don't own any.
   std::vector<DiskIoMgr::BufferDescriptor*> io_buffers_;
 
-  /// Tuple streams currently owned by this row batch.
-  std::vector<BufferedTupleStream*> tuple_streams_;
-
   /// Blocks attached to this row batch. The underlying memory and block manager client
   /// are owned by the BufferedBlockMgr.
   std::vector<BufferedBlockMgr::Block*> blocks_;
@@ -412,7 +433,6 @@ class RowBatch {
   /// allocated to the right size.
   std::string compression_scratch_;
 };
-
 }
 
 /// Macros for iterating through '_row_batch', starting at '_start_row_idx'.
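
A sketch of how a consumer of batches distinguishes the two marks above (the
consumer loop is hypothetical; the RowBatch calls are from this patch):

    // 'batch' was just filled by child->GetNext().
    if (batch->needs_deep_copy()) {
      // Backing memory was not attached and may be freed by the child's next
      // GetNext() call: a blocking operator must deep-copy rows out now.
    } else if (batch->AtCapacity()) {
      // Any attached resources marked FLUSH_RESOURCES should be returned up
      // the tree (or handed off via TransferResourceOwnership()) soon.
    }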

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index dd0ea30..8f8891a 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -65,8 +65,9 @@ class SortedRunMerger::SortedRunWrapper {
     do {
       // Make sure to transfer resources from every batch received from 'sorted_run_'.
       if (transfer_batch != NULL) {
-        DCHECK(!input_row_batch_->need_to_return()) << "Run batch suppliers that set the "
-          "need_to_return flag must use a deep-copying merger";
+        DCHECK(!input_row_batch_->needs_deep_copy())
+            << "Run batch suppliers that set the "
+               "needs_deep_copy flag must use a deep-copying merger";
         input_row_batch_->TransferResourceOwnership(transfer_batch);
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/runtime/sorted-run-merger.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.h b/be/src/runtime/sorted-run-merger.h
index e81ca88..563b6f7 100644
--- a/be/src/runtime/sorted-run-merger.h
+++ b/be/src/runtime/sorted-run-merger.h
@@ -43,9 +43,9 @@ class RuntimeProfile;
 /// and transfers resource ownership from the input batches to the output batch when
 /// an input batch is processed.
 ///
-/// SortedRunMerger cannot handle the 'need_to_return' resource-transfer model so
-/// if the RunBatchSupplierFn can return batches with the 'need_to_return' flag set,
-/// the merger must have 'deep_copy_input'. TODO: once 'need_to_return' is deprecated,
+/// SortedRunMerger cannot handle the 'needs_deep_copy' resource-transfer model so
+/// if the RunBatchSupplierFn can return batches with the 'needs_deep_copy' flag set,
+/// the merger must have 'deep_copy_input' set. TODO: once 'needs_deep_copy' is removed
 /// this is no longer a problem.
 class SortedRunMerger {
  public:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ba026f27/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 513aba2..ed00cd2 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -822,9 +822,13 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
       // that we don't attach resources to the batch on eos.
       DCHECK_EQ(NumNonNullBlocks(fixed_len_blocks_), 0);
       DCHECK_EQ(NumNonNullBlocks(var_len_blocks_), 0);
+
+      // Flush resources in case we are in a subplan and need to allocate more blocks
+      // when the node is reopened.
+      output_batch->MarkFlushResources();
     } else {
       // We held onto the last fixed or var len blocks without transferring them to the
-      // caller. We signalled MarkNeedToReturn() to the caller, so we can safely delete
+      // caller. We signalled MarkNeedsDeepCopy() to the caller, so we can safely delete
       // them now to free memory.
       if (!fixed_len_blocks_.empty()) DCHECK_EQ(NumNonNullBlocks(fixed_len_blocks_), 1);
       if (!var_len_blocks_.empty()) DCHECK_EQ(NumNonNullBlocks(var_len_blocks_), 1);
@@ -866,7 +870,7 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
      // this block for the next in the next GetNext() call. Therefore we must hold
       // onto the current var-len block and signal to the caller that the block is going
       // to be deleted.
-      output_batch->MarkNeedToReturn();
+      output_batch->MarkNeedsDeepCopy();
       end_of_var_len_block_ = true;
       break;
     }
@@ -880,14 +884,15 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
     // Reached the block boundary, need to move to the next block.
     if (is_pinned_) {
       // Attach block to batch. Caller can delete the block when it wants to.
-      output_batch->AddBlock(fixed_len_blocks_[fixed_len_blocks_index_]);
+      output_batch->AddBlock(fixed_len_blocks_[fixed_len_blocks_index_],
+          RowBatch::FlushMode::NO_FLUSH_RESOURCES);
       fixed_len_blocks_[fixed_len_blocks_index_] = NULL;
 
       // Attach the var-len blocks at eos once no more rows will reference the blocks.
       if (fixed_len_blocks_index_ == fixed_len_blocks_.size() - 1) {
         for (BufferedBlockMgr::Block* var_len_block: var_len_blocks_) {
           DCHECK(var_len_block != NULL);
-          output_batch->AddBlock(var_len_block);
+          output_batch->AddBlock(var_len_block, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
         }
         var_len_blocks_.clear();
       }
@@ -895,7 +900,7 @@ Status Sorter::Run::GetNext(RowBatch* output_batch, bool* eos) {
       // To iterate over unpinned runs, we need to exchange this block for the next
+      // in the next GetNext() call, so we must hold onto the block and signal to
       // the caller that the block is going to be deleted.
-      output_batch->MarkNeedToReturn();
+      output_batch->MarkNeedsDeepCopy();
     }
     end_of_fixed_len_block_ = true;
   }