You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/07 21:32:14 UTC

[2/2] impala git commit: Revert "IMPALA-7333: remove MarkNeedsDeepCopy() in agg and BTS"

Revert "IMPALA-7333: remove MarkNeedsDeepCopy() in agg and BTS"

This reverts commit 240fde62d532c7166fc613a97b38c199cec09f1f.

The commit caused a regression: IMPALA-7403

Change-Id: Idbdae8a7b87e19ce30c83a817d940c0a689262a4
Reviewed-on: http://gerrit.cloudera.org:8080/11145
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d33d5b74
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d33d5b74
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d33d5b74

Branch: refs/heads/master
Commit: d33d5b74c24b452c23f705d51768a86e90d25772
Parents: 22f167f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Aug 7 10:25:20 2018 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Tue Aug 7 21:18:15 2018 +0000

----------------------------------------------------------------------
 be/src/exec/grouping-aggregator.cc           |   8 +-
 be/src/runtime/buffered-tuple-stream-test.cc | 249 ++--------------------
 be/src/runtime/buffered-tuple-stream.cc      | 145 +++++--------
 be/src/runtime/buffered-tuple-stream.h       | 138 +++++-------
 be/src/runtime/row-batch.h                   |   1 -
 be/src/runtime/tuple.h                       |   4 -
 6 files changed, 137 insertions(+), 408 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/exec/grouping-aggregator.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 42d6be8..092ecfd 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -294,12 +294,8 @@ Status GroupingAggregator::GetRowsFromPartition(
 
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
   partition_eos_ = ReachedLimit();
-  if (partition_eos_ || output_iterator_.AtEnd()) {
-    // Attach all buffers referenced by previously-returned rows. On the next GetNext()
-    // call we will close the partition.
-    output_partition_->aggregated_row_stream->Close(
-        row_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
-  }
+  if (output_iterator_.AtEnd()) row_batch->MarkNeedsDeepCopy();
+
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 6ff9805..ef66824 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -398,22 +398,6 @@ class SimpleTupleStreamTest : public testing::Test {
 
   void TestTransferMemory(bool pinned_stream, bool read_write);
 
-  void TestAttachMemory(bool pinned_stream, bool attach_on_read);
-
-  void TestFlushResourcesReadWrite(bool pinned_stream, bool attach_on_read);
-
-  /// Helper for TestFlushResourcesReadWrite() to write and read back rows from
-  /// *stream. 'append_batch_size' is the number of rows to append at a time before
-  /// reading them back. *num_buffers_attached tracks the number of buffers attached
-  /// to the output batch.
-  void AppendToReadWriteStream(int64_t append_batch_size, int64_t buffer_size,
-      int* num_buffers_attached, BufferedTupleStream* stream);
-
-  // Helper for AppendToReadWriteStream() to verify 'out_batch' contents. The value of
-  // row i of 'out_batch' is expected to be the same as the row at index
-  // (i + start_index) % out_batch->num_rows() of 'in_batch'.
-  void VerifyReadWriteBatch(RowBatch* in_batch, RowBatch* out_batch, int64_t start_index);
-
   // Helper to writes 'row' comprised of only string slots to 'data'. The expected
   // length of the data written is 'expected_len'.
   void WriteStringRow(const RowDescriptor* row_desc, TupleRow* row, int64_t fixed_size,
@@ -665,13 +649,13 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
   ASSERT_TRUE(pinned);
 
   // Read and verify result a few times. We should be able to reread the stream if
-  // we don't use attach on read mode.
+  // we don't use delete on read mode.
   int read_iters = 3;
   for (int i = 0; i < read_iters; ++i) {
-    bool attach_on_read = i == read_iters - 1;
+    bool delete_on_read = i == read_iters - 1;
     if (i > 0 || !read_write) {
       bool got_read_reservation;
-      ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
+      ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
       ASSERT_TRUE(got_read_reservation);
     }
 
@@ -686,13 +670,15 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
     }
   }
 
-  // After attach_on_read, all buffers should have been attached to the output batches
-  // on previous GetNext() calls.
-  ASSERT_EQ(0, stream.BytesPinned(false));
+  // After delete_on_read, all blocks aside from the last should be deleted.
+  // Note: this should really be 0, but the BufferedTupleStream returns eos before
+  // deleting the last block, rather than after, so the last block isn't deleted
+  // until the stream is closed.
+  ASSERT_EQ(stream.BytesPinned(false), buffer_size);
 
   stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
 
-  ASSERT_EQ(0, stream.BytesPinned(false));
+  ASSERT_EQ(stream.BytesPinned(false), 0);
 }
 
 TEST_F(SimpleTupleStreamTest, UnpinPin) {
@@ -779,205 +765,6 @@ TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) {
   TestTransferMemory(false, false);
 }
 
-/// Test iteration over a stream with and without attaching memory.
-void SimpleTupleStreamTest::TestAttachMemory(bool pin_stream, bool attach_on_read) {
-  // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
-  // make the batch at capacity.
-  int buffer_size = 4 * 1024;
-  Init(100 * buffer_size);
-
-  BufferedTupleStream stream(
-      runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
-  ASSERT_OK(stream.Init(-1, pin_stream));
-  bool got_write_reservation;
-  ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
-  ASSERT_TRUE(got_write_reservation);
-  RowBatch* in_batch = CreateIntBatch(0, 1024, false);
-
-  // Construct a stream with 4 pages.
-  const int total_num_pages = 4;
-  while (stream.byte_size() < total_num_pages * buffer_size) {
-    Status status;
-    for (int i = 0; i < in_batch->num_rows(); ++i) {
-      bool ret = stream.AddRow(in_batch->GetRow(i), &status);
-      EXPECT_TRUE(ret);
-      ASSERT_OK(status);
-    }
-  }
-
-  RowBatch* out_batch = pool_.Add(new RowBatch(int_desc_, 100, &tracker_));
-  int num_buffers_attached = 0;
-  int num_flushes = 0;
-  int64_t num_rows_returned = 0;
-  bool got_read_reservation;
-  ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
-  ASSERT_TRUE(got_read_reservation);
-  bool eos;
-  do {
-    ASSERT_EQ(0, out_batch->num_buffers());
-    ASSERT_OK(stream.GetNext(out_batch, &eos));
-    EXPECT_LE(out_batch->num_buffers(), 1) << "Should only attach one buffer at a time";
-    if (out_batch->num_buffers() > 0) {
-      EXPECT_TRUE(out_batch->AtCapacity()) << "Flush resources flag should have been set";
-    }
-    num_buffers_attached += out_batch->num_buffers();
-    for (int i = 0; i < out_batch->num_rows(); ++i) {
-      int slot_offset = int_desc_->tuple_descriptors()[0]->slots()[0]->tuple_offset();
-      TupleRow* in_row = in_batch->GetRow(num_rows_returned % in_batch->num_rows());
-      EXPECT_EQ(*in_row->GetTuple(0)->GetIntSlot(slot_offset),
-          *out_batch->GetRow(i)->GetTuple(0)->GetIntSlot(slot_offset));
-      ++num_rows_returned;
-    }
-    num_flushes += out_batch->flush_mode() == RowBatch::FlushMode::FLUSH_RESOURCES;
-    out_batch->Reset();
-  } while (!eos);
-
-  if (attach_on_read) {
-    EXPECT_EQ(4, num_buffers_attached) << "All buffers attached during iteration.";
-  } else {
-    EXPECT_EQ(0, num_buffers_attached) << "No buffers attached during iteration.";
-  }
-  if (attach_on_read || !pin_stream) EXPECT_EQ(4, num_flushes);
-  out_batch->Reset();
-  stream.Close(out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
-  if (attach_on_read) {
-    EXPECT_EQ(0, out_batch->num_buffers());
-  } else if (pin_stream) {
-    // All buffers should be attached.
-    EXPECT_EQ(4, out_batch->num_buffers());
-  } else {
-    // Buffer from last pinned page should be attached.
-    EXPECT_EQ(1, out_batch->num_buffers());
-  }
-  in_batch->Reset();
-  out_batch->Reset();
-}
-
-TEST_F(SimpleTupleStreamTest, TestAttachMemoryPinned) {
-  TestAttachMemory(true, true);
-}
-
-TEST_F(SimpleTupleStreamTest, TestNoAttachMemoryPinned) {
-  TestAttachMemory(true, false);
-}
-
-TEST_F(SimpleTupleStreamTest, TestAttachMemoryUnpinned) {
-  TestAttachMemory(false, true);
-}
-
-TEST_F(SimpleTupleStreamTest, TestNoAttachMemoryUnpinned) {
-  TestAttachMemory(false, false);
-}
-
-// Test for advancing the read/write page with resource flushing.
-void SimpleTupleStreamTest::TestFlushResourcesReadWrite(
-    bool pin_stream, bool attach_on_read) {
-  // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
-  // make the batch at capacity.
-  const int BUFFER_SIZE = 512;
-  const int BATCH_SIZE = 100;
-  // For unpinned streams, we should be able to iterate with only two buffers.
-  const int MAX_PINNED_PAGES = pin_stream ? 1000 : 2;
-  Init(MAX_PINNED_PAGES * BUFFER_SIZE);
-
-  BufferedTupleStream stream(
-      runtime_state_, int_desc_, &client_, BUFFER_SIZE, BUFFER_SIZE);
-  ASSERT_OK(stream.Init(-1, pin_stream));
-  bool got_reservation;
-  ASSERT_OK(stream.PrepareForReadWrite(attach_on_read, &got_reservation));
-  ASSERT_TRUE(got_reservation);
-  int num_buffers_attached = 0;
-  /// Read over the page in different increments.
-  for (int append_batch_size : {1, 10, 100, 1000}) {
-    AppendToReadWriteStream(
-        append_batch_size, BUFFER_SIZE, &num_buffers_attached, &stream);
-  }
-
-  if (attach_on_read) {
-    EXPECT_EQ(stream.byte_size() / BUFFER_SIZE - 1, num_buffers_attached)
-        << "All buffers except the current write page should have been attached";
-  } else {
-    EXPECT_EQ(0, num_buffers_attached);
-  }
-
-  RowBatch* final_out_batch = pool_.Add(new RowBatch(int_desc_, BATCH_SIZE, &tracker_));
-  stream.Close(final_out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
-  final_out_batch->Reset();
-}
-
-void SimpleTupleStreamTest::AppendToReadWriteStream(int64_t append_batch_size,
-    int64_t buffer_size, int* num_buffers_attached, BufferedTupleStream* stream) {
-  RowBatch* in_batch = CreateIntBatch(0, BATCH_SIZE, false);
-
-  /// Accumulate row batches until we see a flush. The contents of the batches should
-  /// remain valid until reset or delete trailing batches.
-  vector<unique_ptr<RowBatch>> out_batches;
-  // The start row index of each batch in 'out_batches'.
-  vector<int64_t> out_batch_start_indices;
-  // Iterate over at least 10 pages.
-  int64_t start_byte_size = stream->byte_size();
-  while (stream->byte_size() - start_byte_size < 10 * buffer_size) {
-    Status status;
-    for (int i = 0; i < append_batch_size; ++i) {
-      bool ret = stream->AddRow(
-          in_batch->GetRow(stream->num_rows() % in_batch->num_rows()), &status);
-      EXPECT_TRUE(ret);
-      ASSERT_OK(status);
-    }
-    int64_t rows_read = 0;
-    bool eos;
-    while (rows_read < append_batch_size) {
-      out_batches.emplace_back(new RowBatch(int_desc_, BATCH_SIZE, &tracker_));
-      out_batch_start_indices.push_back(stream->rows_returned());
-      ASSERT_OK(stream->GetNext(out_batches.back().get(), &eos));
-      // Verify the contents of all valid batches to make sure that they haven't become
-      // invalid.
-      LOG(INFO) << "Verifying " << out_batches.size() << " batches";
-      for (int i = 0; i < out_batches.size(); ++i) {
-        VerifyReadWriteBatch(in_batch, out_batches[i].get(), out_batch_start_indices[i]);
-      }
-      *num_buffers_attached += out_batches.back()->num_buffers();
-      rows_read += out_batches.back()->num_rows();
-      EXPECT_EQ(rows_read == append_batch_size, eos);
-      if (out_batches.back().get()->flush_mode()
-          == RowBatch::FlushMode::FLUSH_RESOURCES) {
-        out_batches.clear();
-        out_batch_start_indices.clear();
-      }
-    }
-    EXPECT_EQ(append_batch_size, rows_read);
-    EXPECT_EQ(true, eos);
-  }
-  in_batch->Reset();
-}
-
-void SimpleTupleStreamTest::VerifyReadWriteBatch(
-    RowBatch* in_batch, RowBatch* out_batch, int64_t start_index) {
-  int slot_offset = int_desc_->tuple_descriptors()[0]->slots()[0]->tuple_offset();
-  int64_t row_index = start_index;
-  for (int i = 0; i < out_batch->num_rows(); ++i) {
-    TupleRow* in_row = in_batch->GetRow(row_index++ % in_batch->num_rows());
-    EXPECT_EQ(*in_row->GetTuple(0)->GetIntSlot(slot_offset),
-        *out_batch->GetRow(i)->GetTuple(0)->GetIntSlot(slot_offset));
-  }
-}
-
-TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWritePinnedAttach) {
-  TestFlushResourcesReadWrite(true, true);
-}
-
-TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWritePinnedNoAttach) {
-  TestFlushResourcesReadWrite(true, false);
-}
-
-TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWriteUnpinnedAttach) {
-  TestFlushResourcesReadWrite(false, true);
-}
-
-TEST_F(SimpleTupleStreamTest, TestFlushResourcesReadWriteUnpinnedNoAttach) {
-  TestFlushResourcesReadWrite(false, false);
-}
-
 // Test that tuple stream functions if it references strings outside stream. The
 // aggregation node relies on this since it updates tuples in-place.
 TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
@@ -1018,11 +805,11 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
 
   DCHECK_EQ(rows_added, stream.num_rows());
 
-  for (int attach_on_read = 0; attach_on_read <= 1; ++attach_on_read) {
+  for (int delete_on_read = 0; delete_on_read <= 1; ++delete_on_read) {
     // Keep stream in memory and test we can read ok.
     vector<StringValue> results;
     bool got_read_reservation;
-    ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
+    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
     ASSERT_TRUE(got_read_reservation);
     ReadValues(&stream, string_desc_, &results);
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
@@ -1147,8 +934,8 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   vector<uint8_t> tuple_mem(tuple_desc->byte_size());
   Tuple* write_tuple = reinterpret_cast<Tuple*>(tuple_mem.data());
   write_row->SetTuple(0, write_tuple);
-  StringValue* write_str =
-      write_tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
+  StringValue* write_str = reinterpret_cast<StringValue*>(
+      write_tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
   // Make the string large enough to fill a page.
   const int64_t string_len = BIG_ROW_BYTES - tuple_desc->byte_size();
   vector<char> data(string_len);
@@ -1174,7 +961,8 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
     EXPECT_EQ(1, read_batch.num_rows());
     EXPECT_TRUE(eos);
     Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
-    StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
+    StringValue* str = reinterpret_cast<StringValue*>(
+        tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
     EXPECT_EQ(string_len, str->len);
     for (int j = 0; j < string_len; ++j) {
       EXPECT_EQ(i, str->ptr[j]) << j;
@@ -1200,7 +988,8 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
     EXPECT_EQ(1, read_batch.num_rows());
     EXPECT_EQ(eos, i == MAX_BUFFERS) << i;
     Tuple* tuple = read_batch.GetRow(0)->GetTuple(0);
-    StringValue* str = tuple->GetStringSlot(tuple_desc->slots()[0]->tuple_offset());
+    StringValue* str = reinterpret_cast<StringValue*>(
+        tuple->GetSlot(tuple_desc->slots()[0]->tuple_offset()));
     EXPECT_EQ(string_len, str->len);
     for (int j = 0; j < string_len; ++j) {
       ASSERT_EQ(i, str->ptr[j]) << j;
@@ -1328,10 +1117,10 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
   }
 
   for (int i = 0; i < 3; ++i) {
-    bool attach_on_read = i == 2;
+    bool delete_on_read = i == 2;
     vector<StringValue> results;
     bool got_read_reservation;
-    ASSERT_OK(stream.PrepareForRead(attach_on_read, &got_read_reservation));
+    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
     ASSERT_TRUE(got_read_reservation);
     ReadValues(&stream, string_desc_, &results);
     VerifyResults<StringValue>(*string_desc_, results, rows_added, false);

http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 71175d1..9326507 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -47,20 +47,36 @@ using namespace impala;
 using namespace strings;
 
 using BufferHandle = BufferPool::BufferHandle;
-using FlushMode = RowBatch::FlushMode;
 
 BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client,
     int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
   : state_(state),
     desc_(row_desc),
+    node_id_(-1),
     buffer_pool_(state->exec_env()->buffer_pool()),
     buffer_pool_client_(buffer_pool_client),
+    num_pages_(0),
+    total_byte_size_(0),
+    has_read_iterator_(false),
     read_page_reservation_(buffer_pool_client_),
+    read_page_rows_returned_(-1),
+    read_ptr_(nullptr),
+    read_end_ptr_(nullptr),
+    write_ptr_(nullptr),
+    write_end_ptr_(nullptr),
+    rows_returned_(0),
+    has_write_iterator_(false),
+    write_page_(nullptr),
     write_page_reservation_(buffer_pool_client_),
+    bytes_pinned_(0),
+    num_rows_(0),
     default_page_len_(default_page_len),
     max_page_len_(max_page_len),
-    has_nullable_tuple_(row_desc->IsAnyTupleNullable()) {
+    has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
+    delete_on_read_(false),
+    closed_(false),
+    pinned_(true) {
   DCHECK_GE(max_page_len, default_page_len);
   DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len;
   DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len;
@@ -94,6 +110,10 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
   }
 }
 
+BufferedTupleStream::~BufferedTupleStream() {
+  DCHECK(closed_);
+}
+
 void BufferedTupleStream::CheckConsistencyFull() const {
   CheckConsistencyFast();
   // The below checks require iterating over all the pages in the stream.
@@ -119,13 +139,11 @@ void BufferedTupleStream::CheckConsistencyFast() const {
   DCHECK(has_read_iterator() || read_page_ == pages_.end());
   if (read_page_ != pages_.end()) {
     CheckPageConsistency(&*read_page_);
-    if (!read_page_->attached_to_output_batch) {
-      DCHECK(read_page_->is_pinned());
-      DCHECK(read_page_->retrieved_buffer);
-      // Can't check read buffer without affecting behaviour, because a read may be in
-      // flight and this would required blocking on that write.
-      DCHECK_GE(read_end_ptr_, read_ptr_);
-    }
+    DCHECK(read_page_->is_pinned());
+    DCHECK(read_page_->retrieved_buffer);
+    // Can't check read buffer without affecting behaviour, because a read may be in
+    // flight and this would required blocking on that write.
+    DCHECK_GE(read_end_ptr_, read_ptr_);
   }
   if (NeedReadReservation()) {
     DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation())
@@ -141,12 +159,6 @@ void BufferedTupleStream::CheckConsistencyFast() const {
 }
 
 void BufferedTupleStream::CheckPageConsistency(const Page* page) const {
-  if (page->attached_to_output_batch) {
-    /// Read page was just attached to output batch.
-    DCHECK(is_read_page(page)) << page->DebugString();
-    DCHECK(!page->handle.is_open());
-    return;
-  }
   DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << DebugString();
   // Only one large row per page.
   if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1);
@@ -158,7 +170,7 @@ string BufferedTupleStream::DebugString() const {
   stringstream ss;
   ss << "BufferedTupleStream num_rows=" << num_rows_
      << " rows_returned=" << rows_returned_ << " pinned=" << pinned_
-     << " attach_on_read=" << attach_on_read_ << " closed=" << closed_ << "\n"
+     << " delete_on_read=" << delete_on_read_ << " closed=" << closed_ << "\n"
      << " bytes_pinned=" << bytes_pinned_ << " has_write_iterator=" << has_write_iterator_
      << " write_page=" << write_page_ << " has_read_iterator=" << has_read_iterator_
      << " read_page=";
@@ -189,23 +201,8 @@ string BufferedTupleStream::DebugString() const {
   return ss.str();
 }
 
-void BufferedTupleStream::Page::AttachBufferToBatch(
-    BufferedTupleStream* parent, RowBatch* batch, FlushMode flush) {
-  DCHECK(is_pinned());
-  DCHECK(retrieved_buffer);
-  parent->bytes_pinned_ -= len();
-  // ExtractBuffer() cannot fail because the buffer is already in memory.
-  BufferPool::BufferHandle buffer;
-  Status status =
-      parent->buffer_pool_->ExtractBuffer(parent->buffer_pool_client_, &handle, &buffer);
-  DCHECK(status.ok());
-  batch->AddBuffer(parent->buffer_pool_client_, move(buffer), flush);
-  attached_to_output_batch = true;
-}
-
 string BufferedTupleStream::Page::DebugString() const {
-  return Substitute("$0 num_rows=$1 retrived_buffer=$2 attached_to_output_batch=$3",
-      handle.DebugString(), num_rows, retrieved_buffer, attached_to_output_batch);
+  return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows);
 }
 
 Status BufferedTupleStream::Init(int node_id, bool pinned) {
@@ -217,7 +214,7 @@ Status BufferedTupleStream::Init(int node_id, bool pinned) {
 Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
   // This must be the first iterator created.
   DCHECK(pages_.empty());
-  DCHECK(!attach_on_read_);
+  DCHECK(!delete_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
   CHECK_CONSISTENCY_FULL();
@@ -232,10 +229,10 @@ Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) {
 }
 
 Status BufferedTupleStream::PrepareForReadWrite(
-    bool attach_on_read, bool* got_reservation) {
+    bool delete_on_read, bool* got_reservation) {
   // This must be the first iterator created.
   DCHECK(pages_.empty());
-  DCHECK(!attach_on_read_);
+  DCHECK(!delete_on_read_);
   DCHECK(!has_write_iterator());
   DCHECK(!has_read_iterator());
   CHECK_CONSISTENCY_FULL();
@@ -246,17 +243,20 @@ Status BufferedTupleStream::PrepareForReadWrite(
   // Save reservation for both the read and write iterators.
   buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_);
   buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
-  RETURN_IF_ERROR(PrepareForReadInternal(attach_on_read));
+  RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read));
   return Status::OK();
 }
 
-void BufferedTupleStream::Close(RowBatch* batch, FlushMode flush) {
+void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) {
   for (Page& page : pages_) {
-    if (page.attached_to_output_batch) continue; // Already returned.
     if (batch != nullptr && page.retrieved_buffer) {
       // Subtle: We only need to attach buffers from pages that we may have returned
-      // references to.
-      page.AttachBufferToBatch(this, batch, flush);
+      // references to. ExtractBuffer() cannot fail for these pages because the data
+      // is guaranteed to already be in -memory.
+      BufferPool::BufferHandle buffer;
+      Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
+      DCHECK(status.ok());
+      batch->AddBuffer(buffer_pool_client_, move(buffer), flush);
     } else {
       buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
     }
@@ -271,9 +271,7 @@ void BufferedTupleStream::Close(RowBatch* batch, FlushMode flush) {
 
 int64_t BufferedTupleStream::CalcBytesPinned() const {
   int64_t result = 0;
-  for (const Page& page : pages_) {
-    if (!page.attached_to_output_batch) result += page.pin_count() * page.len();
-  }
+  for (const Page& page : pages_) result += page.pin_count() * page.len();
   return result;
 }
 
@@ -284,7 +282,6 @@ Status BufferedTupleStream::PinPage(Page* page) {
 }
 
 int BufferedTupleStream::ExpectedPinCount(bool stream_pinned, const Page* page) const {
-  DCHECK(!page->attached_to_output_batch);
   return (stream_pinned || is_read_page(page) || is_write_page(page)) ? 1 : 0;
 }
 
@@ -486,11 +483,12 @@ Status BufferedTupleStream::NextReadPage() {
         && !NeedReadReservation(pinned_, num_pages_, true, true)) {
       buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
     }
-  } else if (attach_on_read_) {
+  } else if (delete_on_read_) {
     DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " "
                                          << DebugString();
     DCHECK_NE(&*read_page_, write_page_);
-    DCHECK(read_page_->attached_to_output_batch);
+    bytes_pinned_ -= pages_.front().len();
+    buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle);
     pages_.pop_front();
     --num_pages_;
     read_page_ = pages_.begin();
@@ -559,13 +557,12 @@ void BufferedTupleStream::InvalidateReadIterator() {
   if (read_page_reservation_.GetReservation() > 0) {
     buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_);
   }
-  // It is safe to re-read an attach-on-read stream if no rows were read and no pages
+  // It is safe to re-read a delete-on-read stream if no rows were read and no pages
   // were therefore deleted.
-  DCHECK(attach_on_read_ == false || rows_returned_ == 0);
-  if (rows_returned_ == 0) attach_on_read_ = false;
+  if (rows_returned_ == 0) delete_on_read_ = false;
 }
 
-Status BufferedTupleStream::PrepareForRead(bool attach_on_read, bool* got_reservation) {
+Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_reservation) {
   CHECK_CONSISTENCY_FULL();
   InvalidateWriteIterator();
   InvalidateReadIterator();
@@ -573,12 +570,12 @@ Status BufferedTupleStream::PrepareForRead(bool attach_on_read, bool* got_reserv
   *got_reservation = pinned_ || pages_.empty()
       || buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
   if (!*got_reservation) return Status::OK();
-  return PrepareForReadInternal(attach_on_read);
+  return PrepareForReadInternal(delete_on_read);
 }
 
-Status BufferedTupleStream::PrepareForReadInternal(bool attach_on_read) {
+Status BufferedTupleStream::PrepareForReadInternal(bool delete_on_read) {
   DCHECK(!closed_);
-  DCHECK(!attach_on_read_);
+  DCHECK(!delete_on_read_);
   DCHECK(!has_read_iterator());
 
   has_read_iterator_ = true;
@@ -602,7 +599,7 @@ Status BufferedTupleStream::PrepareForReadInternal(bool attach_on_read) {
   }
   read_page_rows_returned_ = 0;
   rows_returned_ = 0;
-  attach_on_read_ = attach_on_read;
+  delete_on_read_ = delete_on_read;
   CHECK_CONSISTENCY_FULL();
   return Status::OK();
 }
@@ -711,15 +708,6 @@ Status BufferedTupleStream::GetNextInternal(
 
   if (UNLIKELY(read_page_ == pages_.end()
           || read_page_rows_returned_ == read_page_->num_rows)) {
-    if (read_page_ != pages_.end() && attach_on_read_
-        && !read_page_->attached_to_output_batch) {
-      DCHECK(has_write_iterator());
-      // We're in a read-write stream in the case where we're at the end of the read page
-      // but the buffer was not attached on the last GetNext() call because the write
-      // iterator had not yet advanced.
-      read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
-      return Status::OK();
-    }
     // Get the next page in the stream (or the first page if read_page_ was not yet
     // initialized.) We need to do this at the beginning of the GetNext() call to ensure
     // the buffer management semantics. NextReadPage() may unpin or delete the buffer
@@ -741,7 +729,7 @@ Status BufferedTupleStream::GetNextInternal(
   // null tuple indicator.
   if (FILL_FLAT_ROWS) {
     DCHECK(flat_rows != nullptr);
-    DCHECK(!attach_on_read_);
+    DCHECK(!delete_on_read_);
     DCHECK_EQ(batch->num_rows(), 0);
     flat_rows->clear();
     flat_rows->reserve(rows_to_fill);
@@ -780,28 +768,11 @@ Status BufferedTupleStream::GetNextInternal(
   rows_returned_ += rows_to_fill;
   read_page_rows_returned_ += rows_to_fill;
   *eos = (rows_returned_ == num_rows_);
-  if (read_page_rows_returned_ == read_page_->num_rows) {
-    // No more data in this page. NextReadPage() may need to reuse the reservation
-    // currently used for 'read_page_' so we may need to flush resources. When
-    // 'attach_on_read_' is true, we're returning the buffer. Otherwise the buffer will
-    // be unpinned later but we're returning a reference to the memory so we need to
-    // signal to the caller that the resources are going away. Note that if there is a
-    // read-write page it is not safe to attach the buffer yet because more rows may be
-    // appended to the page.
-    if (attach_on_read_) {
-      if (!has_read_write_page()) {
-        // Safe to attach because we already called GetBuffer() in NextReadPage().
-        // TODO: always flushing for pinned streams is overkill since we may not need
-        // to reuse the reservation immediately. Changing this may require modifying
-        // callers of this class.
-        read_page_->AttachBufferToBatch(this, batch, FlushMode::FLUSH_RESOURCES);
-      }
-    } else if (!pinned_) {
-      // Flush resources so that we can safely unpin the page on the next GetNext() call.
-      // Note that if this is a read/write page we might not actually do the advance on
-      // the next call to GetNext(). In that case the flush is still safe to do.
-      batch->MarkFlushResources();
-    }
+  if (read_page_rows_returned_ == read_page_->num_rows && (!pinned_ || delete_on_read_)) {
+    // No more data in this page. The batch must be immediately returned up the operator
+    // tree and deep copied so that NextReadPage() can reuse the read page's buffer.
+    // TODO: IMPALA-4179 - instead attach the buffer and flush the resources.
+    batch->MarkNeedsDeepCopy();
   }
   if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
   DCHECK_LE(read_ptr_, read_end_ptr_);
@@ -1057,7 +1028,7 @@ void BufferedTupleStream::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const
   DCHECK(row != nullptr);
   DCHECK(!closed_);
   DCHECK(is_pinned());
-  DCHECK(!attach_on_read_);
+  DCHECK(!delete_on_read_);
   uint8_t* data = flat_row;
   return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) :
                                UnflattenTupleRow<false>(&data, row);

http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index 4090ea8..565b5fa 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -93,9 +93,12 @@ class TupleRow;
 /// buffer is needed to keep the row being processed in-memory, but only default-sized
 /// buffers are needed for the other streams being written.
 ///
-/// The tuple stream also supports a 'attach_on_read' mode, enabled by passing a flag
-/// to PrepareForRead() which attaches the stream's pages to the output batch as it
-/// does a final destructive read pass over the stream.
+/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag
+/// to PrepareForRead() which deletes the stream's pages as it does a final read
+/// pass over the stream.
+///
+/// TODO: IMPALA-4179: the buffer management can be simplified once we can attach
+/// buffers to RowBatches.
 ///
 /// Page layout:
 /// Rows are stored back to back starting at the first byte of each page's buffer, with
@@ -159,11 +162,11 @@ class TupleRow;
 /// Read:
 ///   1. Unpinned: Only a single read page is pinned at a time. This means that only
 ///     enough reservation to pin a single page is needed to read the stream, regardless
-///     of the stream's size. Each page is attached or unpinned (if attach on read is true
+///     of the stream's size. Each page is deleted or unpinned (if delete on read is true
 ///     or false respectively) before advancing to the next page.
 ///   2. Pinned: All pages in the stream are pinned so do not need to be pinned or
-///     unpinned when reading from the stream. If attach on read is true, pages are
-///     attached after being read. If the stream was previously unpinned, the page's data
+///     unpinned when reading from the stream. If delete on read is true, pages are
+///     deleted after being read. If the stream was previously unpinned, the page's data
 ///     may not yet be in memory - reading from the stream can block on I/O or fail with
 ///     an I/O error.
 /// Write:
@@ -176,17 +179,12 @@ class TupleRow;
 ///     or free up other memory before retrying.
 ///
 /// Memory lifetime of rows read from stream:
-/// There are several cases.
-/// 1. If the stream is pinned and attach on read is false, it is valid to access any
-///    tuples returned via GetNext() until the stream is unpinned.
-/// 2. If the stream is in attach on read mode, all buffers referenced by returned rows
-///    are attached to the batches by that GetNext() call or a subsequent one. The
-///    caller is responsible for managing the lifetime of those buffers.
-/// 3. If the stream is unpinned and not in attach on read mode, then the batch returned
-///    from GetNext() may have the FLUSH_RESOURCES flag set, which means that any tuple
-///    memory returned so far from the stream may be freed on the next call to GetNext().
-///    It is *not* safe to return references to rows returned in this mode outside of
-///    the ExecNode.
+/// If the stream is pinned and delete on read is false, it is valid to access any tuples
+/// returned via GetNext() until the stream is unpinned. If the stream is unpinned or
+/// delete on read is true, then the batch returned from GetNext() may have the
+/// needs_deep_copy flag set, which means that any tuple memory returned so far from the
+/// stream may be freed on the next call to GetNext().
+/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch.
 ///
 /// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd():
 /// The BufferedTupleStream supports allocation of uninitialized rows with
@@ -199,7 +197,8 @@ class TupleRow;
 /// will not be modified until the stream is read via GetNext().
 /// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow().
 ///
-/// TODO: prefetching for pages could speed up iteration over unpinned streams.
+/// TODO: we need to be able to do read ahead for pages. We need some way to indicate a
+/// page will need to be pinned soon.
 class BufferedTupleStream {
  public:
   /// A pointer to the start of a flattened TupleRow in the stream.
@@ -214,7 +213,7 @@ class BufferedTupleStream {
       int64_t max_page_len,
       const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
 
-  ~BufferedTupleStream() { DCHECK(closed_); }
+  virtual ~BufferedTupleStream();
 
   /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called
   /// once before any of the other APIs.
@@ -234,23 +233,23 @@ class BufferedTupleStream {
   /// Prepares the stream for interleaved reads and writes by saving enough reservation
   /// for default-sized read and write pages. Called after Init() and before the first
   /// AddRow() or AddRowCustomBegin() call.
-  /// 'attach_on_read': Pages are attached to the output batch after they are read.
+  /// 'delete_on_read': Pages are deleted after they are read.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     read and write pages and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
   Status PrepareForReadWrite(
-      bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+      bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Prepares the stream for reading, invalidating the write iterator (if there is one).
   /// Therefore must be called after the last AddRow() or AddRowCustomEnd() and before
   /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes
   /// over the stream, unless rows were read from the stream after PrepareForRead() or
-  /// PrepareForReadWrite() was called with attach_on_read = true.
-  /// 'attach_on_read': Pages are attached to the output batch after they are read.
+  /// PrepareForReadWrite() was called with delete_on_read = true.
+  /// 'delete_on_read': Pages are deleted after they are read.
   /// 'got_reservation': set to true if there was enough reservation to initialize the
   ///     first read page and false if there was not enough reservation and no other
   ///     error was encountered. Undefined if an error status is returned.
-  Status PrepareForRead(bool attach_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+  Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
 
   /// Adds a single row to the stream. There are three possible outcomes:
   /// a) The append succeeds. True is returned.
@@ -304,10 +303,7 @@ class BufferedTupleStream {
   enum UnpinMode {
     /// All pages in the stream are unpinned and the read/write positions in the stream
     /// are reset. No more rows can be written to the stream after this. The stream can
-    /// be re-read from the beginning by calling PrepareForRead(). It in invalid to call
-    /// UnpinStream(UNPIN_ALL) if the stream is in 'attach_on_read' mode and >= 1 row has
-    /// been read from the stream, because this would leave the stream in limbo where it
-    /// still has unpinned pages but it cannot be read or written to.
+    /// be re-read from the beginning by calling PrepareForRead().
     UNPIN_ALL,
     /// All pages are unpinned aside from the current read and write pages (if any),
     /// which is left in the same state. The unpinned stream can continue being read
@@ -319,21 +315,14 @@ class BufferedTupleStream {
   void UnpinStream(UnpinMode mode);
 
   /// Get the next batch of output rows, which are backed by the stream's memory.
+  /// If the stream is unpinned or 'delete_on_read' is true, the 'needs_deep_copy'
+  /// flag may be set on 'batch' to signal that memory will be freed on the next
+  /// call to GetNext() and that the caller should copy out any data it needs from
+  /// rows in 'batch' or in previous batches returned from GetNext().
   ///
-  /// If the stream is in 'attach_on_read' mode then buffers are attached to 'batch'
-  /// when the last row referencing the buffer is returned. The FLUSH_RESOURCES flag
-  /// is always set when attaching such a buffer.
-  /// TODO: always flushing for pinned streams is overkill since we may not need
-  /// to reuse the reservation immediately. Changing this may require modifying
-  /// callers of this class.
-  ///
-  /// If the stream is unpinned and not in 'attach_on_read' mode, the FLUSH_RESOURCES
-  /// flag may be set on the batch to signal that memory will be freed on the next call
-  /// to GetNext() and that the caller should copy out any data it needs from rows in
-  /// 'batch' or in previous batches returned from GetNext().
-  ///
-  /// If the stream is pinned and 'attach_on_read' is false, the memory backing the
+  /// If the stream is pinned and 'delete_on_read' is false, the memory backing the
   /// rows will remain valid until the stream is unpinned, destroyed, etc.
+  /// TODO: IMPALA-4179: update when we simplify the memory transfer model.
   Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
 
   /// Same as above, but populate 'flat_rows' with a pointer to the flat version of
@@ -381,6 +370,8 @@ class BufferedTupleStream {
 
   /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
   struct Page {
+    Page() : num_rows(0), retrieved_buffer(true) {}
+
     inline int len() const { return handle.len(); }
     inline bool is_pinned() const { return handle.is_pinned(); }
     inline int pin_count() const { return handle.pin_count(); }
@@ -389,27 +380,17 @@ class BufferedTupleStream {
       retrieved_buffer = true;
       return Status::OK();
     }
-
-    /// Attach the buffer from this page to 'batch'. Only valid to call if the page is
-    /// pinned and 'retrieved_buffer' is true. Decrements parent->bytes_pinned_.
-    void AttachBufferToBatch(
-        BufferedTupleStream* parent, RowBatch* batch, RowBatch::FlushMode flush);
-
     std::string DebugString() const;
 
     BufferPool::PageHandle handle;
 
     /// Number of rows written to the page.
-    int num_rows = 0;
+    int num_rows;
 
     /// Whether we called GetBuffer() on the page since it was last pinned. This means
     /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have
     /// returned rows referencing the page's buffer.
-    bool retrieved_buffer = true;
-
-    /// If the page was just attached to the output batch on the last GetNext() call while
-    /// in attach_on_read mode. If true, then 'handle' is closed.
-    bool attached_to_output_batch = false;
+    bool retrieved_buffer;
   };
 
   /// Runtime state instance used to check for cancellation. Not owned.
@@ -419,7 +400,7 @@ class BufferedTupleStream {
   const RowDescriptor* desc_;
 
   /// Plan node ID, used for error reporting.
-  int node_id_ = -1;
+  int node_id_;
 
   /// The size of the fixed length portion for each tuple in the row.
   std::vector<int> fixed_tuple_sizes_;
@@ -439,18 +420,18 @@ class BufferedTupleStream {
   /// List of pages in the stream.
   /// Empty iff one of two cases applies:
   /// * before the first row has been added with AddRow() or AddRowCustom().
-  /// * after the stream has been destructively read in 'attach_on_read' mode
+  /// * after the stream has been destructively read in 'delete_on_read' mode
   std::list<Page> pages_;
   // IMPALA-5629: avoid O(n) list.size() call by explicitly tracking the number of pages.
   // TODO: remove when we switch to GCC5+, where list.size() is O(1). See GCC bug #49561.
-  int64_t num_pages_ = 0;
+  int64_t num_pages_;
 
-  /// Total size of pages_, including any pages already deleted in 'attach_on_read'
+  /// Total size of pages_, including any pages already deleted in 'delete_on_read'
   /// mode.
-  int64_t total_byte_size_ = 0;
+  int64_t total_byte_size_;
 
   /// True if there is currently an active read iterator for the stream.
-  bool has_read_iterator_ = false;
+  bool has_read_iterator_;
 
   /// The current page being read. When no read iterator is active, equal to list.end().
   /// When a read iterator is active, either points to the current read page, or equals
@@ -466,31 +447,31 @@ class BufferedTupleStream {
   BufferPool::SubReservation read_page_reservation_;
 
   /// Number of rows returned from the current read_page_.
-  uint32_t read_page_rows_returned_ = -1;
+  uint32_t read_page_rows_returned_;
 
   /// Pointer into read_page_ to the byte after the last row read.
-  uint8_t* read_ptr_ = nullptr;
+  uint8_t* read_ptr_;
 
   /// Pointer to one byte past the end of read_page_. Used to detect overruns.
-  const uint8_t* read_end_ptr_ = nullptr;
+  const uint8_t* read_end_ptr_;
 
   /// Pointer into write_page_ to the byte after the last row written.
-  uint8_t* write_ptr_ = nullptr;
+  uint8_t* write_ptr_;
 
   /// Pointer to one byte past the end of write_page_. Cached to speed up computation
-  const uint8_t* write_end_ptr_ = nullptr;
+  const uint8_t* write_end_ptr_;
 
   /// Number of rows returned to the caller from GetNext() since the last
   /// PrepareForRead() call.
-  int64_t rows_returned_ = 0;
+  int64_t rows_returned_;
 
   /// True if there is currently an active write iterator into the stream.
-  bool has_write_iterator_ = false;
+  bool has_write_iterator_;
 
   /// The current page for writing. NULL if there is no write iterator or no current
   /// write page. Always pinned. Size is 'default_page_len_', except temporarily while
   /// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd().
-  Page* write_page_ = nullptr;
+  Page* write_page_;
 
   /// Saved reservation for write iterator. 'default_page_len_' reservation is saved if
   /// there is a write iterator, no page currently pinned for writing and the possibility
@@ -503,11 +484,11 @@ class BufferedTupleStream {
 
   /// Total bytes of pinned pages in pages_, stored to avoid iterating over the list
   /// to compute it.
-  int64_t bytes_pinned_ = 0;
+  int64_t bytes_pinned_;
 
   /// Number of rows stored in the stream. Includes rows that were already deleted during
-  /// a destructive 'attach_on_read' pass over the stream.
-  int64_t num_rows_ = 0;
+  /// a destructive 'delete_on_read' pass over the stream.
+  int64_t num_rows_;
 
   /// The default length in bytes of pages used to store the stream's rows. All rows that
   /// fit in a default-sized page are stored in default-sized page.
@@ -522,14 +503,14 @@ class BufferedTupleStream {
   const bool has_nullable_tuple_;
 
   /// If true, pages are deleted after they are read during this read pass. Once rows
-  /// have been read from a stream with 'attach_on_read_' true, this is always true.
-  bool attach_on_read_ = false;
+  /// have been read from a stream with 'delete_on_read_' true, this is always true.
+  bool delete_on_read_;
 
-  bool closed_ = false; // Used for debugging.
+  bool closed_; // Used for debugging.
 
   /// If true, this stream has been explicitly pinned by the caller and all pages are
   /// kept pinned until the caller calls UnpinStream().
-  bool pinned_ = true;
+  bool pinned_;
 
   bool is_read_page(const Page* page) const {
     return read_page_ != pages_.end() && &*read_page_ == page;
@@ -604,7 +585,7 @@ class BufferedTupleStream {
 
   /// Same as PrepareForRead(), except the iterators are not invalidated and
   /// the caller is assumed to have checked there is sufficient unused reservation.
-  Status PrepareForReadInternal(bool attach_on_read) WARN_UNUSED_RESULT;
+  Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT;
 
   /// Pins the next read page. This blocks reading from disk if necessary to bring the
   /// page's data into memory. Updates read_page_, read_ptr_, and
@@ -612,9 +593,7 @@ class BufferedTupleStream {
   Status NextReadPage() WARN_UNUSED_RESULT;
 
   /// Invalidate the read iterator, and release any resources associated with the active
-  /// iterator. Invalid to call if 'attach_on_read_' is true and >= 1 rows have been read,
-  /// because that would leave the stream in limbo where it still has pages but it is
-  /// invalid to read or write from in future.
+  /// iterator.
   void InvalidateReadIterator();
 
   /// Returns the total additional bytes that this row will consume in write_page_ if
@@ -639,8 +618,7 @@ class BufferedTupleStream {
   void UnpinPageIfNeeded(Page* page, bool stream_pinned);
 
   /// Return the expected pin count for 'page' in the current stream based on the current
-  /// read and write pages and whether the stream is pinned. Not valid to call if
-  /// the page was just deleted, i.e. page->attached_to_output_batch == false.
+  /// read and write pages and whether the stream is pinned.
   int ExpectedPinCount(bool stream_pinned, const Page* page) const;
 
   /// Return true if the stream in its current state needs to have a reservation for

http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 90d8c4d..67adb9b 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -418,7 +418,6 @@ class RowBatch {
   friend class RowBatchSerializeBaseline;
   friend class RowBatchSerializeBenchmark;
   friend class RowBatchSerializeTest;
-  friend class SimpleTupleStreamTest;
 
   /// Creates an empty row batch based on the serialized row batch header. Called from
   /// FromProtobuf() above before desrialization of a protobuf row batch.

http://git-wip-us.apache.org/repos/asf/impala/blob/d33d5b74/be/src/runtime/tuple.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 91517f1..68f313d 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -247,10 +247,6 @@ class Tuple {
     return static_cast<bool*>(GetSlot(offset));
   }
 
-  int32_t* GetIntSlot(int offset) {
-    return static_cast<int32_t*>(GetSlot(offset));
-  }
-
   int64_t* GetBigIntSlot(int offset) {
     return static_cast<int64_t*>(GetSlot(offset));
   }