You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/31 06:48:28 UTC

[2/5] impala git commit: IMPALA-7333: remove MarkNeedsDeepCopy() in agg and BTS

IMPALA-7333: remove MarkNeedsDeepCopy() in agg and BTS

This takes advantage of work (e.g. IMPALA-3200, IMPALA-5844)
to remove a couple of uses of the API.

Testing:
Ran core, ASAN and exhaustive builds.

Added unit tests to directly test the attaching behaviour.

Change-Id: I91ac53bacc00df4726c015a30ba5a2026aa4b5f5
Reviewed-on: http://gerrit.cloudera.org:8080/11007
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/240fde62
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/240fde62
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/240fde62

Branch: refs/heads/master
Commit: 240fde62d532c7166fc613a97b38c199cec09f1f
Parents: 9146f73
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jul 19 11:20:40 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 02:25:27 2018 +0000

----------------------------------------------------------------------
 be/src/exec/grouping-aggregator.cc           |   8 +-
 be/src/runtime/buffered-tuple-stream-test.cc | 249 ++++++++++++++++++++--
 be/src/runtime/buffered-tuple-stream.cc      | 145 ++++++++-----
 be/src/runtime/buffered-tuple-stream.h       | 138 +++++++-----
 be/src/runtime/row-batch.h                   |   1 +
 be/src/runtime/tuple.h                       |   4 +
 6 files changed, 408 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 092ecfd..42d6be8 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -294,8 +294,12 @@ Status GroupingAggregator::GetRowsFromPartition(
 
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   partition_eos_ = ReachedLimit();
-  if (output_iterator_.AtEnd()) row_batch->MarkNeedsDeepCopy();
-
+  if (partition_eos_ || output_iterator_.AtEnd()) {
+    // Attach all buffers referenced by previously-returned rows. On the next GetNext()
+    // call we will close the partition.
+    output_partition_->aggregated_row_stream->Close(
+        row_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  }
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index ef66824..6ff9805 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -398,6 +398,22 @@ class SimpleTupleStreamTest : public testing::Test {
 
   void TestTransferMemory(bool pinned_stream, bool read_write);
 
+  void TestAttachMemory(bool pinned_stream, bool attach_on_read);
+
+  void TestFlushResourcesReadWrite(bool pinned_stream, bool attach_on_read);
+
+  /// Helper for TestFlushResourcesReadWrite() to write and read back rows from
+  /// *stream. 'append_batch_size' is the number of rows to append at a time before
+  /// reading them back. *num_buffers_attached tracks the number of buffers attached
+  /// to the output batch.
+  void AppendToReadWriteStream(int64_t append_batch_size, int64_t buffer_size,
+      int* num_buffers_attached, BufferedTupleStream* stream);
+
+  // Helper for AppendToReadWriteStream() to verify 'out_batch' contents. The value of
+  // row i of 'out_batch' is expected to be the same as the row at index
+  // (i + start_index) % out_batch->num_rows() of 'in_batch'.
+  void VerifyReadWriteBatch(RowBatch* in_batch, RowBatch* out_batch, int64_t start_index);
+
   // Helper to writes 'row' comprised of only string slots to 'data'. The expected
   // length of the data written is 'expected_len'.
   void WriteStringRow(const RowDescriptor* row_desc, TupleRow* row, int64_t fixed_size,
@@ -649,13 +665,13 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
   ASSERT_TRUE(pinned);
 
   // Read and verify result a few times. We should be able to reread the stream if
-  // we don't use delete on read mode.
+  // we don't use attach on read mode.
   int read_iters = 3;
   for (int i = 0; i < read_iters; ++i) {
-    bool delete_on_read = i == read_iters - 1;
+    bool attach_on_read = i == read_iters - 1;
     if (i > 0 || !read_write) {
       bool got_read_reservation;
-      ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+      ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
       ASSERT_TRUE(got_read_reservation);
     }
 
@@ -670,15 +686,13 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
     }
   }
 
-  // After delete_on_read, all blocks aside from the last should be deleted.
-  // Note: this should really be 0, but the BufferedTupleStream returns eos before
-  // deleting the last block, rather than after, so the last block isn't deleted
-  // until the stream is closed.
-  ASSERT_EQ(stream.BytesPinned(false), buffer_size);
+  // After attach_on_read, all buffers should have been attached to the output batches
+  // on previous GetNext() calls.
+  ASSERT_EQ(0, stream.BytesPinned(false));
 
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 
-  ASSERT_EQ(stream.BytesPinned(false), 0);
+  ASSERT_EQ(0, stream.BytesPinned(false));
 }
 
 TEST_F(SimpleTupleStreamTest, UnpinPin) {
@@ -765,6 +779,205 @@ TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) {
   TestTransferMemory(false, false);
 }
 
+/// Test iteration over a stream with and without attaching memory.
+void SimpleTupleStreamTest::TestAttachMemory(bool pin_stream, bool attach_on_read) {
+  // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
+  // make the batch at capacity.
+  int buffer_size = 4 * 1024;
+  Init(100 * buffer_size);
+
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
+  ASSERT_OK(stream.Init(-1, pin_stream));
+  bool got_write_reservation;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+  ASSERT_TRUE(got_write_reservation);
+  RowBatch* in_batch = CreateIntBatch(0, 1024, false);
+
+  // Construct a stream with 4 pages.
+  const int total_num_pages = 4;
+  while (stream.byte_size() < total_num_pages * buffer_size) {
+    Status status;
+    for (int i = 0; i < in_batch->num_rows(); ++i) {
+      bool ret = stream.AddRow(in_batch->GetRow(i), &status);
+      EXPECT_TRUE(ret);
+      ASSERT_OK(status);
+    }
+  }
+
+  RowBatch* out_batch = pool_.Add(new RowBatch(int_desc_, 100, &tracker_));
+  int num_buffers_attached = 0;
+  int num_flushes = 0;
+  int64_t num_rows_returned = 0;
+  bool got_read_reservation;
+  ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
+  ASSERT_TRUE(got_read_reservation);
+  bool eos;
+  do {
+    ASSERT_EQ(0, out_batch->num_buffers());
+    ASSERT_OK(stream.GetNext(out_batch, &eos));
+    EXPECT_LE(out_batch->num_buffers(), 1) << "Should only attach one buffer at a time";
+    if (out_batch->num_buffers() > 0) {
+      EXPECT_TRUE(out_batch->AtCapacity()) << "Flush resources flag should have been set";
+    }
+    num_buffers_attached += out_batch->num_buffers();
+    for (int i = 0; i < out_batch->num_rows(); ++i) {
+      int slot_offset = int_desc_->tuple_descriptors()[0]->slots()[0]->tuple_offset();
+      TupleRow* in_row = in_batch->GetRow(num_rows_returned % in_batch->num_rows());
+      EXPECT_EQ(*in_row->GetTuple(0)->GetIntSlot(slot_offset),
+          *out_batch->GetRow(i)->GetTuple(0)->GetIntSlot(slot_offset));
+      ++num_rows_returned;
+    }
+    num_flushes += out_batch->flush_mode() == RowBatch::FlushMode::FLUSH_RESOURCES;
+    out_batch->Reset();
+  } while (!eos);
+
+  if (attach_on_read) {
+    EXPECT_EQ(4, num_buffers_attached) << "All buffers attached during iteration.";
+  } else {
+    EXPECT_EQ(0, num_buffers_attached) << "No buffers attached during iteration.";
+  }
+  if (attach_on_read || !pin_stream) EXPECT_EQ(4, num_flushes);
+  out_batch->Reset();
+  stream.Close(out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  if (attach_on_read) {
+    EXPECT_EQ(0, out_batch->num_buffers());
+  } else if (pin_stream) {
+    // All buffers should be attached.
+    EXPECT_EQ(4, out_batch->num_buffers());
+  } else {
+    // Buffer from last pinned page should be attached.
+    EXPECT_EQ(1, out_batch->num_buffers());
+  }
+  in_batch->Reset();
+  out_batch->Reset();
+}
+
+TEST_F(SimpleTupleStreamTest, TestAttachMemoryPinned) {
+  TestAttachMemory(true, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestNoAttachMemoryPinned) {
+  TestAttachMemory(true, false);
+}
+
+TEST_F(SimpleTupleStreamTest, TestAttachMemoryUnpinned) {
+  TestAttachMemory(false, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestNoAttachMemoryUnpinned) {
+  TestAttachMemory(false, false);
+}
+
+// Test for advancing the read/write page with resource flushing.
+void SimpleTupleStreamTest::TestFlushResourcesReadWrite(
+    bool pin_stream, bool attach_on_read) {
+  // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
+  // make the batch at capacity.
+  const int BUFFER_SIZE = 512;
+  const int BATCH_SIZE = 100;
+  // For unpinned streams, we should be able to iterate with only two buffers.
+  const int MAX_PINNED_PAGES = pin_stream ? 1000 : 2;
+  Init(MAX_PINNED_PAGES * BUFFER_SIZE);
+
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, &client_, BUFFER_SIZE, BUFFER_SIZE);
+  ASSERT_OK(stream.Init(-1, pin_stream));
+  bool got_reservation;
+  ASSERT_OK(stream.PrepareForReadWrite(attach_on_read, &got_reservation));
+  ASSERT_TRUE(got_reservation);
+  int num_buffers_attached = 0;
+  /// Read over the page in different increments.
+  for (int append_batch_size : {1, 10, 100, 1000}) {
+    AppendToReadWriteStream(
+        append_batch_size, BUFFER_SIZE, &num_buffers_attached, &stream);
+  }
+
+  if (attach_on_read) {
+    EXPECT_EQ(stream.byte_size() / BUFFER_SIZE - 1, num_buffers_attached)
+        << "All buffers except the current write page should have been attached";
+  } else {
+    EXPECT_EQ(0, num_buffers_attached);
+  }
+
+  RowBatch* final_out_batch = pool_.Add(new RowBatch(int_desc_, BATCH_SIZE, &tracker_));
+  stream.Close(final_out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  final_out_batch->Reset();
+}
+
+void SimpleTupleStreamTest::AppendToReadWriteStream(int64_t append_batch_size,
+    int64_t buffer_size, int* num_buffers_attached, BufferedTupleStream* stream) {
+  RowBatch* in_batch = CreateIntBatch(0, BATCH_SIZE, false);
+
+  /// Accumulate row batches until we see a flush. The contents of the batches should
+  /// remain valid until reset or delete trailing batches.
+  vector<unique_ptr<RowBatch>> out_batches;
+  // The start row index of each batch in 'out_batches'.
+  vector<int64_t> out_batch_start_indices;
+  // Iterate over at least 10 pages.
+  int64_t start_byte_size = stream->byte_size();
+  while (stream->byte_size() - start_byte_size < 10 * buffer_size) {
+    Status status;
+    for (int i = 0; i < append_batch_size; ++i) {
+      bool ret = stream->AddRow(
+          in_batch->GetRow(stream->num_rows() % in_batch->num_rows()), &status);
+      EXPECT_TRUE(ret);
+      ASSERT_OK(status);
+    }
+    int64_t rows_read = 0;
+    bool eos;
+    while (rows_read < append_batch_size) {
+      out_batches.emplace_back(new RowBatch(int_desc_, BATCH_SIZE, &tracker_));
+      out_batch_start_indices.push_back(stream->rows_returned());
+      ASSERT_OK(stream->GetNext(out_batches.back().get(), &eos));
+      // Verify the contents of all valid batches to make sure that they haven't become
+      // invalid.
+      LOG(INFO) << "Verifying " << out_batches.size() << " batches";
+      for (int i = 0; i < out_batches.size(); ++i) {
+        VerifyReadWriteBatch(in_batch, out_batches[i].get(), out_batch_start_indices[i]);
+      }
+      *num_buffers_attached += out_batches.back()->num_buffers();
+      rows_read += out_batches.back()->num_rows();
+      EXPECT_EQ(rows_read == append_batch_size, eos);
+      if (out_batches.back().get()->flush_mode()
+          == RowBatch::FlushMode::FLUSH_RESOURCES) {
+        out_batches.clear();
+        out_batch_start_indices.clear();
+      }
+    }
+    EXPECT_EQ(append_batch_size, rows_read);
+    EXPECT_EQ(true, eos);
+  }
+  in_batch->Reset();
+}
+
+void SimpleTupleStreamTest::VerifyReadWriteBatch(
+    RowBatch* in_batch, RowBatch* out_batch, int64_t start_index) {
+  int slot_offset = int_desc_->tuple_descriptors()[0]->slots()[0]->tuple_offset();
+  int64_t row_index = start_index;
+  for (int i = 0; i < out_batch->num_rows(); ++i) {
+    TupleRow* in_row = in_batch->GetRow(row_index++ % in_batch->num_rows());
+    EXPECT_EQ(*in_row->GetTuple(0)->GetIntSlot(slot_offset),
+        *out_batch->GetRow(i)->GetTuple(0)->GetIntSlot(slot_offset));
+  }
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWritePinnedAttach) {
+  TestFlushResourcesReadWrite(true, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWritePinnedNoAttach) {
+  TestFlushResourcesReadWrite(true, false);
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWriteUnpinnedAttach) {
+  TestFlushResourcesReadWrite(false, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWriteUnpinnedNoAttach) {
+  TestFlushResourcesReadWrite(false, false);
+}
+
 // Test that tuple stream functions if it references strings outside stream. The
 // aggregation node relies on this since it updates tuples in-place.
 TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
@@ -805,11 +1018,11 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
 
   DCHECK_EQ(rows_added, stream.num_rows());
 
-  for (int delete_on_read = 0; delete_on_read <= 1; ++delete_on_read) {
+  for (int attach_on_read = 0; attach_on_read <= 1; ++attach_on_read) {
     // Keep stream in memory and test we can read ok.
     vector<StringValue> results;
     bool got_read_reservation;
-    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+    ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
     ASSERT_TRUE(got_read_reservation);
     ReadValues(&stream, string_desc_, &results);
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
@@ -934,8 +1147,8 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   vector<uint8_t> tuple_mem(tuple_desc->byte_size());
   Tuple* write_tuple = reinterpret_cast<Tuple*>(tuple_mem.data());
   write_row->SetTuple(0, write_tuple);
-  StringValue* write_str = reinterpret_cast<StringValue*>(
-      write_tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+  StringValue* write_str =
+      write_tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
   // Make the string large enough to fill a page.
   const int64_t string_len = BIG_ROW_BYTES - tuple_desc->byte_size();
   vector<char> data(string_len);
@@ -961,8 +1174,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
     EXPECT_EQ(1, read_batch.num_rows());
     EXPECT_TRUE(eos);
     Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
-    StringValue* str = reinterpret_cast<StringValue*>(
-        tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+    StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
     EXPECT_EQ(string_len, str->len);
     for (int j = 0; j < string_len; ++j) {
       EXPECT_EQ(i, str->ptr[j]) << j;
@@ -988,8 +1200,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
     EXPECT_EQ(1, read_batch.num_rows());
     EXPECT_EQ(eos, i == MAX_BUFFERS) << i;
     Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
-    StringValue* str = reinterpret_cast<StringValue*>(
-        tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
+    StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
     EXPECT_EQ(string_len, str->len);
     for (int j = 0; j < string_len; ++j) {
       ASSERT_EQ(i, str->ptr[j]) << j;
@@ -1117,10 +1328,10 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
   }
 
   for (int i = 0; i < 3; ++i) {
-    bool delete_on_read = i == 2;
+    bool attach_on_read = i == 2;
     vector<StringValue> results;
     bool got_read_reservation;
-    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+    ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
     ASSERT_TRUE(got_read_reservation);
     ReadValues(&stream, string_desc_, &results);
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);

http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 9326507..71175d1 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -47,36 +47,20 @@ using namespace impala;
 using namespace strings;
 
 using BufferHandle = BufferPool::BufferHandle;
+using FlushMode = RowBatch::FlushMode;
 
 BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client,
     int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
   : state_(state),
     desc_(row_desc),
-    node_id_(-1),
     buffer_pool_(state->exec_env()->buffer_pool()),
     buffer_pool_client_(buffer_pool_client),
-    num_pages_(0),
-    total_byte_size_(0),
-    has_read_iterator_(false),
     read_page_reservation_(buffer_pool_client_),
-    read_page_rows_returned_(-1),
-    read_ptr_(nullptr),
-    read_end_ptr_(nullptr),
-    write_ptr_(nullptr),
-    write_end_ptr_(nullptr),
-    rows_returned_(0),
-    has_write_iterator_(false),
-    write_page_(nullptr),
     write_page_reservation_(buffer_pool_client_),
-    bytes_pinned_(0),
-    num_rows_(0),
     default_page_len_(default_page_len),
     max_page_len_(max_page_len),
-    has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
-    delete_on_read_(false),
-    closed_(false),
-    pinned_(true) {
+    has_nullable_tuple_(row_desc->IsAnyTupleNullable()) {
   DCHECK_GE(max_page_len, default_page_len);
   DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len;
   DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len;
@@ -110,10 +94,6 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
   }
 }
 
-BufferedTupleStream::~BufferedTupleStream() {
-  DCHECK(closed_);
-}
-
 void BufferedTupleStream::CheckConsistencyFull() const {
   CheckConsistencyFast();
   // The below checks require iterating over all the pages in the stream.
@@ -139,11 +119,13 @@ void BufferedTupleStream::CheckConsistencyFast() const {
   DCHECK(has_read_iterator() || read_page_ == pages_.end());
   if (read_page_ != pages_.end()) {
     CheckPageConsistency(&*read_page_);
-    DCHECK(read_page_->is_pinned());
-    DCHECK(read_page_->retrieved_buffer);
-    // Can't check read buffer without affecting behaviour, because a read may be in
-    // flight and this would required blocking on that write.
-    DCHECK_GE(read_end_ptr_, read_ptr_);
+    if (!read_page_->attached_to_output_batch) {
+      DCHECK(read_page_->is_pinned());
+      DCHECK(read_page_->retrieved_buffer);
+      // Can't check read buffer without affecting behaviour, because a read may be in
+      // flight and this would required blocking on that write.
+      DCHECK_GE(read_end_ptr_, read_ptr_);
+    }
   }
   if (NeedReadReservation()) {
     DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation())
@@ -159,6 +141,12 @@ void BufferedTupleStream::CheckConsistencyFast() const {
 }
 
 void BufferedTupleStream::CheckPageConsistency(const Page* page) const {
+  if (page->attached_to_output_batch) {
+    /// Read page was just attached to output batch.
+    DCHECK(is_read_page(page)) << page->DebugString();
+    DCHECK(!page->handle.is_open());
+    return;
+  }
   DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << DebugString();
   // Only one large row per page.
   if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1);
@@ -170,7 +158,7 @@ string BufferedTupleStream::DebugString() const {
   stringstream ss;
   ss << "BufferedTupleStream num_rows=" << num_rows_
      << " rows_returned=" << rows_returned_ << " pinned=" << pinned_
-     << " delete_on_read=" << delete_on_read_ << " closed=" << closed_ << "\n"
+     << " attach_on_read=" << attach_on_read_ << " closed=" << closed_ << "\n"
      << " bytes_pinned=" << bytes_pinned_ << " has_write_iterator=" << has_write_iterator_
      << " write_page=" << write_page_ << " has_read_iterator=" << has_read_iterator_
      << " read_page=";
@@ -201,8 +189,23 @@ string BufferedTupleStream::DebugString() const {
   return ss.str();
 }
 
+void BufferedTupleStream::Page::AttachBufferToBatch(
+    BufferedTupleStream* parent, RowBatch* batch, FlushMode flush) {
+  DCHECK(is_pinned());
+  DCHECK(retrieved_buffer);
+  parent->bytes_pinned_ -= len();
+  // ExtractBuffer() cannot fail because the buffer is already in memory.
+  BufferPool::BufferHandle buffer;
+  Status status =
+      parent->buffer_pool_->ExtractBuffer(parent->buffer_pool_client_, &handle, &buffer);
+  DCHECK(status.ok());
+  batch->AddBuffer(parent->buffer_pool_client_, move(buffer), flush);
+  attached_to_output_batch = true;
+}
+
 string BufferedTupleStream::Page::DebugString() const {
-  return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows);
+  return Substitute("$0 num_rows=$1 retrived_buffer=$2 attached_to_output_batch=$3",
+      handle.DebugString(), num_rows, retrieved_buffer, attached_to_output_batch);
 }
 
 Status BufferedTupleStream::Init(int node_id, bool pinned) {
@@ -214,7 +217,7 @@ Status BufferedTupleStream::Init(int node_id, bool pinned) {
 Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
   // This must be the first iterator created.
   DCHECK(pages_.empty());
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
   CHECK_CONSISTENCY_FULL();
@@ -229,10 +232,10 @@ Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
 }
 
 Status BufferedTupleStream::PrepareForReadWrite(
-    bool delete_on_read, bool* got_reservation) {
+    bool attach_on_read, bool* got_reservation) {
   // This must be the first iterator created.
   DCHECK(pages_.empty());
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
   CHECK_CONSISTENCY_FULL();
@@ -243,20 +246,17 @@ Status BufferedTupleStream::PrepareForReadWrite(
   // Save reservation for both the read and write iterators.
   buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_);
   buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
-  RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read));
+  RETURN_IF_ERROR(PrepareForReadInternal(attach_on_read));
   return Status::OK();
 }
 
-void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) {
+void BufferedTupleStream::Close(RowBatch* batch, FlushMode flush) {
   for (Page& page : pages_) {
+    if (page.attached_to_output_batch) continue; // Already returned.
     if (batch != nullptr && page.retrieved_buffer) {
       // Subtle: We only need to attach buffers from pages that we may have returned
-      // references to. ExtractBuffer() cannot fail for these pages because the data
-      // is guaranteed to already be in -memory.
-      BufferPool::BufferHandle buffer;
-      Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
-      DCHECK(status.ok());
-      batch->AddBuffer(buffer_pool_client_, move(buffer), flush);
+      // references to.
+      page.AttachBufferToBatch(this, batch, flush);
     } else {
       buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
     }
@@ -271,7 +271,9 @@ void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) {
 
 int64_t BufferedTupleStream::CalcBytesPinned() const {
   int64_t result = 0;
-  for (const Page& page : pages_) result += page.pin_count() * page.len();
+  for (const Page& page : pages_) {
+    if (!page.attached_to_output_batch) result += page.pin_count() * page.len();
+  }
   return result;
 }
 
@@ -282,6 +284,7 @@ Status BufferedTupleStream::PinPage(Page* page) {
 }
 
 int BufferedTupleStream::ExpectedPinCount(bool stream_pinned, const Page* page) const {
+  DCHECK(!page->attached_to_output_batch);
   return (stream_pinned || is_read_page(page) || is_write_page(page)) ? 1 : 0;
 }
 
@@ -483,12 +486,11 @@ Status BufferedTupleStream::NextReadPage() {
         && !NeedReadReservation(pinned_, num_pages_, true, true)) {
       buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
     }
-  } else if (delete_on_read_) {
+  } else if (attach_on_read_) {
     DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " "
                                          << DebugString();
     DCHECK_NE(&*read_page_, write_page_);
-    bytes_pinned_ -= pages_.front().len();
-    buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle);
+    DCHECK(read_page_->attached_to_output_batch);
     pages_.pop_front();
     --num_pages_;
     read_page_ = pages_.begin();
@@ -557,12 +559,13 @@ void BufferedTupleStream::InvalidateReadIterator() {
   if (read_page_reservation_.GetReservation() > 0) {
     buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
   }
-  // It is safe to re-read a delete-on-read stream if no rows were read and no pages
+  // It is safe to re-read an attach-on-read stream if no rows were read and no pages
   // were therefore deleted.
-  if (rows_returned_ == 0) delete_on_read_ = false;
+  DCHECK(attach_on_read_ == false || rows_returned_ == 0);
+  if (rows_returned_ == 0) attach_on_read_ = false;
 }
 
-Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_reservation) {
+Status BufferedTupleStream::PrepareForRead(bool attach_on_read, bool* got_reservation) {
   CHECK_CONSISTENCY_FULL();
   InvalidateWriteIterator();
   InvalidateReadIterator();
@@ -570,12 +573,12 @@ Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_reserv
   *got_reservation = pinned_ || pages_.empty()
       || buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
   if (!*got_reservation) return Status::OK();
-  return PrepareForReadInternal(delete_on_read);
+  return PrepareForReadInternal(attach_on_read);
 }
 
-Status BufferedTupleStream::PrepareForReadInternal(bool delete_on_read) {
+Status BufferedTupleStream::PrepareForReadInternal(bool attach_on_read) {
   DCHECK(!closed_);
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   DCHECK(!has_read_iterator());
 
   has_read_iterator_ = true;
@@ -599,7 +602,7 @@ Status BufferedTupleStream::PrepareForReadInternal(bool delete_on_read) {
   }
   read_page_rows_returned_ = 0;
   rows_returned_ = 0;
-  delete_on_read_ = delete_on_read;
+  attach_on_read_ = attach_on_read;
   CHECK_CONSISTENCY_FULL();
   return Status::OK();
 }
@@ -708,6 +711,15 @@ Status BufferedTupleStream::GetNextInternal(
 
   if (UNLIKELY(read_page_ == pages_.end()
           || read_page_rows_returned_ == read_page_->num_rows)) {
+    if (read_page_ != pages_.end() && attach_on_read_
+        && !read_page_->attached_to_output_batch) {
+      DCHECK(has_write_iterator());
+      // We're in a read-write stream in the case where we're at the end of the read page
+      // but the buffer was not attached on the last GetNext() call because the write
+      // iterator had not yet advanced.
+      read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
+      return Status::OK();
+    }
     // Get the next page in the stream (or the first page if read_page_ was not yet
     // initialized.) We need to do this at the beginning of the GetNext() call to ensure
     // the buffer management semantics. NextReadPage() may unpin or delete the buffer
@@ -729,7 +741,7 @@ Status BufferedTupleStream::GetNextInternal(
   // null tuple indicator.
   if (FILL_FLAT_ROWS) {
     DCHECK(flat_rows != nullptr);
-    DCHECK(!delete_on_read_);
+    DCHECK(!attach_on_read_);
     DCHECK_EQ(batch->num_rows(), 0);
     flat_rows->clear();
     flat_rows->reserve(rows_to_fill);
@@ -768,11 +780,28 @@ Status BufferedTupleStream::GetNextInternal(
   rows_returned_ += rows_to_fill;
   read_page_rows_returned_ += rows_to_fill;
   *eos = (rows_returned_ == num_rows_);
-  if (read_page_rows_returned_ == read_page_->num_rows && (!pinned_ || delete_on_read_)) {
-    // No more data in this page. The batch must be immediately returned up the operator
-    // tree and deep copied so that NextReadPage() can reuse the read page's buffer.
-    // TODO: IMPALA-4179 - instead attach the buffer and flush the resources.
-    batch->MarkNeedsDeepCopy();
+  if (read_page_rows_returned_ == read_page_->num_rows) {
+    // No more data in this page. NextReadPage() may need to reuse the reservation
+    // currently used for 'read_page_' so we may need to flush resources. When
+    // 'attach_on_read_' is true, we're returning the buffer. Otherwise the buffer will
+    // be unpinned later but we're returning a reference to the memory so we need to
+    // signal to the caller that the resources are going away. Note that if there is a
+    // read-write page it is not safe to attach the buffer yet because more rows may be
+    // appended to the page.
+    if (attach_on_read_) {
+      if (!has_read_write_page()) {
+        // Safe to attach because we already called GetBuffer() in NextReadPage().
+        // TODO: always flushing for pinned streams is overkill since we may not need
+        // to reuse the reservation immediately. Changing this may require modifying
+        // callers of this class.
+        read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
+      }
+    } else if (!pinned_) {
+      // Flush resources so that we can safely unpin the page on the next GetNext() call.
+      // Note that if this is a read/write page we might not actually do the advance on
+      // the next call to GetNext(). In that case the flush is still safe to do.
+      batch->MarkFlushResources();
+    }
   }
   if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
   DCHECK_LE(read_ptr_, read_end_ptr_);
@@ -1028,7 +1057,7 @@ void BufferedTupleStream::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const
   DCHECK(row != nullptr);
   DCHECK(!closed_);
   DCHECK(is_pinned());
-  DCHECK(!delete_on_read_);
+  DCHECK(!attach_on_read_);
   uint8_t* data = flat_row;
   return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) :
                                UnflattenTupleRow<false>(&data, row);

http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index 565b5fa..4090ea8 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -93,12 +93,9 @@ class TupleRow;
 /// buffer is needed to keep the row being processed in-memory, but only default-sized
 /// buffers are needed for the other streams being written.
 ///
-/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag
-/// to PrepareForRead() which deletes the stream's pages as it does a final read
-/// pass over the stream.
-///
-/// TODO: IMPALA-4179: the buffer management can be simplified once we can attach
-/// buffers to RowBatches.
+/// The tuple stream also supports a 'attach_on_read' mode, enabled by passing a flag
+/// to PrepareForRead() which attaches the stream's pages to the output batch as it
+/// does a final destructive read pass over the stream.
 ///
 /// Page layout:
 /// Rows are stored back to back starting at the first byte of each page's buffer, with
@@ -162,11 +159,11 @@ class TupleRow;
 /// Read:
 ///   1. Unpinned: Only a single read page is pinned at a time. This means that only
 ///     enough reservation to pin a single page is needed to read the stream, regardless
-///     of the stream's size. Each page is deleted or unpinned (if delete on read is true
+///     of the stream's size. Each page is attached or unpinned (if attach on read is true
 ///     or false respectively) before advancing to the next page.
 ///   2. Pinned: All pages in the stream are pinned so do not need to be pinned or
-///     unpinned when reading from the stream. If delete on read is true, pages are
-///     deleted after being read. If the stream was previously unpinned, the page's data
+///     unpinned when reading from the stream. If attach on read is true, pages are
+///     attached after being read. If the stream was previously unpinned, the page's data
 ///     may not yet be in memory - reading from the stream can block on I/O or fail with
 ///     an I/O error.
 /// Write:
@@ -179,12 +176,17 @@ class TupleRow;
 ///     or free up other memory before retrying.
 ///
 /// Memory lifetime of rows read from stream:
-/// If the stream is pinned and delete on read is false, it is valid to access any tuples
-/// returned via GetNext() until the stream is unpinned. If the stream is unpinned or
-/// delete on read is true, then the batch returned from GetNext() may have the
-/// needs_deep_copy flag set, which means that any tuple memory returned so far from the
-/// stream may be freed on the next call to GetNext().
-/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch.
+/// There are several cases.
+/// 1. If the stream is pinned and attach on read is false, it is valid to access any
+///    tuples returned via GetNext() until the stream is unpinned.
+/// 2. If the stream is in attach on read mode, all buffers referenced by returned rows
+///    are attached to the batches by that GetNext() call or a subsequent one. The
+///    caller is responsible for managing the lifetime of those buffers.
+/// 3. If the stream is unpinned and not in attach on read mode, then the batch returned
+///    from GetNext() may have the FLUSH_RESOURCES flag set, which means that any tuple
+///    memory returned so far from the stream may be freed on the next call to GetNext().
+///    It is *not* safe to return references to rows returned in this mode outside of
+///    the ExecNode.
 ///
 /// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd():
 /// The BufferedTupleStream supports allocation of uninitialized rows with
@@ -197,8 +199,7 @@ class TupleRow;
 /// will not be modified until the stream is read via GetNext().
 /// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow().
 ///
-/// TODO: we need to be able to do read ahead for pages. We need some way to indicate a
-/// page will need to be pinned soon.
+/// TODO: prefetching for pages could speed up iteration over unpinned streams.
 class BufferedTupleStream {
  public:
   /// A pointer to the start of a flattened TupleRow in the stream.
@@ -213,7 +214,7 @@ class BufferedTupleStream {
       int64_t max_page_len,
       const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
 
-  virtual ~BufferedTupleStream();
+  ~BufferedTupleStream() { DCHECK(closed_); }
 
   /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called
   /// once before any of the other APIs.
@@ -233,23 +234,23 @@ class BufferedTupleStream {
   /// Prepares the stream for interleaved reads and writes by saving enough reservation
   /// for default-sized read and write pages. Called after Init() and before the first
   /// AddRow() or AddRowCustomBegin() call.
-  /// 'delete_on_read': Pages are deleted after they are read.
+  /// 'attach_on_read': Pages are attached to the output batch after they are read.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     read and write pages and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
   Status PrepareForReadWrite(
-      bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+      bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Prepares the stream for reading, invalidating the write iterator (if there is one).
   /// Therefore must be called after the last AddRow() or AddRowCustomEnd() and before
   /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes
   /// over the stream, unless rows were read from the stream after PrepareForRead() or
-  /// PrepareForReadWrite() was called with delete_on_read = true.
-  /// 'delete_on_read': Pages are deleted after they are read.
+  /// PrepareForReadWrite() was called with attach_on_read = true.
+  /// 'attach_on_read': Pages are attached to the output batch after they are read.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     first read page and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
-  Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+  Status PrepareForRead(bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Adds a single row to the stream. There are three possible outcomes:
   /// a) The append succeeds. True is returned.
@@ -303,7 +304,10 @@ class BufferedTupleStream {
   enum UnpinMode {
     /// All pages in the stream are unpinned and the read/write positions in the stream
     /// are reset. No more rows can be written to the stream after this. The stream can
-    /// be re-read from the beginning by calling PrepareForRead().
+    /// be re-read from the beginning by calling PrepareForRead(). It in invalid to call
+    /// UnpinStream(UNPIN_ALL) if the stream is in 'attach_on_read' mode and >= 1 row has
+    /// been read from the stream, because this would leave the stream in limbo where it
+    /// still has unpinned pages but it cannot be read or written to.
     UNPIN_ALL,
     /// All pages are unpinned aside from the current read and write pages (if any),
     /// which is left in the same state. The unpinned stream can continue being read
@@ -315,14 +319,21 @@ class BufferedTupleStream {
   void UnpinStream(UnpinMode mode);
 
   /// Get the next batch of output rows, which are backed by the stream's memory.
-  /// If the stream is unpinned or 'delete_on_read' is true, the 'needs_deep_copy'
-  /// flag may be set on 'batch' to signal that memory will be freed on the next
-  /// call to GetNext() and that the caller should copy out any data it needs from
-  /// rows in 'batch' or in previous batches returned from GetNext().
   ///
-  /// If the stream is pinned and 'delete_on_read' is false, the memory backing the
+  /// If the stream is in 'attach_on_read' mode then buffers are attached to 'batch'
+  /// when the last row referencing the buffer is returned. The FLUSH_RESOURCES flag
+  /// is always set when attaching such a buffer.
+  /// TODO: always flushing for pinned streams is overkill since we may not need
+  /// to reuse the reservation immediately. Changing this may require modifying
+  /// callers of this class.
+  ///
+  /// If the stream is unpinned and not in 'attach_on_read' mode, the FLUSH_RESOURCES
+  /// flag may be set on the batch to signal that memory will be freed on the next call
+  /// to GetNext() and that the caller should copy out any data it needs from rows in
+  /// 'batch' or in previous batches returned from GetNext().
+  ///
+  /// If the stream is pinned and 'attach_on_read' is false, the memory backing the
   /// rows will remain valid until the stream is unpinned, destroyed, etc.
-  /// TODO: IMPALA-4179: update when we simplify the memory transfer model.
   Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
 
   /// Same as above, but populate 'flat_rows' with a pointer to the flat version of
@@ -370,8 +381,6 @@ class BufferedTupleStream {
 
   /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
   struct Page {
-    Page() : num_rows(0), retrieved_buffer(true) {}
-
     inline int len() const { return handle.len(); }
     inline bool is_pinned() const { return handle.is_pinned(); }
     inline int pin_count() const { return handle.pin_count(); }
@@ -380,17 +389,27 @@ class BufferedTupleStream {
       retrieved_buffer = true;
       return Status::OK();
     }
+
+    /// Attach the buffer from this page to 'batch'. Only valid to call if the page is
+    /// pinned and 'retrieved_buffer' is true. Decrements parent->bytes_pinned_.
+    void AttachBufferToBatch(
+        BufferedTupleStream* parent, RowBatch* batch, RowBatch::FlushMode flush);
+
     std::string DebugString() const;
 
     BufferPool::PageHandle handle;
 
     /// Number of rows written to the page.
-    int num_rows;
+    int num_rows = 0;
 
     /// Whether we called GetBuffer() on the page since it was last pinned. This means
     /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have
     /// returned rows referencing the page's buffer.
-    bool retrieved_buffer;
+    bool retrieved_buffer = true;
+
+    /// If the page was just attached to the output batch on the last GetNext() call while
+    /// in attach_on_read mode. If true, then 'handle' is closed.
+    bool attached_to_output_batch = false;
   };
 
   /// Runtime state instance used to check for cancellation. Not owned.
@@ -400,7 +419,7 @@ class BufferedTupleStream {
   const RowDescriptor* desc_;
 
   /// Plan node ID, used for error reporting.
-  int node_id_;
+  int node_id_ = -1;
 
   /// The size of the fixed length portion for each tuple in the row.
   std::vector<int> fixed_tuple_sizes_;
@@ -420,18 +439,18 @@ class BufferedTupleStream {
   /// List of pages in the stream.
   /// Empty iff one of two cases applies:
   /// * before the first row has been added with AddRow() or AddRowCustom().
-  /// * after the stream has been destructively read in 'delete_on_read' mode
+  /// * after the stream has been destructively read in 'attach_on_read' mode
   std::list<Page> pages_;
   // IMPALA-5629: avoid O(n) list.size() call by explicitly tracking the number of pages.
   // TODO: remove when we switch to GCC5+, where list.size() is O(1). See GCC bug #49561.
-  int64_t num_pages_;
+  int64_t num_pages_ = 0;
 
-  /// Total size of pages_, including any pages already deleted in 'delete_on_read'
+  /// Total size of pages_, including any pages already deleted in 'attach_on_read'
   /// mode.
-  int64_t total_byte_size_;
+  int64_t total_byte_size_ = 0;
 
   /// True if there is currently an active read iterator for the stream.
-  bool has_read_iterator_;
+  bool has_read_iterator_ = false;
 
   /// The current page being read. When no read iterator is active, equal to list.end().
   /// When a read iterator is active, either points to the current read page, or equals
@@ -447,31 +466,31 @@ class BufferedTupleStream {
   BufferPool::SubReservation read_page_reservation_;
 
   /// Number of rows returned from the current read_page_.
-  uint32_t read_page_rows_returned_;
+  uint32_t read_page_rows_returned_ = -1;
 
   /// Pointer into read_page_ to the byte after the last row read.
-  uint8_t* read_ptr_;
+  uint8_t* read_ptr_ = nullptr;
 
   /// Pointer to one byte past the end of read_page_. Used to detect overruns.
-  const uint8_t* read_end_ptr_;
+  const uint8_t* read_end_ptr_ = nullptr;
 
   /// Pointer into write_page_ to the byte after the last row written.
-  uint8_t* write_ptr_;
+  uint8_t* write_ptr_ = nullptr;
 
   /// Pointer to one byte past the end of write_page_. Cached to speed up computation
-  const uint8_t* write_end_ptr_;
+  const uint8_t* write_end_ptr_ = nullptr;
 
   /// Number of rows returned to the caller from GetNext() since the last
   /// PrepareForRead() call.
-  int64_t rows_returned_;
+  int64_t rows_returned_ = 0;
 
   /// True if there is currently an active write iterator into the stream.
-  bool has_write_iterator_;
+  bool has_write_iterator_ = false;
 
   /// The current page for writing. NULL if there is no write iterator or no current
   /// write page. Always pinned. Size is 'default_page_len_', except temporarily while
   /// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd().
-  Page* write_page_;
+  Page* write_page_ = nullptr;
 
   /// Saved reservation for write iterator. 'default_page_len_' reservation is saved if
   /// there is a write iterator, no page currently pinned for writing and the possibility
@@ -484,11 +503,11 @@ class BufferedTupleStream {
 
   /// Total bytes of pinned pages in pages_, stored to avoid iterating over the list
   /// to compute it.
-  int64_t bytes_pinned_;
+  int64_t bytes_pinned_ = 0;
 
   /// Number of rows stored in the stream. Includes rows that were already deleted during
-  /// a destructive 'delete_on_read' pass over the stream.
-  int64_t num_rows_;
+  /// a destructive 'attach_on_read' pass over the stream.
+  int64_t num_rows_ = 0;
 
   /// The default length in bytes of pages used to store the stream's rows. All rows that
   /// fit in a default-sized page are stored in default-sized page.
@@ -503,14 +522,14 @@ class BufferedTupleStream {
   const bool has_nullable_tuple_;
 
   /// If true, pages are deleted after they are read during this read pass. Once rows
-  /// have been read from a stream with 'delete_on_read_' true, this is always true.
-  bool delete_on_read_;
+  /// have been read from a stream with 'attach_on_read_' true, this is always true.
+  bool attach_on_read_ = false;
 
-  bool closed_; // Used for debugging.
+  bool closed_ = false; // Used for debugging.
 
   /// If true, this stream has been explicitly pinned by the caller and all pages are
   /// kept pinned until the caller calls UnpinStream().
-  bool pinned_;
+  bool pinned_ = true;
 
   bool is_read_page(const Page* page) const {
     return read_page_ != pages_.end() && &*read_page_ == page;
@@ -585,7 +604,7 @@ class BufferedTupleStream {
 
   /// Same as PrepareForRead(), except the iterators are not invalidated and
   /// the caller is assumed to have checked there is sufficient unused reservation.
-  Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT;
+  Status PrepareForReadInternal(bool attach_on_read) WARN_UNUSED_RESULT;
 
   /// Pins the next read page. This blocks reading from disk if necessary to bring the
   /// page's data into memory. Updates read_page_, read_ptr_, and
@@ -593,7 +612,9 @@ class BufferedTupleStream {
   Status NextReadPage() WARN_UNUSED_RESULT;
 
   /// Invalidate the read iterator, and release any resources associated with the active
-  /// iterator.
+  /// iterator. Invalid to call if 'attach_on_read_' is true and >= 1 rows have been read,
+  /// because that would leave the stream in limbo where it still has pages but it is
+  /// invalid to read or write from in future.
   void InvalidateReadIterator();
 
   /// Returns the total additional bytes that this row will consume in write_page_ if
@@ -618,7 +639,8 @@ class BufferedTupleStream {
   void UnpinPageIfNeeded(Page* page, bool stream_pinned);
 
   /// Return the expected pin count for 'page' in the current stream based on the current
-  /// read and write pages and whether the stream is pinned.
+  /// read and write pages and whether the stream is pinned. Not valid to call if
+  /// the page was just deleted, i.e. page->attached_to_output_batch == false.
   int ExpectedPinCount(bool stream_pinned, const Page* page) const;
 
   /// Return true if the stream in its current state needs to have a reservation for

http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 67adb9b..90d8c4d 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -418,6 +418,7 @@ class RowBatch {
   friend class RowBatchSerializeBaseline;
   friend class RowBatchSerializeBenchmark;
   friend class RowBatchSerializeTest;
+  friend class SimpleTupleStreamTest;
 
   /// Creates an empty row batch based on the serialized row batch header. Called from
   /// FromProtobuf() above before desrialization of a protobuf row batch.

http://git-wip-us.apache.org/repos/asf/impala/blob/240fde62/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 68f313d..91517f1 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -247,6 +247,10 @@ class Tuple {
     return static_cast<bool*>(GetSlot(offset));
   }
 
+  int32_t* GetIntSlot(int offset) {
+    return static_cast<int32_t*>(GetSlot(offset));
+  }
+
   int64_t* GetBigIntSlot(int offset) {
     return static_cast<int64_t*>(GetSlot(offset));
   }