You are viewing a plain-text version of this message. The link to its canonical version was not preserved in this copy.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/08/05 03:18:17 UTC

[06/11] incubator-impala git commit: IMPALA-4674: Part 2: port backend exec to BufferPool

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
deleted file mode 100644
index 0904833..0000000
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ /dev/null
@@ -1,1264 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <boost/scoped_ptr.hpp>
-#include <boost/bind.hpp>
-#include <boost/filesystem.hpp>
-
-#include <set>
-#include <string>
-#include <limits> // for std::numeric_limits<int>::max()
-
-#include "testutil/gtest-util.h"
-#include "codegen/llvm-codegen.h"
-#include "gutil/gscoped_ptr.h"
-#include "runtime/buffered-tuple-stream.inline.h"
-#include "runtime/collection-value.h"
-#include "runtime/collection-value-builder.h"
-#include "runtime/raw-value.h"
-#include "runtime/row-batch.h"
-#include "runtime/string-value.inline.h"
-#include "runtime/test-env.h"
-#include "runtime/tmp-file-mgr.h"
-#include "service/fe-support.h"
-#include "testutil/desc-tbl-builder.h"
-#include "util/test-info.h"
-
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ImpalaInternalService_types.h"
-
-#include "common/names.h"
-
-using kudu::FreeDeleter;
-
-/// Number of rows per RowBatch created or read by these tests.
-static constexpr int BATCH_SIZE = 250;
-/// Block size in bytes (8 MB) used when the tests want I/O-sized buffers.
-static constexpr int IO_BLOCK_SIZE = 8 << 20;
-/// Large prime used by GenIntValue() to produce varied bit patterns.
-static constexpr uint32_t PRIME = 479001599;
-
-namespace impala {
-
-/// Pool of string values cycled through (by index modulo NUM_STRINGS) when generating
-/// and verifying TYPE_STRING slots.
-static const StringValue STRINGS[] = {
-  StringValue("ABC"), StringValue("HELLO"), StringValue("123456789"),
-  StringValue("FOOBAR"), StringValue("ONE"), StringValue("THREE"),
-  StringValue("abcdefghijklmno"),
-  StringValue("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
-  StringValue("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
-};
-
-/// Number of entries in STRINGS.
-static const int NUM_STRINGS = sizeof(STRINGS) / sizeof(STRINGS[0]);
-
-/// Test fixture for BufferedTupleStream with a single non-nullable tuple per row.
-/// Tests call InitBlockMgr() to set up a block manager and client, then use the
-/// helpers below to write deterministic row batches into a stream, read them back and
-/// verify the values. Subclasses override CreateDescriptors() for other row layouts.
-class SimpleTupleStreamTest : public testing::Test {
- protected:
-  virtual void SetUp() {
-    test_env_.reset(new TestEnv());
-    ASSERT_OK(test_env_->Init());
-
-    CreateDescriptors();
-
-    mem_pool_.reset(new MemPool(&tracker_));
-  }
-
-  /// Builds 'int_desc_' and 'string_desc_': rows with one non-nullable tuple holding a
-  /// single INT or STRING slot, respectively.
-  virtual void CreateDescriptors() {
-    vector<bool> nullable_tuples(1, false);
-    vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0));
-
-    DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_);
-    int_builder.DeclareTuple() << TYPE_INT;
-    int_desc_ = pool_.Add(new RowDescriptor(
-        *int_builder.Build(), tuple_ids, nullable_tuples));
-
-    DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_);
-    string_builder.DeclareTuple() << TYPE_STRING;
-    string_desc_ = pool_.Add(new RowDescriptor(
-        *string_builder.Build(), tuple_ids, nullable_tuples));
-  }
-
-  virtual void TearDown() {
-    runtime_state_ = NULL;
-    client_ = NULL;
-    pool_.Clear();
-    // gtest still runs TearDown() when SetUp() fails with a fatal assertion, which can
-    // happen before 'mem_pool_' has been created.
-    if (mem_pool_.get() != NULL) mem_pool_->FreeAll();
-    test_env_.reset();
-  }
-
-  /// Creates a query state whose block manager is configured with 'limit' and
-  /// 'block_size' (both passed to TestEnv::CreateQueryStateWithBlockMgr()), then
-  /// registers 'client_' with no reservation. The client's memory is tracked by a new
-  /// MemTracker whose parent is the query instance's MemTracker.
-  void InitBlockMgr(int64_t limit, int block_size) {
-    ASSERT_OK(test_env_->CreateQueryStateWithBlockMgr(
-        0, limit, block_size, nullptr, &runtime_state_));
-    MemTracker* client_tracker =
-        pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
-    ASSERT_OK(runtime_state_->block_mgr()->RegisterClient(
-        "", 0, false, client_tracker, runtime_state_, &client_));
-  }
-
-  /// Generate the ith element of a sequence of int values.
-  int GenIntValue(int i) {
-    // Multiply by large prime to get varied bit patterns. PRIME is a uint32_t, so the
-    // product is computed in wrapping unsigned arithmetic before converting to int.
-    return i * PRIME;
-  }
-
-  /// Generate the ith element of a sequence of bool values.
-  bool GenBoolValue(int i) {
-    // Use a middle bit of the int value.
-    return ((GenIntValue(i) >> 8) & 0x1) != 0;
-  }
-
-  /// Count the total number of slots per row based on the given row descriptor.
-  int CountSlotsPerRow(const RowDescriptor& row_desc) {
-    int slots_per_row = 0;
-    for (int i = 0; i < row_desc.tuple_descriptors().size(); ++i) {
-      TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[i];
-      slots_per_row += tuple_desc->slots().size();
-    }
-    return slots_per_row;
-  }
-
-  /// Allocate a row batch with 'num_rows' of rows with layout described by 'row_desc'.
-  /// 'offset' is used to account for rows occupied by any previous row batches. This is
-  /// needed to match the values generated in VerifyResults(). If 'gen_null' is true,
-  /// some tuples will be set to NULL.
-  virtual RowBatch* CreateBatch(
-      const RowDescriptor* row_desc, int offset, int num_rows, bool gen_null) {
-    RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, &tracker_));
-    int num_tuples = row_desc->tuple_descriptors().size();
-
-    int idx = offset * CountSlotsPerRow(*row_desc);
-    for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
-      TupleRow* row = batch->GetRow(row_idx);
-      for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
-        TupleDescriptor* tuple_desc = row_desc->tuple_descriptors()[tuple_idx];
-        Tuple* tuple = Tuple::Create(tuple_desc->byte_size(), batch->tuple_data_pool());
-        bool is_null = gen_null && !GenBoolValue(idx);
-        for (int slot_idx = 0; slot_idx < tuple_desc->slots().size(); ++slot_idx, ++idx) {
-          SlotDescriptor* slot_desc = tuple_desc->slots()[slot_idx];
-          void* slot = tuple->GetSlot(slot_desc->tuple_offset());
-          switch (slot_desc->type().type) {
-            case TYPE_INT:
-              *reinterpret_cast<int*>(slot) = GenIntValue(idx);
-              break;
-            case TYPE_STRING:
-              *reinterpret_cast<StringValue*>(slot) = STRINGS[idx % NUM_STRINGS];
-              break;
-            default:
-              // The memory has been zero'ed out already by Tuple::Create().
-              break;
-          }
-        }
-        if (is_null) {
-          row->SetTuple(tuple_idx, NULL);
-        } else {
-          row->SetTuple(tuple_idx, tuple);
-        }
-      }
-      batch->CommitLastRow();
-    }
-    return batch;
-  }
-
-  /// Creates a batch using 'int_desc_'. See CreateBatch().
-  virtual RowBatch* CreateIntBatch(int offset, int num_rows, bool gen_null) {
-    return CreateBatch(int_desc_, offset, num_rows, gen_null);
-  }
-
-  /// Creates a batch using 'string_desc_'. See CreateBatch().
-  virtual RowBatch* CreateStringBatch(int offset, int num_rows, bool gen_null) {
-    return CreateBatch(string_desc_, offset, num_rows, gen_null);
-  }
-
-  /// Appends the int at 'ptr' to 'results', or INT_MAX if 'ptr' is NULL.
-  void AppendValue(uint8_t* ptr, vector<int>* results) {
-    if (ptr == NULL) {
-      // For the tests indicate null-ability using the max int value
-      results->push_back(std::numeric_limits<int>::max());
-    } else {
-      results->push_back(*reinterpret_cast<int*>(ptr));
-    }
-  }
-
-  /// Appends the StringValue at 'ptr' to 'results', or an empty StringValue if 'ptr'
-  /// is NULL. The string bytes are copied into 'mem_pool_' so the appended value stays
-  /// valid after the batch it was read from is reset.
-  void AppendValue(uint8_t* ptr, vector<StringValue>* results) {
-    if (ptr == NULL) {
-      results->push_back(StringValue());
-    } else {
-      StringValue sv = *reinterpret_cast<StringValue*>(ptr);
-      uint8_t* copy = mem_pool_->Allocate(sv.len);
-      memcpy(copy, sv.ptr, sv.len);
-      sv.ptr = reinterpret_cast<char*>(copy);
-      results->push_back(sv);
-    }
-  }
-
-  /// Appends one value per slot of every tuple in 'row' to 'results'. Slots of NULL
-  /// tuples are appended via AppendValue(NULL, ...).
-  template <typename T>
-  void AppendRowTuples(TupleRow* row, RowDescriptor* row_desc, vector<T>* results) {
-    DCHECK(row != NULL);
-    const int num_tuples = row_desc->tuple_descriptors().size();
-
-    for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
-      TupleDescriptor* tuple_desc = row_desc->tuple_descriptors()[tuple_idx];
-      Tuple* tuple = row->GetTuple(tuple_idx);
-      const int num_slots = tuple_desc->slots().size();
-      for (int slot_idx = 0; slot_idx < num_slots; ++slot_idx) {
-        SlotDescriptor* slot_desc = tuple_desc->slots()[slot_idx];
-        if (tuple == NULL) {
-          AppendValue(NULL, results);
-        } else {
-          void* slot = tuple->GetSlot(slot_desc->tuple_offset());
-          AppendValue(reinterpret_cast<uint8_t*>(slot), results);
-        }
-      }
-    }
-  }
-
-  /// Reads rows from 'stream' and appends their values to 'results' until eos. If
-  /// 'num_batches' is non-negative, stops after 'num_batches' batches have been read
-  /// (at least one batch is always read).
-  template <typename T>
-  void ReadValues(BufferedTupleStream* stream, RowDescriptor* desc, vector<T>* results,
-      int num_batches = -1) {
-    bool eos = false;
-    RowBatch batch(desc, BATCH_SIZE, &tracker_);
-    int batches_read = 0;
-    do {
-      batch.Reset();
-      // Fatal on error: a failing GetNext() may never set 'eos', which would otherwise
-      // loop forever when 'num_batches' is negative.
-      ASSERT_OK(stream->GetNext(&batch, &eos));
-      ++batches_read;
-      for (int i = 0; i < batch.num_rows(); ++i) {
-        AppendRowTuples(batch.GetRow(i), desc, results);
-      }
-    } while (!eos && (num_batches < 0 || batches_read < num_batches));
-  }
-
-  /// Sets '*val' to the int expected at position 'idx', or INT_MAX for NULL.
-  void GetExpectedValue(int idx, bool is_null, int* val) {
-    if (is_null) {
-      *val = std::numeric_limits<int>::max();
-    } else {
-      *val = GenIntValue(idx);
-    }
-  }
-
-  /// Sets '*val' to the string expected at position 'idx', or empty for NULL.
-  void GetExpectedValue(int idx, bool is_null, StringValue* val) {
-    if (is_null) {
-      *val = StringValue();
-    } else {
-      *val = STRINGS[idx % NUM_STRINGS];
-    }
-  }
-
-  /// Checks that 'results' holds exactly the values CreateBatch() generates for the
-  /// first 'num_rows' rows of layout 'row_desc'.
-  template <typename T>
-  void VerifyResults(const RowDescriptor& row_desc, const vector<T>& results,
-      int num_rows, bool gen_null) {
-    // Check the size first so the loop below never indexes past the end of 'results'
-    // when fewer values were read back than were written.
-    const size_t expected_num_values =
-        static_cast<size_t>(num_rows) * CountSlotsPerRow(row_desc);
-    ASSERT_EQ(expected_num_values, results.size());
-    int idx = 0;
-    for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
-      const int num_tuples = row_desc.tuple_descriptors().size();
-      for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
-        const TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[tuple_idx];
-        const int num_slots = tuple_desc->slots().size();
-        bool is_null = gen_null && !GenBoolValue(idx);
-        for (int slot_idx = 0; slot_idx < num_slots; ++slot_idx, ++idx) {
-          T expected_val;
-          GetExpectedValue(idx, is_null, &expected_val);
-          ASSERT_EQ(results[idx], expected_val)
-              << "results[" << idx << "] " << results[idx] << " != "
-              << expected_val << " row_idx=" << row_idx
-              << " tuple_idx=" << tuple_idx << " slot_idx=" << slot_idx
-              << " gen_null=" << gen_null;
-        }
-      }
-    }
-  }
-
-  /// Tests adding 'num_batches' batches of 'num_rows' rows (layout 'desc', values of
-  /// type T) to a stream and reading them back. If 'unpin_stream' is true, operate the
-  /// stream in unpinned mode. If the stream starts with small buffers and a row does
-  /// not fit, it is switched to I/O-sized buffers. Assumes that enough buffers are
-  /// available to read and write the stream.
-  template <typename T>
-  void TestValues(int num_batches, RowDescriptor* desc, bool gen_null,
-      bool unpin_stream, int num_rows = BATCH_SIZE, bool use_small_buffers = true) {
-    static_assert(sizeof(T) == sizeof(int) || sizeof(T) == sizeof(StringValue),
-        "TestValues() only supports int and StringValue values");
-    BufferedTupleStream stream(runtime_state_, desc, runtime_state_->block_mgr(), client_,
-        use_small_buffers, false);
-    ASSERT_OK(stream.Init(-1, NULL, true));
-    bool got_write_buffer;
-    ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
-    ASSERT_TRUE(got_write_buffer);
-
-    if (unpin_stream) {
-      ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
-    }
-    // Add rows to the stream
-    int offset = 0;
-    for (int i = 0; i < num_batches; ++i) {
-      Status status;
-      RowBatch* batch = CreateBatch(desc, offset, num_rows, gen_null);
-      for (int j = 0; j < batch->num_rows(); ++j) {
-        bool b = stream.AddRow(batch->GetRow(j), &status);
-        ASSERT_OK(status);
-        if (!b) {
-          ASSERT_TRUE(stream.using_small_buffers());
-          bool got_buffer;
-          ASSERT_OK(stream.SwitchToIoBuffers(&got_buffer));
-          ASSERT_TRUE(got_buffer);
-          b = stream.AddRow(batch->GetRow(j), &status);
-          ASSERT_OK(status);
-        }
-        ASSERT_TRUE(b);
-      }
-      offset += batch->num_rows();
-      // Reset the batch to make sure the stream handles the memory correctly.
-      batch->Reset();
-    }
-
-    bool got_read_buffer;
-    ASSERT_OK(stream.PrepareForRead(false, &got_read_buffer));
-    ASSERT_TRUE(got_read_buffer);
-
-    // Read all the rows back
-    vector<T> results;
-    ReadValues(&stream, desc, &results);
-
-    // Verify result
-    VerifyResults<T>(*desc, results, num_rows * num_batches, gen_null);
-
-    stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-  }
-
-  /// Writes 'num_batches' INT batches to a read-write stream. Whenever the batch index
-  /// is a multiple of 'num_batches_before_read', reads back between 1 and
-  /// 'num_batches_before_read' batches (chosen with rand()). Then reads the remainder
-  /// and verifies all values. Runs once with initial small buffers and once without.
-  void TestIntValuesInterleaved(int num_batches, int num_batches_before_read,
-      bool unpin_stream) {
-    for (int small_buffers = 0; small_buffers < 2; ++small_buffers) {
-      BufferedTupleStream stream(runtime_state_, int_desc_, runtime_state_->block_mgr(),
-          client_, small_buffers == 0, // initial small buffers
-          true); // read_write
-      ASSERT_OK(stream.Init(-1, NULL, true));
-      bool got_write_buffer;
-      ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
-      ASSERT_TRUE(got_write_buffer);
-      bool got_read_buffer;
-      ASSERT_OK(stream.PrepareForRead(true, &got_read_buffer));
-      ASSERT_TRUE(got_read_buffer);
-      if (unpin_stream) {
-        ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
-      }
-
-      vector<int> results;
-
-      for (int i = 0; i < num_batches; ++i) {
-        RowBatch* batch = CreateIntBatch(i * BATCH_SIZE, BATCH_SIZE, false);
-        for (int j = 0; j < batch->num_rows(); ++j) {
-          Status status;
-          bool b = stream.AddRow(batch->GetRow(j), &status);
-          ASSERT_TRUE(b);
-          ASSERT_OK(status);
-        }
-        // Reset the batch to make sure the stream handles the memory correctly.
-        batch->Reset();
-        if (i % num_batches_before_read == 0) {
-          ReadValues(&stream, int_desc_, &results,
-              (rand() % num_batches_before_read) + 1);
-        }
-      }
-      ReadValues(&stream, int_desc_, &results);
-
-      VerifyResults<int>(*int_desc_, results, BATCH_SIZE * num_batches, false);
-
-      stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-    }
-  }
-
-  void TestUnpinPin(bool varlen_data);
-
-  void TestTransferMemory(bool pin_stream, bool read_write);
-
-  scoped_ptr<TestEnv> test_env_;
-  RuntimeState* runtime_state_ = NULL;
-  BufferedBlockMgr::Client* client_ = NULL;
-
-  MemTracker tracker_;
-  ObjectPool pool_;
-  RowDescriptor* int_desc_ = NULL;
-  RowDescriptor* string_desc_ = NULL;
-  scoped_ptr<MemPool> mem_pool_;
-};
-
-
-// Tests with a NULLable tuple per row.
-class SimpleNullStreamTest : public SimpleTupleStreamTest {
- protected:
-  /// Same single INT / single STRING layouts as the base fixture, but the one tuple in
-  /// each row is nullable, so tests can generate rows with NULL tuples (gen_null).
-  virtual void CreateDescriptors() {
-    vector<bool> nullable_tuples(1, true);
-    vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0));
-
-    DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_);
-    int_builder.DeclareTuple() << TYPE_INT;
-    int_desc_ = pool_.Add(new RowDescriptor(
-        *int_builder.Build(), tuple_ids, nullable_tuples));
-
-    DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_);
-    string_builder.DeclareTuple() << TYPE_STRING;
-    string_desc_ = pool_.Add(new RowDescriptor(
-        *string_builder.Build(), tuple_ids, nullable_tuples));
-  }
-}; // SimpleNullStreamTest
-
-// Tests with multiple non-NULLable tuples per row.
-class MultiTupleStreamTest : public SimpleTupleStreamTest {
- protected:
-  /// Rows consist of three non-nullable tuples (ids 0, 1, 2), each holding one INT slot
-  /// in 'int_desc_' or one STRING slot in 'string_desc_'.
-  virtual void CreateDescriptors() {
-    const int kNumTuples = 3;
-    vector<bool> nullable_tuples(kNumTuples, false);
-    vector<TTupleId> tuple_ids;
-    for (int id = 0; id < kNumTuples; ++id) {
-      tuple_ids.push_back(static_cast<TTupleId>(id));
-    }
-
-    DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_);
-    for (int t = 0; t < kNumTuples; ++t) int_builder.DeclareTuple() << TYPE_INT;
-    int_desc_ = pool_.Add(new RowDescriptor(
-        *int_builder.Build(), tuple_ids, nullable_tuples));
-
-    DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_);
-    for (int t = 0; t < kNumTuples; ++t) string_builder.DeclareTuple() << TYPE_STRING;
-    string_desc_ = pool_.Add(new RowDescriptor(
-        *string_builder.Build(), tuple_ids, nullable_tuples));
-  }
-};
-
-// Tests with multiple NULLable tuples per row.
-class MultiNullableTupleStreamTest : public SimpleTupleStreamTest {
- protected:
-  /// Rows consist of three tuples (ids 0, 1, 2): the first is non-nullable and the other
-  /// two are nullable. Each holds one INT slot in 'int_desc_' or one STRING slot in
-  /// 'string_desc_'.
-  virtual void CreateDescriptors() {
-    const int kNumTuples = 3;
-    vector<bool> nullable_tuples(kNumTuples, true);
-    nullable_tuples[0] = false;
-    vector<TTupleId> tuple_ids;
-    for (int id = 0; id < kNumTuples; ++id) {
-      tuple_ids.push_back(static_cast<TTupleId>(id));
-    }
-
-    DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_);
-    for (int t = 0; t < kNumTuples; ++t) int_builder.DeclareTuple() << TYPE_INT;
-    int_desc_ = pool_.Add(new RowDescriptor(
-        *int_builder.Build(), tuple_ids, nullable_tuples));
-
-    DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_);
-    for (int t = 0; t < kNumTuples; ++t) string_builder.DeclareTuple() << TYPE_STRING;
-    string_desc_ = pool_.Add(new RowDescriptor(
-        *string_builder.Build(), tuple_ids, nullable_tuples));
-  }
-};
-
-/// Tests with collection types.
-class ArrayTupleStreamTest : public SimpleTupleStreamTest {
- protected:
-  RowDescriptor* array_desc_ = NULL;
-
-  /// Builds 'array_desc_' with two nullable tuples:
-  ///   tuple 0: (array<string>, array<array<int>>)
-  ///   tuple 1: (array<int>)
-  virtual void CreateDescriptors() {
-    vector<bool> nullable_tuples(2, true);
-    vector<TTupleId> tuple_ids;
-    tuple_ids.push_back(static_cast<TTupleId>(0));
-    tuple_ids.push_back(static_cast<TTupleId>(1));
-    ColumnType string_array_type;
-    string_array_type.type = TYPE_ARRAY;
-    string_array_type.children.push_back(TYPE_STRING);
-
-    // array<int>: the element type must be INT to match the layout documented above.
-    // NOTE(review): confirm no test later in this file populates these arrays with
-    // string items.
-    ColumnType int_array_type;
-    int_array_type.type = TYPE_ARRAY;
-    int_array_type.children.push_back(TYPE_INT);
-
-    ColumnType nested_array_type;
-    nested_array_type.type = TYPE_ARRAY;
-    nested_array_type.children.push_back(int_array_type);
-
-    DescriptorTblBuilder builder(test_env_->exec_env()->frontend(), &pool_);
-    builder.DeclareTuple() << string_array_type << nested_array_type;
-    builder.DeclareTuple() << int_array_type;
-    array_desc_ = pool_.Add(new RowDescriptor(
-        *builder.Build(), tuple_ids, nullable_tuples));
-  }
-};
-
-// Basic API test. No data should be going to disk.
-TEST_F(SimpleTupleStreamTest, Basic) {
-  InitBlockMgr(-1, IO_BLOCK_SIZE);
-  const int kNumBatches[] = {1, 10, 100};
-
-  // Each value type is exercised with an unpinned stream first, then a pinned one.
-  for (bool unpin : {true, false}) {
-    for (int num_batches : kNumBatches) {
-      TestValues<int>(num_batches, int_desc_, false, unpin);
-    }
-  }
-  for (bool unpin : {true, false}) {
-    for (int num_batches : kNumBatches) {
-      TestValues<StringValue>(num_batches, string_desc_, false, unpin);
-    }
-  }
-
-  // {num_batches, num_batches_before_read} pairs for the interleaved read/write test.
-  const int kInterleaved[][2] = {{1, 1}, {10, 5}, {100, 15}};
-  for (bool unpin : {true, false}) {
-    for (const auto& params : kInterleaved) {
-      TestIntValuesInterleaved(params[0], params[1], unpin);
-    }
-  }
-}
-
-// Test with only 1 buffer.
-TEST_F(SimpleTupleStreamTest, OneBufferSpill) {
-  // The memory limit equals one buffer that holds just 100 ints, so the stream spills
-  // quite often.
-  const int buffer_size = 100 * sizeof(int);
-  InitBlockMgr(buffer_size, buffer_size);
-
-  for (int num_batches : {1, 10}) {
-    TestValues<int>(num_batches, int_desc_, false, true);
-  }
-  for (int num_batches : {1, 10}) {
-    TestValues<StringValue>(num_batches, string_desc_, false, true);
-  }
-}
-
-// Test with a few buffers.
-TEST_F(SimpleTupleStreamTest, ManyBufferSpill) {
-  // Memory limit of ten buffers, each holding 100 ints.
-  const int buffer_size = 100 * sizeof(int);
-  InitBlockMgr(10 * buffer_size, buffer_size);
-
-  const int kNumBatches[] = {1, 10, 100};
-  for (int num_batches : kNumBatches) {
-    TestValues<int>(num_batches, int_desc_, false, true);
-  }
-  for (int num_batches : kNumBatches) {
-    TestValues<StringValue>(num_batches, string_desc_, false, true);
-  }
-
-  TestIntValuesInterleaved(1, 1, true);
-  TestIntValuesInterleaved(10, 5, true);
-  TestIntValuesInterleaved(100, 15, true);
-}
-
-/// Fills a small-buffer stream until AddRow() reports it is full, unpins and re-pins
-/// it, then reads it back three times (only the last pass uses delete_on_read) and
-/// checks the stream's in-memory size before and after Close(). 'varlen_data' selects
-/// STRING rows instead of INT rows.
-void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) {
-  const int buffer_size = 100 * sizeof(int);
-  InitBlockMgr(3 * buffer_size, buffer_size);
-  RowDescriptor* const row_desc = varlen_data ? string_desc_ : int_desc_;
-
-  BufferedTupleStream stream(
-      runtime_state_, row_desc, runtime_state_->block_mgr(), client_, true, false);
-  ASSERT_OK(stream.Init(-1, NULL, true));
-  bool got_write_buffer;
-  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
-  ASSERT_TRUE(got_write_buffer);
-
-  // Add rows until the stream rejects one; 'num_rows_added' counts accepted rows.
-  int num_rows_added = 0;
-  bool full = false;
-  while (!full) {
-    RowBatch* batch = varlen_data ? CreateStringBatch(num_rows_added, BATCH_SIZE, false)
-                                  : CreateIntBatch(num_rows_added, BATCH_SIZE, false);
-    for (int row = 0; row < batch->num_rows() && !full; ++row) {
-      Status status;
-      full = !stream.AddRow(batch->GetRow(row), &status);
-      ASSERT_OK(status);
-      if (!full) ++num_rows_added;
-    }
-  }
-
-  ASSERT_OK(stream.UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
-
-  bool pinned = false;
-  ASSERT_OK(stream.PinStream(false, &pinned));
-  ASSERT_TRUE(pinned);
-
-  // The stream can be re-read as long as delete_on_read is not used, so only the final
-  // pass deletes as it reads.
-  const int kReadIters = 3;
-  for (int iter = 0; iter < kReadIters; ++iter) {
-    const bool delete_on_read = (iter + 1 == kReadIters);
-    bool got_read_buffer;
-    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_buffer));
-    ASSERT_TRUE(got_read_buffer);
-
-    if (!varlen_data) {
-      vector<int> int_results;
-      ReadValues(&stream, row_desc, &int_results);
-      VerifyResults<int>(*int_desc_, int_results, num_rows_added, false);
-    } else {
-      vector<StringValue> string_results;
-      ReadValues(&stream, row_desc, &string_results);
-      VerifyResults<StringValue>(*string_desc_, string_results, num_rows_added, false);
-    }
-  }
-
-  // After delete_on_read, all blocks aside from the last should be deleted.
-  // Note: this should really be 0, but the BufferedTupleStream returns eos before
-  // deleting the last block, rather than after, so the last block isn't deleted
-  // until the stream is closed.
-  ASSERT_EQ(stream.bytes_in_mem(false), buffer_size);
-
-  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-
-  ASSERT_EQ(stream.bytes_in_mem(false), 0);
-}
-
-// Unpin and re-pin a stream of fixed-length (INT) rows.
-TEST_F(SimpleTupleStreamTest, UnpinPin) {
-  const bool kVarlenData = false;
-  TestUnpinPin(kVarlenData);
-}
-
-// Unpin and re-pin a stream of variable-length (STRING) rows. This must pass
-// varlen_data = true; passing false would just repeat the UnpinPin test.
-TEST_F(SimpleTupleStreamTest, UnpinPinVarlen) {
-  TestUnpinPin(/* varlen_data */ true);
-}
-
-// Checks that a stream created with small buffers stays on them for a small amount of
-// data and switches to I/O-sized buffers once a row no longer fits.
-TEST_F(SimpleTupleStreamTest, SmallBuffers) {
-  const int io_buffer_size = IO_BLOCK_SIZE;
-  InitBlockMgr(2 * io_buffer_size, io_buffer_size);
-
-  BufferedTupleStream stream(
-      runtime_state_, int_desc_, runtime_state_->block_mgr(), client_, true, false);
-  ASSERT_OK(stream.Init(-1, NULL, false));
-  bool got_write_buffer;
-  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
-  ASSERT_TRUE(got_write_buffer);
-
-  // Initial buffer should be small.
-  EXPECT_LT(stream.bytes_in_mem(false), io_buffer_size);
-
-  // 1024 ints are expected to fit without leaving small buffers.
-  Status status;
-  RowBatch* small_batch = CreateIntBatch(0, 1024, false);
-  for (int row = 0; row < small_batch->num_rows(); ++row) {
-    EXPECT_TRUE(stream.AddRow(small_batch->GetRow(row), &status));
-    ASSERT_OK(status);
-  }
-  EXPECT_LT(stream.bytes_in_mem(false), io_buffer_size);
-  EXPECT_LT(stream.byte_size(), io_buffer_size);
-  ASSERT_TRUE(stream.using_small_buffers());
-
-  // 40 MB of ints: expected to need a switch to I/O-sized buffers.
-  RowBatch* big_batch = CreateIntBatch(0, 10 * 1024 * 1024, false);
-  for (int row = 0; row < big_batch->num_rows(); ++row) {
-    bool added = stream.AddRow(big_batch->GetRow(row), &status);
-    ASSERT_OK(status);
-    if (added) continue;
-    // A rejected row is only acceptable while the stream is still on small buffers.
-    ASSERT_TRUE(stream.using_small_buffers());
-    bool got_buffer;
-    ASSERT_OK(stream.SwitchToIoBuffers(&got_buffer));
-    ASSERT_TRUE(got_buffer);
-    added = stream.AddRow(big_batch->GetRow(row), &status);
-    ASSERT_OK(status);
-    ASSERT_TRUE(added);
-  }
-  EXPECT_EQ(stream.bytes_in_mem(false), io_buffer_size);
-
-  // TODO: Test for IMPALA-2330. In case SwitchToIoBuffers() fails to get buffer then
-  // using_small_buffers() should still return true.
-  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-}
-
-/// Writes at least 'total_num_blocks' blocks of INT rows to a stream (pinned if
-/// 'pin_stream'), prepares it for reading with delete_on_read, then closes it with
-/// FLUSH_RESOURCES into a batch. Checks how many blocks were attached to the batch and
-/// that the batch was marked at capacity.
-void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write) {
-  // Use small (4KB) buffers so that the batch reaching capacity is attributable to the
-  // explicit FLUSH_RESOURCES flag rather than to the amount of attached memory.
-  int buffer_size = 4 * 1024;
-  InitBlockMgr(100 * buffer_size, buffer_size);
-
-  BufferedTupleStream stream(
-      runtime_state_, int_desc_, runtime_state_->block_mgr(), client_, false, read_write);
-  ASSERT_OK(stream.Init(-1, NULL, pin_stream));
-  bool got_write_buffer;
-  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
-  ASSERT_TRUE(got_write_buffer);
-  RowBatch* batch = CreateIntBatch(0, 1024, false);
-
-  // Construct a stream with 4 blocks.
-  const int total_num_blocks = 4;
-  while (stream.byte_size() < total_num_blocks * buffer_size) {
-    Status status;
-    for (int i = 0; i < batch->num_rows(); ++i) {
-      bool ret = stream.AddRow(batch->GetRow(i), &status);
-      ASSERT_OK(status);
-      // Fatal: if rows keep being rejected the stream never grows and this loop
-      // would spin forever.
-      ASSERT_TRUE(ret);
-    }
-  }
-
-  bool got_read_buffer;
-  ASSERT_OK(stream.PrepareForRead(true, &got_read_buffer));
-  ASSERT_TRUE(got_read_buffer);
-
-  batch->Reset();
-  stream.Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES);
-  // gtest assertions rather than DCHECKs: DCHECKs are compiled out of release builds,
-  // which would silently skip these checks.
-  if (pin_stream) {
-    ASSERT_EQ(total_num_blocks, batch->num_blocks());
-  } else if (read_write) {
-    // Read and write block should be attached.
-    ASSERT_EQ(2, batch->num_blocks());
-  } else {
-    // Read block should be attached.
-    ASSERT_EQ(1, batch->num_blocks());
-  }
-  ASSERT_TRUE(batch->AtCapacity()); // Flush resources flag should have been set.
-  batch->Reset();
-  ASSERT_EQ(0, batch->num_blocks());
-}
-
-/// Test attaching memory to a row batch from a pinned stream, in read-write mode.
-TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamReadWrite) {
-  TestTransferMemory(/* pin_stream */ true, /* read_write */ true);
-}
-
-/// Test attaching memory to a row batch from a pinned stream, without read-write mode.
-TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamNoReadWrite) {
-  TestTransferMemory(/* pin_stream */ true, /* read_write */ false);
-}
-
-/// Test attaching memory to a row batch from an unpinned stream, in read-write mode.
-TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamReadWrite) {
-  TestTransferMemory(/* pin_stream */ false, /* read_write */ true);
-}
-
-/// Test attaching memory to a row batch from an unpinned stream, without read-write
-/// mode.
-TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) {
-  TestTransferMemory(/* pin_stream */ false, /* read_write */ false);
-}
-
-// Test that tuple stream functions if it references strings outside stream. The
-// aggregation node relies on this since it updates tuples in-place.
-TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
-  int buffer_size = 8 * 1024 * 1024;
-  InitBlockMgr(2 * buffer_size, buffer_size);
-  Status status = Status::OK();
-
-  int num_batches = 100;
-  int rows_added = 0;
-  // gtest assertion rather than DCHECK so the check also runs in release builds; it
-  // must pass before tuple_descriptors()[0] is dereferenced below.
-  ASSERT_EQ(1U, string_desc_->tuple_descriptors().size());
-  TupleDescriptor& tuple_desc = *string_desc_->tuple_descriptors()[0];
-
-  // All string slots are passed as 'external_slots'. Rows below are allocated with no
-  // varlen space, so their StringValues keep pointing at the source batches' data.
-  set<SlotId> external_slots;
-  for (int i = 0; i < tuple_desc.string_slots().size(); ++i) {
-    external_slots.insert(tuple_desc.string_slots()[i]->id());
-  }
-
-  BufferedTupleStream stream(runtime_state_, string_desc_, runtime_state_->block_mgr(),
-      client_, true, false, external_slots);
-  // NOTE(review): unlike the other tests, this one calls neither Init() nor
-  // PrepareForWrite() before AllocateRow(); confirm the stream supports that.
-  for (int i = 0; i < num_batches; ++i) {
-    RowBatch* batch = CreateStringBatch(rows_added, BATCH_SIZE, false);
-    for (int j = 0; j < batch->num_rows(); ++j) {
-      uint8_t* varlen_data;
-      int fixed_size = tuple_desc.byte_size();
-      uint8_t* tuple = stream.AllocateRow(fixed_size, 0, &varlen_data, &status);
-      // Check the status first so a failed allocation reports its error message.
-      ASSERT_OK(status);
-      ASSERT_TRUE(tuple != NULL);
-      // Copy fixed portion in, but leave it pointing to row batch's varlen data.
-      memcpy(tuple, batch->GetRow(j)->GetTuple(0), fixed_size);
-    }
-    rows_added += batch->num_rows();
-  }
-
-  ASSERT_EQ(rows_added, stream.num_rows());
-
-  for (int delete_on_read = 0; delete_on_read <= 1; ++delete_on_read) {
-    // Keep stream in memory and test we can read ok.
-    vector<StringValue> results;
-    bool got_read_buffer;
-    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_buffer));
-    ASSERT_TRUE(got_read_buffer);
-    ReadValues(&stream, string_desc_, &results);
-    VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
-  }
-
-  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-}
-
-// Construct a big row by stitching together many tuples so the total row size
-// will be close to the IO block size. With null indicators, the stream will fail to
-// be initialized; without null indicators, things should work fine.
-TEST_F(SimpleTupleStreamTest, BigRow) {
-  InitBlockMgr(2 * IO_BLOCK_SIZE, IO_BLOCK_SIZE);
-  vector<TupleId> tuple_ids;
-  vector<bool> nullable_tuples;
-  vector<bool> non_nullable_tuples;
-
-  DescriptorTblBuilder big_row_builder(test_env_->exec_env()->frontend(), &pool_);
-  // Each tuple contains 8 slots of TYPE_INT and a single byte for null indicator.
-  const int num_tuples = IO_BLOCK_SIZE / (8 * sizeof(int) + 1);
-  for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
-    big_row_builder.DeclareTuple() << TYPE_INT << TYPE_INT << TYPE_INT << TYPE_INT
-        << TYPE_INT << TYPE_INT << TYPE_INT << TYPE_INT;
-    tuple_ids.push_back(static_cast<TTupleId>(tuple_idx));
-    nullable_tuples.push_back(true);
-    non_nullable_tuples.push_back(false);
-  }
-  DescriptorTbl* desc = big_row_builder.Build();
-
-  // Construct a big row with all non-nullable tuples.
-  RowDescriptor* row_desc = pool_.Add(new RowDescriptor(
-      *desc, tuple_ids, non_nullable_tuples));
-  ASSERT_FALSE(row_desc->IsAnyTupleNullable());
-  // Test writing this row into the stream and then reading it back.
-  TestValues<int>(1, row_desc, false, false, 1, false);
-  TestValues<int>(1, row_desc, false, true, 1, false);
-
-  // Construct a big row with nullable tuples. This requires space for null indicators
-  // in the stream which, as a result, will fail to initialize.
-  RowDescriptor* nullable_row_desc = pool_.Add(new RowDescriptor(
-      *desc, tuple_ids, nullable_tuples));
-  ASSERT_TRUE(nullable_row_desc->IsAnyTupleNullable());
-  BufferedTupleStream nullable_stream(runtime_state_, nullable_row_desc,
-      runtime_state_->block_mgr(), client_, false, false);
-  Status status = nullable_stream.Init(-1, NULL, true);
-  ASSERT_FALSE(status.ok());
-  nullable_stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-}
-
-// Test for IMPALA-3923: overflow of 32-bit int in GetRows().
-TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) {
-  InitBlockMgr(-1, 8 * 1024 * 1024);
-  BufferedTupleStream stream(
-      runtime_state_, int_desc_, runtime_state_->block_mgr(), client_, false, false);
-  ASSERT_OK(stream.Init(-1, NULL, true));
-
-  // Add more rows than can be fit in a RowBatch (limited by its 32-bit row count).
-  // Actually adding the rows would take a very long time, so just set num_rows_.
-  // This puts the stream in an inconsistent state, but exercises the right code path.
-  // The shift uses a 64-bit literal: '1L << 33' is undefined where long is only 32
-  // bits wide (e.g. LLP64 or ILP32 targets).
-  stream.num_rows_ = 1LL << 33;
-  bool got_rows;
-  scoped_ptr<RowBatch> overflow_batch;
-  ASSERT_FALSE(stream.GetRows(&overflow_batch, &got_rows).ok());
-  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-}
-
-// Basic API test. No data should be going to disk.
-TEST_F(SimpleNullStreamTest, Basic) {
-  InitBlockMgr(-1, IO_BLOCK_SIZE);
-
-  // Every combination of the two boolean TestValues() arguments is exercised with
-  // 1, 10 and 100 batches. The 4th argument varies slowest, then the 3rd, then the
-  // batch count.
-  const int batch_counts[] = {1, 10, 100};
-  for (bool flag4 : {true, false}) {
-    for (bool flag3 : {false, true}) {
-      for (int n : batch_counts) TestValues<int>(n, int_desc_, flag3, flag4);
-    }
-  }
-  for (bool flag4 : {true, false}) {
-    for (bool flag3 : {false, true}) {
-      for (int n : batch_counts) {
-        TestValues<StringValue>(n, string_desc_, flag3, flag4);
-      }
-    }
-  }
-
-  // Interleaved reads/writes with the same (num_batches, arg2) pairs for each flag.
-  const int interleave_args[][2] = {{1, 1}, {10, 5}, {100, 15}};
-  for (bool flag : {true, false}) {
-    for (const auto& args : interleave_args) {
-      TestIntValuesInterleaved(args[0], args[1], flag);
-    }
-  }
-}
-
-// Test tuple stream with only 1 buffer and rows with multiple tuples.
-TEST_F(MultiTupleStreamTest, MultiTupleOneBufferSpill) {
-  // Each buffer can only hold 100 ints, so this spills quite often.
-  const int buffer_size = 100 * sizeof(int);
-  InitBlockMgr(buffer_size, buffer_size);
-  for (int n : {1, 10}) TestValues<int>(n, int_desc_, false, true);
-  for (int n : {1, 10}) TestValues<StringValue>(n, string_desc_, false, true);
-}
-
-// Test with a few buffers and rows with multiple tuples.
-TEST_F(MultiTupleStreamTest, MultiTupleManyBufferSpill) {
-  // Ten buffers, each holding 100 ints.
-  const int buffer_size = 100 * sizeof(int);
-  InitBlockMgr(10 * buffer_size, buffer_size);
-
-  for (int n : {1, 10, 100}) TestValues<int>(n, int_desc_, false, true);
-  for (int n : {1, 10, 100}) TestValues<StringValue>(n, string_desc_, false, true);
-
-  const int interleave_args[][2] = {{1, 1}, {10, 5}, {100, 15}};
-  for (const auto& args : interleave_args) {
-    TestIntValuesInterleaved(args[0], args[1], true);
-  }
-}
-
-// Test that we can allocate a row in the stream and copy in multiple tuples then
-// read it back from the stream.
-TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
-  // Use small buffers so it will be flushed to disk.
-  int buffer_size = 4 * 1024;
-  InitBlockMgr(2 * buffer_size, buffer_size);
-  Status status = Status::OK();
-
-  int num_batches = 1;
-  int rows_added = 0;
-  BufferedTupleStream stream(
-      runtime_state_, string_desc_, runtime_state_->block_mgr(), client_, false, false);
-  ASSERT_OK(stream.Init(-1, NULL, false));
-  bool got_write_buffer;
-  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
-  ASSERT_TRUE(got_write_buffer);
-
-  for (int i = 0; i < num_batches; ++i) {
-    RowBatch* batch = CreateStringBatch(rows_added, 1, false);
-    for (int j = 0; j < batch->num_rows(); ++j) {
-      TupleRow* row = batch->GetRow(j);
-      // Total fixed-length and var-len bytes needed by all tuples of this row.
-      int64_t fixed_size = 0;
-      int64_t varlen_size = 0;
-      for (int k = 0; k < string_desc_->tuple_descriptors().size(); k++) {
-        TupleDescriptor* tuple_desc = string_desc_->tuple_descriptors()[k];
-        fixed_size += tuple_desc->byte_size();
-        varlen_size += row->GetTuple(k)->VarlenByteSize(*tuple_desc);
-      }
-      uint8_t* varlen_data;
-      uint8_t* fixed_data = stream.AllocateRow(fixed_size, varlen_size, &varlen_data,
-          &status);
-      // Check the status first so an allocation failure reports its cause.
-      ASSERT_OK(status);
-      ASSERT_TRUE(fixed_data != NULL);
-      // Copy each tuple into the fixed-length region and its strings into the
-      // var-len region, repointing the copied StringValues at the new data.
-      uint8_t* varlen_write_ptr = varlen_data;
-      for (int k = 0; k < string_desc_->tuple_descriptors().size(); k++) {
-        TupleDescriptor* tuple_desc = string_desc_->tuple_descriptors()[k];
-        Tuple* src = row->GetTuple(k);
-        Tuple* dst = reinterpret_cast<Tuple*>(fixed_data);
-        fixed_data += tuple_desc->byte_size();
-        memcpy(dst, src, tuple_desc->byte_size());
-        for (int l = 0; l < tuple_desc->slots().size(); l++) {
-          SlotDescriptor* slot = tuple_desc->slots()[l];
-          StringValue* src_string = src->GetStringSlot(slot->tuple_offset());
-          StringValue* dst_string = dst->GetStringSlot(slot->tuple_offset());
-          dst_string->ptr = reinterpret_cast<char*>(varlen_write_ptr);
-          memcpy(dst_string->ptr, src_string->ptr, src_string->len);
-          varlen_write_ptr += src_string->len;
-        }
-      }
-      // All of the requested var-len space must have been used exactly.
-      ASSERT_EQ(varlen_data + varlen_size, varlen_write_ptr);
-    }
-    rows_added += batch->num_rows();
-  }
-
-  // Read the stream back three times; only the last pass deletes blocks on read.
-  for (int i = 0; i < 3; ++i) {
-    bool delete_on_read = i == 2;
-    vector<StringValue> results;
-    bool got_read_buffer;
-    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_buffer));
-    ASSERT_TRUE(got_read_buffer);
-    ReadValues(&stream, string_desc_, &results);
-    VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
-  }
-
-  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-}
-
-// Test with rows with multiple nullable tuples.
-TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleOneBufferSpill) {
-  // Each buffer can only hold 100 ints, so this spills quite often.
-  const int buffer_size = 100 * sizeof(int);
-  InitBlockMgr(buffer_size, buffer_size);
-  for (bool flag3 : {false, true}) {
-    for (int n : {1, 10}) TestValues<int>(n, int_desc_, flag3, true);
-  }
-  for (bool flag3 : {false, true}) {
-    for (int n : {1, 10}) TestValues<StringValue>(n, string_desc_, flag3, true);
-  }
-}
-
-// Test with a few buffers.
-TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleManyBufferSpill) {
-  // Ten buffers, each holding 100 ints.
-  const int buffer_size = 100 * sizeof(int);
-  InitBlockMgr(10 * buffer_size, buffer_size);
-
-  for (bool flag3 : {false, true}) {
-    for (int n : {1, 10, 100}) TestValues<int>(n, int_desc_, flag3, true);
-  }
-  for (bool flag3 : {false, true}) {
-    for (int n : {1, 10, 100}) {
-      TestValues<StringValue>(n, string_desc_, flag3, true);
-    }
-  }
-
-  const int interleave_args[][2] = {{1, 1}, {10, 5}, {100, 15}};
-  for (const auto& args : interleave_args) {
-    TestIntValuesInterleaved(args[0], args[1], true);
-  }
-}
-
-/// Test that ComputeRowSize() handles NULL tuples and does not count var-len data of
-/// slots that are stored externally.
-TEST_F(MultiNullableTupleStreamTest, TestComputeRowSize) {
-  InitBlockMgr(-1, 8 * 1024 * 1024);
-  const vector<TupleDescriptor*>& tuple_descs = string_desc_->tuple_descriptors();
-  // String in second tuple is stored externally.
-  set<SlotId> external_slots;
-  const SlotDescriptor* external_string_slot = tuple_descs[1]->slots()[0];
-  external_slots.insert(external_string_slot->id());
-
-  // The stream is never Init()ed here; only ComputeRowSize() is exercised.
-  BufferedTupleStream stream(runtime_state_, string_desc_, runtime_state_->block_mgr(),
-      client_, false, false, external_slots);
-  // Hand-built row and three zeroed tuples, freed automatically by FreeDeleter.
-  gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>(
-        malloc(tuple_descs.size() * sizeof(Tuple*))));
-  gscoped_ptr<Tuple, FreeDeleter> tuple0(reinterpret_cast<Tuple*>(
-        malloc(tuple_descs[0]->byte_size())));
-  gscoped_ptr<Tuple, FreeDeleter> tuple1(reinterpret_cast<Tuple*>(
-        malloc(tuple_descs[1]->byte_size())));
-  gscoped_ptr<Tuple, FreeDeleter> tuple2(reinterpret_cast<Tuple*>(
-        malloc(tuple_descs[2]->byte_size())));
-  memset(tuple0.get(), 0, tuple_descs[0]->byte_size());
-  memset(tuple1.get(), 0, tuple_descs[1]->byte_size());
-  memset(tuple2.get(), 0, tuple_descs[2]->byte_size());
-
-  // All nullable tuples are NULL.
-  // Only tuple 0's fixed size is expected to be counted.
-  row->SetTuple(0, tuple0.get());
-  row->SetTuple(1, NULL);
-  row->SetTuple(2, NULL);
-  EXPECT_EQ(tuple_descs[0]->byte_size(), stream.ComputeRowSize(row.get()));
-
-  // Tuples are initialized to empty and have no var-len data.
-  row->SetTuple(1, tuple1.get());
-  row->SetTuple(2, tuple2.get());
-  EXPECT_EQ(string_desc_->GetRowSize(), stream.ComputeRowSize(row.get()));
-
-  // Tuple 0 has some data.
-  // The string bytes are expected to be added on top of the fixed row size.
-  const SlotDescriptor* string_slot = tuple_descs[0]->slots()[0];
-  StringValue* sv = tuple0->GetStringSlot(string_slot->tuple_offset());
-  *sv = STRINGS[0];
-  int64_t expected_len = string_desc_->GetRowSize() + sv->len;
-  EXPECT_EQ(expected_len, stream.ComputeRowSize(row.get()));
-
-  // Check that external slots aren't included in count.
-  // The pointer/length are deliberately bogus; the expected size is unchanged.
-  sv = tuple1->GetStringSlot(external_string_slot->tuple_offset());
-  sv->ptr = reinterpret_cast<char*>(1234);
-  sv->len = 1234;
-  EXPECT_EQ(expected_len, stream.ComputeRowSize(row.get()));
-
-  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-}
-
-/// Test that deep copy works with arrays by copying into a BufferedTupleStream, freeing
-/// the original rows, then reading back the rows and verifying the contents.
-TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
-  Status status;
-  InitBlockMgr(-1, IO_BLOCK_SIZE);
-  const int NUM_ROWS = 4000;
-  BufferedTupleStream stream(
-      runtime_state_, array_desc_, runtime_state_->block_mgr(), client_, false, false);
-  const vector<TupleDescriptor*>& tuple_descs = array_desc_->tuple_descriptors();
-  // Write out a predictable pattern of data by iterating over arrays of constants.
-  int strings_index = 0; // we take the mod of this as index into STRINGS.
-  const int array_lens[] = { 0, 1, 5, 10, 1000, 2, 49, 20 };
-  const int num_array_lens = sizeof(array_lens) / sizeof(array_lens[0]);
-  int array_len_index = 0;
-  ASSERT_OK(stream.Init(-1, NULL, false));
-  bool got_write_buffer;
-  ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
-  ASSERT_TRUE(got_write_buffer);
-
-  for (int i = 0; i < NUM_ROWS; ++i) {
-    // Accumulated in 64 bits so that large rows cannot overflow the expected size.
-    int64_t expected_row_size =
-        tuple_descs[0]->byte_size() + tuple_descs[1]->byte_size();
-    gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>(
-          malloc(tuple_descs.size() * sizeof(Tuple*))));
-    gscoped_ptr<Tuple, FreeDeleter> tuple0(reinterpret_cast<Tuple*>(
-          malloc(tuple_descs[0]->byte_size())));
-    gscoped_ptr<Tuple, FreeDeleter> tuple1(reinterpret_cast<Tuple*>(
-          malloc(tuple_descs[1]->byte_size())));
-    memset(tuple0.get(), 0, tuple_descs[0]->byte_size());
-    memset(tuple1.get(), 0, tuple_descs[1]->byte_size());
-    row->SetTuple(0, tuple0.get());
-    row->SetTuple(1, tuple1.get());
-
-    // Only array<string> is non-null.
-    tuple0->SetNull(tuple_descs[0]->slots()[1]->null_indicator_offset());
-    tuple1->SetNull(tuple_descs[1]->slots()[0]->null_indicator_offset());
-    const SlotDescriptor* array_slot_desc = tuple_descs[0]->slots()[0];
-    const TupleDescriptor* item_desc = array_slot_desc->collection_item_descriptor();
-
-    int array_len = array_lens[array_len_index++ % num_array_lens];
-    CollectionValue* cv = tuple0->GetCollectionSlot(array_slot_desc->tuple_offset());
-    cv->ptr = NULL;
-    cv->num_tuples = 0;
-    CollectionValueBuilder builder(cv, *item_desc, mem_pool_.get(), runtime_state_,
-        array_len);
-    Tuple* array_data;
-    int num_rows;
-    builder.GetFreeMemory(&array_data, &num_rows);
-    expected_row_size += item_desc->byte_size() * array_len;
-
-    // Fill the array with pointers to our constant strings.
-    for (int j = 0; j < array_len; ++j) {
-      const StringValue* string = &STRINGS[strings_index++ % NUM_STRINGS];
-      array_data->SetNotNull(item_desc->slots()[0]->null_indicator_offset());
-      RawValue::Write(string, array_data, item_desc->slots()[0], mem_pool_.get());
-      array_data += item_desc->byte_size();
-      expected_row_size += string->len;
-    }
-    builder.CommitTuples(array_len);
-
-    // Check that internal row size computation gives correct result.
-    EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get()));
-    bool b = stream.AddRow(row.get(), &status);
-    ASSERT_TRUE(b);
-    ASSERT_OK(status);
-    mem_pool_->FreeAll(); // Free data as soon as possible to smoke out issues.
-  }
-
-  // Read back and verify data, replaying the same array-length/string sequence.
-  bool got_read_buffer;
-  ASSERT_OK(stream.PrepareForRead(false, &got_read_buffer));
-  ASSERT_TRUE(got_read_buffer);
-  strings_index = 0;
-  array_len_index = 0;
-  bool eos = false;
-  int rows_read = 0;
-  RowBatch batch(array_desc_, BATCH_SIZE, &tracker_);
-  do {
-    batch.Reset();
-    ASSERT_OK(stream.GetNext(&batch, &eos));
-    for (int i = 0; i < batch.num_rows(); ++i) {
-      TupleRow* row = batch.GetRow(i);
-      Tuple* tuple0 = row->GetTuple(0);
-      Tuple* tuple1 = row->GetTuple(1);
-      ASSERT_TRUE(tuple0 != NULL);
-      ASSERT_TRUE(tuple1 != NULL);
-      const SlotDescriptor* array_slot_desc = tuple_descs[0]->slots()[0];
-      ASSERT_FALSE(tuple0->IsNull(array_slot_desc->null_indicator_offset()));
-      ASSERT_TRUE(tuple0->IsNull(tuple_descs[0]->slots()[1]->null_indicator_offset()));
-      ASSERT_TRUE(tuple1->IsNull(tuple_descs[1]->slots()[0]->null_indicator_offset()));
-
-      const TupleDescriptor* item_desc = array_slot_desc->collection_item_descriptor();
-      int expected_array_len = array_lens[array_len_index++ % num_array_lens];
-      CollectionValue* cv = tuple0->GetCollectionSlot(array_slot_desc->tuple_offset());
-      ASSERT_EQ(expected_array_len, cv->num_tuples);
-      for (int j = 0; j < cv->num_tuples; ++j) {
-        Tuple* item = reinterpret_cast<Tuple*>(cv->ptr + j * item_desc->byte_size());
-        const SlotDescriptor* string_desc = item_desc->slots()[0];
-        ASSERT_FALSE(item->IsNull(string_desc->null_indicator_offset()));
-        const StringValue* expected = &STRINGS[strings_index++ % NUM_STRINGS];
-        const StringValue* actual = item->GetStringSlot(string_desc->tuple_offset());
-        ASSERT_EQ(*expected, *actual);
-      }
-    }
-    rows_read += batch.num_rows();
-  } while (!eos);
-  ASSERT_EQ(NUM_ROWS, rows_read);
-  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-}
-
-/// Test that ComputeRowSize() handles NULL tuples, NULL array slots and array slots
-/// that are stored externally.
-TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
-  InitBlockMgr(-1, 8 * 1024 * 1024);
-  const vector<TupleDescriptor*>& tuple_descs = array_desc_->tuple_descriptors();
-  set<SlotId> external_slots;
-  // Second array slot in first tuple is stored externally.
-  const SlotDescriptor* external_array_slot = tuple_descs[0]->slots()[1];
-  external_slots.insert(external_array_slot->id());
-
-  // The stream is never Init()ed here; only ComputeRowSize() is exercised.
-  BufferedTupleStream stream(runtime_state_, array_desc_, runtime_state_->block_mgr(),
-      client_, false, false, external_slots);
-  gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>(
-        malloc(tuple_descs.size() * sizeof(Tuple*))));
-  gscoped_ptr<Tuple, FreeDeleter> tuple0(reinterpret_cast<Tuple*>(
-        malloc(tuple_descs[0]->byte_size())));
-  gscoped_ptr<Tuple, FreeDeleter> tuple1(reinterpret_cast<Tuple*>(
-        malloc(tuple_descs[1]->byte_size())));
-  memset(tuple0.get(), 0, tuple_descs[0]->byte_size());
-  memset(tuple1.get(), 0, tuple_descs[1]->byte_size());
-
-  // All tuples are NULL.
-  row->SetTuple(0, NULL);
-  row->SetTuple(1, NULL);
-  EXPECT_EQ(0, stream.ComputeRowSize(row.get()));
-
-  // Tuples are initialized to empty and have no var-len data.
-  row->SetTuple(0, tuple0.get());
-  row->SetTuple(1, tuple1.get());
-  EXPECT_EQ(array_desc_->GetRowSize(), stream.ComputeRowSize(row.get()));
-
-  // Tuple 0 has an array.
-  // The expected size adds each item's fixed size plus its string bytes.
-  int expected_row_size = array_desc_->GetRowSize();
-  const SlotDescriptor* array_slot = tuple_descs[0]->slots()[0];
-  const TupleDescriptor* item_desc = array_slot->collection_item_descriptor();
-  int array_len = 128;
-  CollectionValue* cv = tuple0->GetCollectionSlot(array_slot->tuple_offset());
-  CollectionValueBuilder builder(cv, *item_desc, mem_pool_.get(), runtime_state_,
-      array_len);
-  Tuple* array_data;
-  int num_rows;
-  builder.GetFreeMemory(&array_data, &num_rows);
-  expected_row_size += item_desc->byte_size() * array_len;
-
-  // Fill the array with pointers to our constant strings.
-  for (int i = 0; i < array_len; ++i) {
-    const StringValue* str = &STRINGS[i % NUM_STRINGS];
-    array_data->SetNotNull(item_desc->slots()[0]->null_indicator_offset());
-    RawValue::Write(str, array_data, item_desc->slots()[0], mem_pool_.get());
-    array_data += item_desc->byte_size();
-    expected_row_size += str->len;
-  }
-  builder.CommitTuples(array_len);
-  EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get()));
-
-  // Check that the external slot isn't included in size.
-  cv = tuple0->GetCollectionSlot(external_array_slot->tuple_offset());
-  // ptr of external slot shouldn't be dereferenced when computing size.
-  cv->ptr = reinterpret_cast<uint8_t*>(1234);
-  cv->num_tuples = 1234;
-  EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get()));
-
-  // Check that the array is excluded if tuple 0's array has its null indicator set.
-  // Only the fixed row size remains once the non-external array is marked NULL.
-  tuple0->SetNull(array_slot->null_indicator_offset());
-  EXPECT_EQ(array_desc_->GetRowSize(), stream.ComputeRowSize(row.get()));
-
-  stream.Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-}
-
-// TODO: more tests.
-//  - The stream can operate in many modes
-
-}
-
-int main(int argc, char** argv) {
-  // gtest consumes its own command-line flags first; then the Impala runtime, the
-  // frontend support library and LLVM are initialized before any test runs.
-  ::testing::InitGoogleTest(&argc, argv);
-  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
-  impala::InitFeSupport();
-  impala::LlvmCodeGen::InitializeLlvm();
-  const int test_result = RUN_ALL_TESTS();
-  return test_result;
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
deleted file mode 100644
index cce6390..0000000
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ /dev/null
@@ -1,903 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/buffered-tuple-stream.inline.h"
-
-#include <boost/bind.hpp>
-#include <gutil/strings/substitute.h>
-
-#include "runtime/collection-value.h"
-#include "runtime/descriptors.h"
-#include "runtime/string-value.h"
-#include "runtime/tuple-row.h"
-#include "util/bit-util.h"
-#include "util/debug-util.h"
-#include "util/runtime-profile-counters.h"
-
-#include "common/names.h"
-
-using namespace impala;
-using namespace strings;
-
-// Sizes of the first NUM_SMALL_BLOCKS blocks of a tuple stream. These blocks are
-// smaller than the IO size and are never spilled.
-// TODO: Consider adding a 4MB in-memory buffer that would split the gap between the
-// 512KB in-memory buffer and the 8MB (IO-sized) spillable buffer.
-static const int64_t INITIAL_BLOCK_SIZES[] = { 64 * 1024, 512 * 1024 };
-static const int NUM_SMALL_BLOCKS =
-    sizeof(INITIAL_BLOCK_SIZES) / sizeof(INITIAL_BLOCK_SIZES[0]);
-
-// Formats the block, offset and idx accessors of this RowIdx for debugging.
-string BufferedTupleStream::RowIdx::DebugString() const {
-  stringstream out;
-  out << "RowIdx block=" << block()
-      << " offset=" << offset()
-      << " idx=" << idx();
-  return out.str();
-}
-
-// Records per-tuple fixed sizes and, for each tuple, the var-len string and collection
-// slots whose data is stored inline in the stream (slots in 'ext_varlen_slots' are
-// excluded).
-BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
-    const RowDescriptor* row_desc, BufferedBlockMgr* block_mgr,
-    BufferedBlockMgr::Client* client, bool use_initial_small_buffers, bool read_write,
-    const set<SlotId>& ext_varlen_slots)
-  : state_(state),
-    desc_(row_desc),
-    block_mgr_(block_mgr),
-    block_mgr_client_(client),
-    total_byte_size_(0),
-    read_tuple_idx_(-1),
-    read_ptr_(NULL),
-    read_end_ptr_(NULL),
-    write_tuple_idx_(-1),
-    write_ptr_(NULL),
-    write_end_ptr_(NULL),
-    rows_returned_(0),
-    read_block_idx_(-1),
-    write_block_(NULL),
-    num_pinned_(0),
-    num_small_blocks_(0),
-    num_rows_(0),
-    pin_timer_(NULL),
-    unpin_timer_(NULL),
-    get_new_block_timer_(NULL),
-    read_write_(read_write),
-    has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
-    use_small_buffers_(use_initial_small_buffers),
-    delete_on_read_(false),
-    closed_(false),
-    pinned_(true) {
-  // Null-indicator sizes are unknown until Init()/the first block.
-  read_block_null_indicators_size_ = -1;
-  write_block_null_indicators_size_ = -1;
-  max_null_indicators_size_ = -1;
-  read_block_ = blocks_.end();
-  fixed_tuple_row_size_ = 0;
-
-  const vector<TupleDescriptor*>& tuple_descs = desc_->tuple_descriptors();
-  for (int tuple_idx = 0; tuple_idx < tuple_descs.size(); ++tuple_idx) {
-    const TupleDescriptor* tuple_desc = tuple_descs[tuple_idx];
-    const int byte_size = tuple_desc->byte_size();
-    fixed_tuple_sizes_.push_back(byte_size);
-    fixed_tuple_row_size_ += byte_size;
-
-    vector<SlotDescriptor*> string_slots;
-    vector<SlotDescriptor*> coll_slots;
-    for (SlotDescriptor* slot : tuple_desc->slots()) {
-      // Fixed-length slots and externally stored var-len slots need no inlining.
-      if (!slot->type().IsVarLenType()) continue;
-      if (ext_varlen_slots.find(slot->id()) != ext_varlen_slots.end()) continue;
-      if (slot->type().IsVarLenStringType()) {
-        string_slots.push_back(slot);
-      } else {
-        DCHECK(slot->type().IsCollectionType());
-        coll_slots.push_back(slot);
-      }
-    }
-    if (!string_slots.empty()) {
-      inlined_string_slots_.push_back(make_pair(tuple_idx, string_slots));
-    }
-    if (!coll_slots.empty()) {
-      inlined_coll_slots_.push_back(make_pair(tuple_idx, coll_slots));
-    }
-  }
-}
-
-// The stream must have been closed with Close() before destruction; Close() is what
-// releases the stream's blocks (deleting them or attaching them to a RowBatch).
-BufferedTupleStream::~BufferedTupleStream() {
-  DCHECK(closed_);
-}
-
-// Counts the blocks in 'blocks' that are both pinned and IO-sized (max size); small
-// blocks are never counted. Only called in DCHECKs to validate num_pinned_.
-int NumPinned(const list<BufferedBlockMgr::Block*>& blocks) {
-  int count = 0;
-  for (BufferedBlockMgr::Block* b : blocks) {
-    if (!b->is_pinned() || !b->is_max_size()) continue;
-    ++count;
-  }
-  return count;
-}
-
-// Returns a multi-line summary of the stream's counters, flags, cursors and blocks.
-string BufferedTupleStream::DebugString() const {
-  stringstream out;
-  out << "BufferedTupleStream num_rows=" << num_rows_
-      << " rows_returned=" << rows_returned_
-      << " pinned=" << (pinned_ ? "true" : "false")
-      << " delete_on_read=" << (delete_on_read_ ? "true" : "false")
-      << " closed=" << (closed_ ? "true" : "false")
-      << " num_pinned=" << num_pinned_
-      << " write_block=" << write_block_ << " read_block_=";
-  // read_block_ may be the end() sentinel, which must not be dereferenced.
-  if (read_block_ != blocks_.end()) {
-    out << *read_block_;
-  } else {
-    out << "<end>";
-  }
-  out << " blocks=[\n";
-  for (BufferedBlockMgr::Block* b : blocks_) {
-    out << "{" << b->DebugString() << "}";
-    // Entries are separated; no separator follows the last block.
-    if (b != blocks_.back()) out << ",\n";
-  }
-  out << "]";
-  return out.str();
-}
-
-// Registers profile timers (if 'profile' is non-NULL), computes the null-indicator
-// size for IO-sized blocks and fails if not even one row fits in such a block.
-// Unpins the stream if 'pinned' is false.
-Status BufferedTupleStream::Init(int node_id, RuntimeProfile* profile, bool pinned) {
-  if (profile != NULL) {
-    pin_timer_ = ADD_TIMER(profile, "PinTime");
-    unpin_timer_ = ADD_TIMER(profile, "UnpinTime");
-    get_new_block_timer_ = ADD_TIMER(profile, "GetNewBlockTime");
-  }
-
-  max_null_indicators_size_ = ComputeNumNullIndicatorBytes(block_mgr_->max_block_size());
-  if (UNLIKELY(max_null_indicators_size_ < 0)) {
-    // Not even a single row fits in an IO-sized block. Report the null-indicator
-    // bytes that a single row would need alongside the fixed row size.
-    const int one_row_null_indicators_size =
-        BitUtil::RoundUpNumi64(desc_->tuple_descriptors().size()) * 8;
-    return Status(TErrorCode::BTS_BLOCK_OVERFLOW,
-        PrettyPrinter::Print(fixed_tuple_row_size_, TUnit::BYTES),
-        PrettyPrinter::Print(one_row_null_indicators_size,  TUnit::BYTES));
-  }
-
-  // Small buffers are only usable if the smallest one is within the max block size.
-  use_small_buffers_ =
-      use_small_buffers_ && !(block_mgr_->max_block_size() < INITIAL_BLOCK_SIZES[0]);
-
-  if (pinned) return Status::OK();
-  return UnpinStream(UNPIN_ALL_EXCEPT_CURRENT);
-}
-
-// Allocates the first write block, sized to hold at least one fixed-size row. Requires
-// that no write block exists yet.
-Status BufferedTupleStream::PrepareForWrite(bool* got_buffer) {
-  DCHECK(write_block_ == NULL);
-  const int64_t min_row_size = fixed_tuple_row_size_;
-  return NewWriteBlockForRow(min_row_size, got_buffer);
-}
-
-// Stops using small buffers by allocating an IO-sized write block. If already using
-// IO-sized buffers, only reports whether a current write block exists.
-Status BufferedTupleStream::SwitchToIoBuffers(bool* got_buffer) {
-  if (use_small_buffers_) {
-    use_small_buffers_ = false;
-    Status status =
-        NewWriteBlock(block_mgr_->max_block_size(), max_null_indicators_size_, got_buffer);
-    // IMPALA-2330: if no buffer could be obtained, set the small-buffers flag back to
-    // true so the switch can be retried later.
-    DCHECK(status.ok() || !*got_buffer) << status.ok() << " " << *got_buffer;
-    use_small_buffers_ = !*got_buffer;
-    return status;
-  }
-  *got_buffer = (write_block_ != NULL);
-  return Status::OK();
-}
-
-// Releases all blocks: pinned blocks are attached to 'batch' with 'flush' when 'batch'
-// is non-NULL; every other block is deleted. Marks the stream closed.
-void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) {
-  for (BufferedBlockMgr::Block* block : blocks_) {
-    if (batch != NULL && block->is_pinned()) {
-      batch->AddBlock(block, flush);
-    } else {
-      block->Delete();
-    }
-  }
-  blocks_.clear();
-  num_pinned_ = 0;
-  DCHECK_EQ(num_pinned_, NumPinned(blocks_));
-  // Clearing 'blocks_' invalidated any iterator in 'read_block_' and left
-  // 'write_block_' pointing at a released block; reset both so that later
-  // introspection (e.g. DebugString()) never touches them.
-  write_block_ = NULL;
-  read_block_ = blocks_.end();
-  closed_ = true;
-}
-
-// Sums the buffer lengths of pinned IO-sized blocks. Small blocks are never counted,
-// and the current write block is skipped when 'ignore_current' is true.
-int64_t BufferedTupleStream::bytes_in_mem(bool ignore_current) const {
-  int64_t total = 0;
-  for (BufferedBlockMgr::Block* block : blocks_) {
-    const bool counted = block->is_pinned() && block->is_max_size()
-        && !(block == write_block_ && ignore_current);
-    if (counted) total += block->buffer_len();
-  }
-  return total;
-}
-
-// Unpins 'block' (which must be pinned) and updates num_pinned_. Small blocks are left
-// pinned and are a no-op here.
-Status BufferedTupleStream::UnpinBlock(BufferedBlockMgr::Block* block) {
-  SCOPED_TIMER(unpin_timer_);
-  DCHECK(block->is_pinned());
-  if (block->is_max_size()) {
-    RETURN_IF_ERROR(block->Unpin());
-    --num_pinned_;
-    DCHECK_EQ(num_pinned_, NumPinned(blocks_));
-  }
-  return Status::OK();
-}
-
-// Allocates a new write block of 'block_len' bytes whose header reserves
-// 'null_indicators_size' bytes for null indicators, and makes it the current write
-// block. For an unpinned stream, the previous IO-sized write block is unpinned by the
-// same GetNewBlock() call unless it is also the current read block. Sets '*got_block'
-// to false and returns OK if the block manager could not provide a block.
-Status BufferedTupleStream::NewWriteBlock(
-    int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept {
-  DCHECK(!closed_);
-  DCHECK_GE(null_indicators_size, 0);
-  *got_block = false;
-
-  BufferedBlockMgr::Block* unpin_block = write_block_;
-  if (write_block_ != NULL) {
-    DCHECK(write_block_->is_pinned());
-    // 'read_block_' is blocks_.end() before reading starts (and again once reading
-    // passes the last block), so it must be checked before being dereferenced.
-    const bool is_read_block =
-        read_block_ != blocks_.end() && write_block_ == *read_block_;
-    if (pinned_ || is_read_block || !write_block_->is_max_size()) {
-      // In these cases, don't unpin the current write block.
-      unpin_block = NULL;
-    }
-  }
-
-  BufferedBlockMgr::Block* new_block = NULL;
-  {
-    SCOPED_TIMER(get_new_block_timer_);
-    RETURN_IF_ERROR(block_mgr_->GetNewBlock(
-        block_mgr_client_, unpin_block, &new_block, block_len));
-  }
-  *got_block = new_block != NULL;
-
-  if (!*got_block) {
-    DCHECK(unpin_block == NULL);
-    return Status::OK();
-  }
-
-  if (unpin_block != NULL) {
-    // GetNewBlock() unpinned the old write block on our behalf.
-    DCHECK(unpin_block == write_block_);
-    DCHECK(!write_block_->is_pinned());
-    --num_pinned_;
-    DCHECK_EQ(num_pinned_, NumPinned(blocks_));
-  }
-
-  // Mark the entire block as containing valid data to avoid updating it as we go.
-  new_block->Allocate<uint8_t>(block_len);
-
-  // Compute and allocate the block header with the null indicators.
-  DCHECK_EQ(null_indicators_size, ComputeNumNullIndicatorBytes(block_len));
-  write_block_null_indicators_size_ = null_indicators_size;
-  write_tuple_idx_ = 0;
-  write_ptr_ = new_block->buffer() + write_block_null_indicators_size_;
-  write_end_ptr_ = new_block->buffer() + block_len;
-
-  blocks_.push_back(new_block);
-  block_start_idx_.push_back(new_block->buffer());
-  write_block_ = new_block;
-  DCHECK(write_block_->is_pinned());
-  DCHECK_EQ(write_block_->num_rows(), 0);
-  if (write_block_->is_max_size()) {
-    ++num_pinned_;
-    DCHECK_EQ(num_pinned_, NumPinned(blocks_));
-  } else {
-    // Small blocks are not unpinned by the stream and are tracked separately.
-    ++num_small_blocks_;
-  }
-  total_byte_size_ += block_len;
-  return Status::OK();
-}
-
-// Allocates a write block large enough for a row of 'row_size' bytes. In small-buffer
-// mode only the next small size is considered and '*got_block' is set to false (with
-// an OK status) if it is unavailable or too small. In IO-buffer mode, a row that can
-// never fit in an IO-sized block is an error.
-Status BufferedTupleStream::NewWriteBlockForRow(
-    int64_t row_size, bool* got_block) noexcept {
-  if (!use_small_buffers_) {
-    DCHECK_GE(max_null_indicators_size_, 0);
-    const int64_t io_block_len = block_mgr_->max_block_size();
-    const int64_t io_null_indicators_size = max_null_indicators_size_;
-    // The row plus the block's null indicators must fit in one IO-sized block.
-    if (UNLIKELY(row_size + io_null_indicators_size > io_block_len)) {
-      return Status(TErrorCode::BTS_BLOCK_OVERFLOW,
-          PrettyPrinter::Print(row_size, TUnit::BYTES),
-          PrettyPrinter::Print(io_null_indicators_size, TUnit::BYTES));
-    }
-    return NewWriteBlock(io_block_len, io_null_indicators_size, got_block);
-  }
-
-  // Small-buffer mode never switches to IO-sized buffers automatically.
-  *got_block = false;
-  if (blocks_.size() >= NUM_SMALL_BLOCKS) return Status::OK();
-  const int64_t small_block_len = INITIAL_BLOCK_SIZES[blocks_.size()];
-  const int64_t small_null_indicators_size =
-      ComputeNumNullIndicatorBytes(small_block_len);
-  // The small block is used only if it is below the max block size and a single row
-  // plus its null indicators fits. A larger small size is not tried instead.
-  *got_block = small_block_len < block_mgr_->max_block_size()
-      && small_null_indicators_size >= 0
-      && row_size + small_null_indicators_size <= small_block_len;
-  if (!*got_block) return Status::OK();
-  return NewWriteBlock(small_block_len, small_null_indicators_size, got_block);
-}
-
-// Advances 'read_block_' to the next block and releases the block just read:
-//  - with 'delete_on_read_', the block is removed from 'blocks_' and deleted;
-//  - otherwise, for an unpinned stream, an IO-sized block is unpinned;
-//  - small blocks, and blocks of a pinned stream without delete-on-read, are left
-//    as they are (a small block is still deleted under delete-on-read).
-// If the next block is not pinned, it is pinned via the block manager, passing the
-// block to release in the same call. When the new read block ends up pinned, the read
-// cursor is reset to the first row past its null indicators; if pinning did not
-// succeed, the read cursor fields are left unchanged.
-Status BufferedTupleStream::NextReadBlock() {
-  DCHECK(!closed_);
-  DCHECK(read_block_ != blocks_.end());
-  DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << pinned_;
-
-  // If non-NULL, this will be the current block if we are going to free it while
-  // grabbing the next block. This will stay NULL if we don't want to free the
-  // current block.
-  BufferedBlockMgr::Block* block_to_free =
-      (!pinned_ || delete_on_read_) ? *read_block_ : NULL;
-  if (delete_on_read_) {
-    // Reading always consumes from the front when deleting on read.
-    DCHECK(read_block_ == blocks_.begin());
-    DCHECK(*read_block_ != write_block_);
-    blocks_.pop_front();
-    read_block_ = blocks_.begin();
-    read_block_idx_ = 0;
-    // Small blocks are not counted in num_pinned_, so they can be deleted directly.
-    if (block_to_free != NULL && !block_to_free->is_max_size()) {
-      block_to_free->Delete();
-      block_to_free = NULL;
-      DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << DebugString();
-    }
-  } else {
-    ++read_block_;
-    ++read_block_idx_;
-    // Small blocks are never unpinned; keep them as they are.
-    if (block_to_free != NULL && !block_to_free->is_max_size()) block_to_free = NULL;
-  }
-
-  bool pinned = false;
-  if (read_block_ == blocks_.end() || (*read_block_)->is_pinned()) {
-    // End of the blocks or already pinned, just handle block_to_free
-    if (block_to_free != NULL) {
-      SCOPED_TIMER(unpin_timer_);
-      if (delete_on_read_) {
-        // The block was already removed from 'blocks_'; fix up the count by hand.
-        block_to_free->Delete();
-        --num_pinned_;
-      } else {
-        RETURN_IF_ERROR(UnpinBlock(block_to_free));
-      }
-    }
-  } else {
-    // Call into the block mgr to atomically unpin/delete the old block and pin the
-    // new block.
-    SCOPED_TIMER(pin_timer_);
-    RETURN_IF_ERROR((*read_block_)->Pin(&pinned, block_to_free, !delete_on_read_));
-    if (!pinned) {
-      DCHECK(block_to_free == NULL) << "Should have been able to pin."
-          << endl << block_mgr_->DebugString(block_mgr_client_);;
-    }
-    // If a block was released in exchange, the pinned count is unchanged.
-    if (block_to_free == NULL && pinned) ++num_pinned_;
-  }
-
-  if (read_block_ != blocks_.end() && (*read_block_)->is_pinned()) {
-    read_block_null_indicators_size_ =
-        ComputeNumNullIndicatorBytes((*read_block_)->buffer_len());
-    DCHECK_GE(read_block_null_indicators_size_, 0);
-    read_tuple_idx_ = 0;
-    read_ptr_ = (*read_block_)->buffer() + read_block_null_indicators_size_;
-    read_end_ptr_ = (*read_block_)->buffer() + (*read_block_)->buffer_len();
-  }
-  DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << DebugString();
-  return Status::OK();
-}
-
-// Prepares the stream for reading from its first row.
-//
-// If the stream is not in read/write mode, the current write block (if any) is
-// detached from the write path first. In unpinned mode it is also unpinned, unless
-// it is the first block. Blocks are then pinned from the front of the stream up to
-// and including the first max-sized block.
-//
-// 'delete_on_read': if true, blocks are deleted once they have been read
-//   (see NextReadBlock()).
-// 'got_buffer': output; false if a block could not be pinned, true otherwise. It is
-//   always written when OK is returned, including for a stream with no blocks.
-Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_buffer) {
-  DCHECK(!closed_);
-  DCHECK(got_buffer != NULL);
-  if (blocks_.empty()) {
-    // Nothing to pin. Still set the output flag: callers such as GetRows() test it
-    // unconditionally after an OK status, and an empty stream needs no read buffer.
-    *got_buffer = true;
-    return Status::OK();
-  }
-
-  if (!read_write_ && write_block_ != NULL) {
-    DCHECK(write_block_->is_pinned());
-    if (!pinned_ && write_block_ != blocks_.front()) {
-      RETURN_IF_ERROR(UnpinBlock(write_block_));
-    }
-    write_block_ = NULL;
-  }
-
-  // Walk the blocks and pin the first IO-sized block.
-  for (BufferedBlockMgr::Block* block : blocks_) {
-    if (!block->is_pinned()) {
-      SCOPED_TIMER(pin_timer_);
-      bool current_pinned;
-      RETURN_IF_ERROR(block->Pin(&current_pinned));
-      if (!current_pinned) {
-        *got_buffer = false;
-        return Status::OK();
-      }
-      ++num_pinned_;
-      DCHECK_EQ(num_pinned_, NumPinned(blocks_));
-    }
-    if (block->is_max_size()) break;
-  }
-
-  read_block_ = blocks_.begin();
-  DCHECK(read_block_ != blocks_.end());
-  read_block_null_indicators_size_ =
-      ComputeNumNullIndicatorBytes((*read_block_)->buffer_len());
-  DCHECK_GE(read_block_null_indicators_size_, 0);
-  read_tuple_idx_ = 0;
-  read_ptr_ = (*read_block_)->buffer() + read_block_null_indicators_size_;
-  read_end_ptr_ = (*read_block_)->buffer() + (*read_block_)->buffer_len();
-  rows_returned_ = 0;
-  read_block_idx_ = 0;
-  delete_on_read_ = delete_on_read;
-  *got_buffer = true;
-  return Status::OK();
-}
-
-// Pins every block of the stream and switches it to pinned mode.
-//
-// 'already_reserved': when false, the reservation for all currently unpinned blocks
-//   is requested up front. If it cannot be obtained, nothing is pinned and '*pinned'
-//   is set to false.
-// 'pinned': output; true once every block is pinned. If Block::Pin() unexpectedly
-//   fails to pin a block, '*pinned' is false and the stream stays in unpinned mode.
-//   Blocks pinned before the failure remain pinned.
-// Outside delete-on-read mode, 'block_start_idx_' is rebuilt with each block's buffer
-// address, which GetTupleRow() uses.
-Status BufferedTupleStream::PinStream(bool already_reserved, bool* pinned) {
-  DCHECK(!closed_);
-  DCHECK(pinned != NULL);
-  const bool have_reservation = already_reserved
-      || block_mgr_->TryAcquireTmpReservation(block_mgr_client_, blocks_unpinned());
-  if (!have_reservation) {
-    // If we can't get all the blocks, don't try at all.
-    *pinned = false;
-    return Status::OK();
-  }
-
-  for (BufferedBlockMgr::Block* blk : blocks_) {
-    if (blk->is_pinned()) continue;
-    {
-      SCOPED_TIMER(pin_timer_);
-      RETURN_IF_ERROR(blk->Pin(pinned));
-    }
-    if (*pinned) {
-      ++num_pinned_;
-      DCHECK_EQ(num_pinned_, NumPinned(blocks_));
-      continue;
-    }
-    VLOG_QUERY << "Should have been reserved." << endl
-               << block_mgr_->DebugString(block_mgr_client_);
-    return Status::OK();
-  }
-
-  if (!delete_on_read_) {
-    // Refresh the cached per-block buffer addresses used by GetTupleRow().
-    DCHECK_EQ(block_start_idx_.size(), blocks_.size());
-    block_start_idx_.clear();
-    for (BufferedBlockMgr::Block* blk : blocks_) block_start_idx_.push_back(blk->buffer());
-  }
-  pinned_ = true;
-  *pinned = true;
-  return Status::OK();
-}
-
-// Switches the stream to unpinned mode by unpinning its pinned blocks.
-//
-// 'mode': UNPIN_ALL unpins every block and detaches the current read and write
-//   blocks. UNPIN_ALL_EXCEPT_CURRENT keeps the current write block pinned and, in
-//   read/write mode, also the current read block, so that reading and writing can
-//   continue.
-Status BufferedTupleStream::UnpinStream(UnpinMode mode) {
-  DCHECK(!closed_);
-  DCHECK(mode == UNPIN_ALL || mode == UNPIN_ALL_EXCEPT_CURRENT);
-  SCOPED_TIMER(unpin_timer_);
-
-  for (BufferedBlockMgr::Block* block: blocks_) {
-    if (!block->is_pinned()) continue;
-    // 'read_block_' may be blocks_.end(), e.g. after a previous UNPIN_ALL (below) or
-    // after NextReadBlock() advanced past the last block. A past-the-end iterator
-    // must not be dereferenced, so there is no current read block in that case.
-    if (mode == UNPIN_ALL_EXCEPT_CURRENT
-        && (block == write_block_
-            || (read_write_ && read_block_ != blocks_.end() && block == *read_block_))) {
-      continue;
-    }
-    RETURN_IF_ERROR(UnpinBlock(block));
-  }
-  if (mode == UNPIN_ALL) {
-    read_block_ = blocks_.end();
-    write_block_ = NULL;
-  }
-  pinned_ = false;
-  return Status::OK();
-}
-
-// Returns how many bytes at the start of a block of 'block_size' bytes are reserved
-// for the NULL-tuple indicator bits (one bit per tuple slot). Returns 0 when the
-// stream has no nullable tuples. Returns -1 when not even a single row (fixed-length
-// tuples plus its indicator bits) fits in the block. The bit count is presumably
-// rounded up to whole 64-bit words by BitUtil::RoundUpNumi64(), then converted to
-// bytes.
-int BufferedTupleStream::ComputeNumNullIndicatorBytes(int block_size) const {
-  // If there are no nullable tuples then no need to waste space for null indicators.
-  if (!has_nullable_tuple_) return 0;
-
-  // Size the indicators as if every row used its maximum fixed size. Rows with NULL
-  // tuples take less space, so some indicator bits may go unused.
-  const uint32_t tuples_per_row = desc_->tuple_descriptors().size();
-  const uint32_t bits_per_block = 8 * block_size;
-  const uint32_t bits_per_row = 8 * fixed_tuple_row_size_ + tuples_per_row;
-  const uint32_t rows_per_block = bits_per_block / bits_per_row;
-  if (UNLIKELY(rows_per_block == 0)) return -1;
-  const uint32_t indicator_bits = rows_per_block * tuples_per_row;
-  return BitUtil::RoundUpNumi64(indicator_bits) * 8;
-}
-
-// Pins the entire stream and reads all of its rows into one newly allocated RowBatch,
-// stored in '*batch'. The batch's rows point into the pinned stream's memory.
-// '*got_rows' is set to false, and '*batch' is left untouched, if the stream could
-// not be pinned. Returns an error if the stream has more rows than a RowBatch can
-// hold.
-Status BufferedTupleStream::GetRows(scoped_ptr<RowBatch>* batch, bool* got_rows) {
-  // RowBatch::num_rows_ is a 32-bit int, avoid an overflow.
-  const int row_limit = numeric_limits<int>::max();
-  if (num_rows() > row_limit) {
-    return Status(Substitute("Trying to read $0 rows into in-memory batch failed. Limit "
-        "is $1", num_rows(), row_limit));
-  }
-
-  RETURN_IF_ERROR(PinStream(false, got_rows));
-  if (!*got_rows) return Status::OK();
-
-  bool got_read_buffer;
-  RETURN_IF_ERROR(PrepareForRead(false, &got_read_buffer));
-  DCHECK(got_read_buffer) << "Stream was pinned";
-
-  batch->reset(
-      new RowBatch(desc_, num_rows(), block_mgr_->get_tracker(block_mgr_client_)));
-  // GetNext() may stop at each block boundary, so keep calling it until the batch
-  // holds every row. Stopping there is harmless here because the whole stream is
-  // pinned.
-  for (bool eos = false; !eos;) {
-    RETURN_IF_ERROR(GetNext(batch->get(), &eos));
-  }
-  return Status::OK();
-}
-
-// Appends the next rows of the stream to 'batch' (see GetNextInternal()); no row
-// indices are collected.
-Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos) {
-  constexpr bool kFillIndices = false;
-  vector<RowIdx>* const no_indices = NULL;
-  return GetNextInternal<kFillIndices>(batch, eos, no_indices);
-}
-
-// Same as GetNext(batch, eos), but also replaces the contents of 'indices' with one
-// RowIdx per returned row, so that rows can later be looked up with GetTupleRow().
-Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos,
-    vector<RowIdx>* indices) {
-  constexpr bool kFillIndices = true;
-  return GetNextInternal<kFillIndices>(batch, eos, indices);
-}
-
-// Dispatches to the GetNextInternal() instantiation that matches whether the stream
-// has nullable tuples, so that the per-row NULL-indicator handling is selected at
-// compile time.
-template <bool FILL_INDICES>
-Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
-    vector<RowIdx>* indices) {
-  return has_nullable_tuple_
-      ? GetNextInternal<FILL_INDICES, true>(batch, eos, indices)
-      : GetNextInternal<FILL_INDICES, false>(batch, eos, indices);
-}
-
-// Appends rows from the current read block to 'batch', up to the batch's remaining
-// capacity and never beyond the end of the current block. The batch receives only
-// tuple pointers into the block's memory. String and collection pointers inside
-// those tuples are fixed up in place to point at their inlined data. If the previous
-// call exhausted the current block, this first advances to the next block.
-//
-// FILL_INDICES: if true, 'indices' is cleared and filled with one RowIdx per returned
-//   row. This requires 'batch' to be empty and the stream not to be delete-on-read.
-// HAS_NULLABLE_TUPLE: must equal has_nullable_tuple_; when true, the block's
-//   NULL-tuple indicator bits are consulted for each tuple.
-// 'eos' is set to true once every row of the stream has been returned.
-template <bool FILL_INDICES, bool HAS_NULLABLE_TUPLE>
-Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
-    vector<RowIdx>* indices) {
-  DCHECK(!closed_);
-  DCHECK(batch->row_desc()->LayoutEquals(*desc_));
-  *eos = (rows_returned_ == num_rows_);
-  if (*eos) return Status::OK();
-  DCHECK_GE(read_block_null_indicators_size_, 0);
-
-  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
-  DCHECK_LE(read_tuple_idx_ / tuples_per_row, (*read_block_)->num_rows());
-  DCHECK_EQ(read_tuple_idx_ % tuples_per_row, 0);
-  int rows_returned_curr_block = read_tuple_idx_ / tuples_per_row;
-
-  if (UNLIKELY(rows_returned_curr_block == (*read_block_)->num_rows())) {
-    // Get the next block in the stream. We need to do this at the beginning of
-    // the GetNext() call to ensure the buffer management semantics. NextReadBlock()
-    // will recycle the memory for the rows returned from the *previous* call to
-    // GetNext().
-    RETURN_IF_ERROR(NextReadBlock());
-    DCHECK(read_block_ != blocks_.end()) << DebugString();
-    DCHECK_GE(read_block_null_indicators_size_, 0);
-    rows_returned_curr_block = 0;
-  }
-
-  DCHECK(read_block_ != blocks_.end());
-  DCHECK((*read_block_)->is_pinned()) << DebugString();
-  DCHECK_GE(read_tuple_idx_, 0);
-
-  // Never read past the end of the current block in a single call.
-  int rows_left_in_block = (*read_block_)->num_rows() - rows_returned_curr_block;
-  int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), rows_left_in_block);
-  DCHECK_GE(rows_to_fill, 1);
-  batch->AddRows(rows_to_fill);
-  uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows()));
-
-  // Produce tuple rows from the current block and the corresponding position on the
-  // null tuple indicator.
-  if (FILL_INDICES) {
-    DCHECK(indices != NULL);
-    DCHECK(!delete_on_read_);
-    DCHECK_EQ(batch->num_rows(), 0);
-    indices->clear();
-    indices->reserve(rows_to_fill);
-  }
-
-  uint8_t* null_word = NULL;
-  uint32_t null_pos = 0;
-  // Start reading from position read_tuple_idx_ in the block.
-  // IMPALA-2256: Special case if there are no materialized slots.
-  // When rows consume no memory, 'increment_row' is false, so every RowIdx records
-  // row index 0.
-  bool increment_row = RowConsumesMemory();
-  uint64_t last_read_row = increment_row * (read_tuple_idx_ / tuples_per_row);
-  for (int i = 0; i < rows_to_fill; ++i) {
-    if (FILL_INDICES) {
-      // Record (block index, byte offset of the row in the block, row index).
-      indices->push_back(RowIdx());
-      DCHECK_EQ(indices->size(), i + 1);
-      (*indices)[i].set(read_block_idx_, read_ptr_ - (*read_block_)->buffer(),
-          last_read_row);
-    }
-    // Copy the row into the output batch.
-    TupleRow* output_row = reinterpret_cast<TupleRow*>(tuple_row_mem);
-    if (HAS_NULLABLE_TUPLE) {
-      for (int j = 0; j < tuples_per_row; ++j) {
-        // Stitch together the tuples from the block and the NULL ones.
-        // Indicator bits are stored MSB-first: bit (7 - pos) of byte (idx / 8).
-        null_word = (*read_block_)->buffer() + (read_tuple_idx_ >> 3);
-        null_pos = read_tuple_idx_ & 7;
-        ++read_tuple_idx_;
-        const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
-        // Copy tuple and advance read_ptr_. If it is a NULL tuple, it calls SetTuple
-        // with Tuple* being 0x0. To do that we multiply the current read_ptr_ with
-        // false (0x0).
-        output_row->SetTuple(j, reinterpret_cast<Tuple*>(
-            reinterpret_cast<uint64_t>(read_ptr_) * is_not_null));
-        read_ptr_ += fixed_tuple_sizes_[j] * is_not_null;
-      }
-    } else {
-      // When we know that there are no nullable tuples we can skip null checks.
-      for (int j = 0; j < tuples_per_row; ++j) {
-        output_row->SetTuple(j, reinterpret_cast<Tuple*>(read_ptr_));
-        read_ptr_ += fixed_tuple_sizes_[j];
-      }
-      read_tuple_idx_ += tuples_per_row;
-    }
-    tuple_row_mem += sizeof(Tuple*) * tuples_per_row;
-
-    // Update string slot ptrs, skipping external strings.
-    for (int j = 0; j < inlined_string_slots_.size(); ++j) {
-      Tuple* tuple = output_row->GetTuple(inlined_string_slots_[j].first);
-      if (HAS_NULLABLE_TUPLE && tuple == NULL) continue;
-      FixUpStringsForRead(inlined_string_slots_[j].second, tuple);
-    }
-
-    // Update collection slot ptrs, skipping external collections. We traverse the
-    // collection structure in the same order as it was written to the stream, allowing
-    // us to infer the data layout based on the length of collections and strings.
-    for (int j = 0; j < inlined_coll_slots_.size(); ++j) {
-      Tuple* tuple = output_row->GetTuple(inlined_coll_slots_[j].first);
-      if (HAS_NULLABLE_TUPLE && tuple == NULL) continue;
-      FixUpCollectionsForRead(inlined_coll_slots_[j].second, tuple);
-    }
-    last_read_row += increment_row;
-  }
-
-  batch->CommitRows(rows_to_fill);
-  rows_returned_ += rows_to_fill;
-  *eos = (rows_returned_ == num_rows_);
-  if ((!pinned_ || delete_on_read_)
-      && rows_returned_curr_block + rows_to_fill == (*read_block_)->num_rows()) {
-    // No more data in this block. The batch must be immediately returned up the operator
-    // tree and deep copied so that NextReadBlock() can reuse the read block's buffer.
-    batch->MarkNeedsDeepCopy();
-  }
-  if (FILL_INDICES) DCHECK_EQ(indices->size(), rows_to_fill);
-  DCHECK_LE(read_ptr_, read_end_ptr_);
-  return Status::OK();
-}
-
-// For each non-NULL string slot in 'string_slots' of 'tuple', points the StringValue
-// at its character data, which starts at 'read_ptr_', then advances 'read_ptr_' past
-// those bytes. Slots are consumed in the same order CopyStrings() wrote them.
-void BufferedTupleStream::FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots,
-    Tuple* tuple) {
-  DCHECK(tuple != NULL);
-  for (const SlotDescriptor* slot : string_slots) {
-    if (tuple->IsNull(slot->null_indicator_offset())) continue;
-
-    StringValue* str = tuple->GetStringSlot(slot->tuple_offset());
-    DCHECK_LE(str->len, read_block_bytes_remaining());
-    str->ptr = reinterpret_cast<char*>(read_ptr_);
-    read_ptr_ += str->len;
-  }
-}
-
-// For each non-NULL collection slot in 'collection_slots' of 'tuple', points the
-// CollectionValue at its item tuples, which start at 'read_ptr_', and advances
-// 'read_ptr_' past them. Then recursively fixes up the strings and nested
-// collections of every item. Data is consumed in the order CopyCollections() wrote it.
-void BufferedTupleStream::FixUpCollectionsForRead(const vector<SlotDescriptor*>& collection_slots,
-    Tuple* tuple) {
-  DCHECK(tuple != NULL);
-  for (const SlotDescriptor* slot : collection_slots) {
-    if (tuple->IsNull(slot->null_indicator_offset())) continue;
-
-    CollectionValue* coll = tuple->GetCollectionSlot(slot->tuple_offset());
-    const TupleDescriptor& item_desc = *slot->collection_item_descriptor();
-    int items_bytes = coll->num_tuples * item_desc.byte_size();
-    DCHECK_LE(items_bytes, read_block_bytes_remaining());
-    coll->ptr = reinterpret_cast<uint8_t*>(read_ptr_);
-    read_ptr_ += items_bytes;
-    if (!item_desc.HasVarlenSlots()) continue;
-
-    // The items' own variable-length data follows the items themselves.
-    for (int j = 0; j < coll->num_tuples; ++j) {
-      Tuple* item = reinterpret_cast<Tuple*>(coll->ptr + j * item_desc.byte_size());
-      FixUpStringsForRead(item_desc.string_slots(), item);
-      FixUpCollectionsForRead(item_desc.collection_slots(), item);
-    }
-  }
-}
-
-// Returns the number of bytes 'row' takes when serialized into the stream. This is
-// the fixed-length part of each stored tuple (NULL tuples are skipped when the layout
-// is nullable), plus all inlined string bytes and collection item bytes, including
-// the variable-length data referenced by collection items.
-int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const noexcept {
-  int64_t size = fixed_tuple_row_size_;
-  if (has_nullable_tuple_) {
-    size = 0;
-    for (int t = 0; t < fixed_tuple_sizes_.size(); ++t) {
-      if (row->GetTuple(t) != NULL) size += fixed_tuple_sizes_[t];
-    }
-  }
-
-  for (const auto& string_entry : inlined_string_slots_) {
-    Tuple* tuple = row->GetTuple(string_entry.first);
-    if (tuple == NULL) continue;
-    for (const SlotDescriptor* slot : string_entry.second) {
-      if (!tuple->IsNull(slot->null_indicator_offset())) {
-        size += tuple->GetStringSlot(slot->tuple_offset())->len;
-      }
-    }
-  }
-
-  for (const auto& coll_entry : inlined_coll_slots_) {
-    Tuple* tuple = row->GetTuple(coll_entry.first);
-    if (tuple == NULL) continue;
-    for (const SlotDescriptor* slot : coll_entry.second) {
-      if (tuple->IsNull(slot->null_indicator_offset())) continue;
-      CollectionValue* cv = tuple->GetCollectionSlot(slot->tuple_offset());
-      const TupleDescriptor& item_desc = *slot->collection_item_descriptor();
-      size += cv->num_tuples * item_desc.byte_size();
-      if (!item_desc.HasVarlenSlots()) continue;
-
-      // Items may themselves reference variable-length data.
-      uint8_t* item_ptr = cv->ptr;
-      for (int j = 0; j < cv->num_tuples; ++j, item_ptr += item_desc.byte_size()) {
-        size += reinterpret_cast<Tuple*>(item_ptr)->VarlenByteSize(item_desc);
-      }
-    }
-  }
-  return size;
-}
-
-// Requests a new write block sized for 'row' via NewWriteBlockForRow(), then copies
-// 'row' into it. Returns false if an error occurred ('*status' holds it), if no block
-// was obtained, or if the copy did not fit.
-bool BufferedTupleStream::AddRowSlow(TupleRow* row, Status* status) noexcept {
-  const int64_t row_size = ComputeRowSize(row);
-  bool got_block;
-  *status = NewWriteBlockForRow(row_size, &got_block);
-  return status->ok() && got_block && DeepCopy(row);
-}
-
-// Copies 'row' into the current write block using the DeepCopyInternal()
-// instantiation that matches whether the stream has nullable tuples. Returns false
-// if the row could not be written completely.
-bool BufferedTupleStream::DeepCopy(TupleRow* row) noexcept {
-  return has_nullable_tuple_ ? DeepCopyInternal<true>(row) : DeepCopyInternal<false>(row);
-}
-
-// Appends 'row' to the current write block. It writes the fixed-length part of each
-// non-NULL tuple, then the inlined string data and the inlined collection data.
-// For nullable layouts, the block's NULL-tuple indicator bit for each tuple is set
-// (NULL) or cleared (non-NULL).
-//
-// Returns false if there is no write block, or if the row does not fit. In that case
-// the row is not counted in num_rows_ or the block's rows. A false return from the
-// string/collection copies happens after the fixed-length tuples were written, so
-// 'write_ptr_' (and, for nullable layouts, 'write_tuple_idx_') have already advanced
-// over the partial row.
-// TODO: this really needs codegen
-// TODO: in case of duplicate tuples, this can redundantly serialize data.
-template <bool HasNullableTuple>
-bool BufferedTupleStream::DeepCopyInternal(TupleRow* row) noexcept {
-  if (UNLIKELY(write_block_ == NULL)) return false;
-  DCHECK_GE(write_block_null_indicators_size_, 0);
-  DCHECK(write_block_->is_pinned()) << DebugString() << std::endl
-      << write_block_->DebugString();
-
-  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
-  uint32_t bytes_remaining = write_block_bytes_remaining();
-  // Bail out early if the fixed-length part or the row's indicator bits cannot fit.
-  if (UNLIKELY((bytes_remaining < fixed_tuple_row_size_) ||
-              (HasNullableTuple &&
-              (write_tuple_idx_ + tuples_per_row > write_block_null_indicators_size_ * 8)))) {
-    return false;
-  }
-
-  // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple
-  // indicator.
-  if (HasNullableTuple) {
-    DCHECK_GT(write_block_null_indicators_size_, 0);
-    uint8_t* null_word = NULL;
-    uint32_t null_pos = 0;
-    for (int i = 0; i < tuples_per_row; ++i) {
-      // Indicator bits are stored MSB-first: bit (7 - pos) of byte (idx / 8).
-      null_word = write_block_->buffer() + (write_tuple_idx_ >> 3); // / 8
-      null_pos = write_tuple_idx_ & 7;
-      ++write_tuple_idx_;
-      const int tuple_size = fixed_tuple_sizes_[i];
-      Tuple* t = row->GetTuple(i);
-      const uint8_t mask = 1 << (7 - null_pos);
-      if (t != NULL) {
-        *null_word &= ~mask;
-        memcpy(write_ptr_, t, tuple_size);
-        write_ptr_ += tuple_size;
-      } else {
-        *null_word |= mask;
-      }
-    }
-    DCHECK_LE(write_tuple_idx_ - 1, write_block_null_indicators_size_ * 8);
-  } else {
-    // If we know that there are no nullable tuples no need to set the nullability flags.
-    DCHECK_EQ(write_block_null_indicators_size_, 0);
-    for (int i = 0; i < tuples_per_row; ++i) {
-      const int tuple_size = fixed_tuple_sizes_[i];
-      Tuple* t = row->GetTuple(i);
-      // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots)
-      // is delivered, the check below should become DCHECK(t != NULL).
-      DCHECK(t != NULL || tuple_size == 0);
-      memcpy(write_ptr_, t, tuple_size);
-      write_ptr_ += tuple_size;
-    }
-  }
-
-  // Copy inlined string slots. Note: we do not need to convert the string ptrs to offsets
-  // on the write path, only on the read. The tuple data is immediately followed
-  // by the string data so only the len information is necessary.
-  for (int i = 0; i < inlined_string_slots_.size(); ++i) {
-    const Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first);
-    if (HasNullableTuple && tuple == NULL) continue;
-    if (UNLIKELY(!CopyStrings(tuple, inlined_string_slots_[i].second))) return false;
-  }
-
-  // Copy inlined collection slots. We copy collection data in a well-defined order so
-  // we do not need to convert pointers to offsets on the write path.
-  for (int i = 0; i < inlined_coll_slots_.size(); ++i) {
-    const Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first);
-    if (HasNullableTuple && tuple == NULL) continue;
-    if (UNLIKELY(!CopyCollections(tuple, inlined_coll_slots_[i].second))) return false;
-  }
-
-  // The row is only counted once every part of it has been written.
-  write_block_->AddRow();
-  ++num_rows_;
-  return true;
-}
-
-// Appends the character data of each non-NULL, non-empty string slot in
-// 'string_slots' of 'tuple' at 'write_ptr_'. Only the bytes are copied; the
-// StringValue pointers are fixed up on the read path. Returns false if the write
-// block has too little space left for a string.
-bool BufferedTupleStream::CopyStrings(const Tuple* tuple,
-    const vector<SlotDescriptor*>& string_slots) {
-  for (const SlotDescriptor* slot : string_slots) {
-    if (tuple->IsNull(slot->null_indicator_offset())) continue;
-    const StringValue* str = tuple->GetStringSlot(slot->tuple_offset());
-    if (UNLIKELY(str->len <= 0)) continue;
-    if (UNLIKELY(write_block_bytes_remaining() < str->len)) return false;
-
-    memcpy(write_ptr_, str->ptr, str->len);
-    write_ptr_ += str->len;
-  }
-  return true;
-}
-
-// Appends the item tuples of each non-NULL, non-empty collection slot in
-// 'collection_slots' of 'tuple' at 'write_ptr_'. Then, if the items have
-// variable-length slots, appends each item's string and nested collection data in
-// item order. FixUpCollectionsForRead() relies on this order. Returns false if the
-// write block runs out of space.
-bool BufferedTupleStream::CopyCollections(const Tuple* tuple,
-    const vector<SlotDescriptor*>& collection_slots) {
-  for (const SlotDescriptor* slot : collection_slots) {
-    if (tuple->IsNull(slot->null_indicator_offset())) continue;
-    const CollectionValue* cv = tuple->GetCollectionSlot(slot->tuple_offset());
-    const TupleDescriptor& item_desc = *slot->collection_item_descriptor();
-    if (UNLIKELY(cv->num_tuples <= 0)) continue;
-
-    int items_bytes = cv->num_tuples * item_desc.byte_size();
-    if (UNLIKELY(write_block_bytes_remaining() < items_bytes)) return false;
-    uint8_t* const items_start = write_ptr_;
-    memcpy(items_start, cv->ptr, items_bytes);
-    write_ptr_ += items_bytes;
-    if (!item_desc.HasVarlenSlots()) continue;
-
-    // Copy variable length data when present in collection items.
-    for (int j = 0; j < cv->num_tuples; ++j) {
-      const Tuple* item =
-          reinterpret_cast<const Tuple*>(items_start + j * item_desc.byte_size());
-      if (UNLIKELY(!CopyStrings(item, item_desc.string_slots()))) return false;
-      if (UNLIKELY(!CopyCollections(item, item_desc.collection_slots()))) return false;
-    }
-  }
-  return true;
-}
-
-// Rebuilds into 'row' the row identified by 'idx', as recorded by the indices-filling
-// GetNext(). The stream must be pinned and not in delete-on-read mode. Tuple pointers
-// point directly into the block's buffer. Tuples marked NULL in the block's indicator
-// bits are set to NULL.
-void BufferedTupleStream::GetTupleRow(const RowIdx& idx, TupleRow* row) const {
-  DCHECK(row != NULL);
-  DCHECK(!closed_);
-  DCHECK(is_pinned());
-  DCHECK(!delete_on_read_);
-  DCHECK_EQ(blocks_.size(), block_start_idx_.size());
-  DCHECK_LT(idx.block(), blocks_.size());
-
-  const auto& tuple_descs = desc_->tuple_descriptors();
-  uint8_t* data = block_start_idx_[idx.block()] + idx.offset();
-  if (!has_nullable_tuple_) {
-    for (int i = 0; i < tuple_descs.size(); ++i) {
-      row->SetTuple(i, reinterpret_cast<Tuple*>(data));
-      data += tuple_descs[i]->byte_size();
-    }
-    return;
-  }
-
-  // Stitch together the tuples from the block and the NULL ones. Indicator bits live
-  // at the start of the block, MSB-first within each byte.
-  const uint8_t* null_bits = block_start_idx_[idx.block()];
-  const int tuples_per_row = tuple_descs.size();
-  uint32_t tuple_idx = idx.idx() * tuples_per_row;
-  for (int i = 0; i < tuples_per_row; ++i, ++tuple_idx) {
-    const bool is_null = (null_bits[tuple_idx >> 3] & (1 << (7 - (tuple_idx & 7)))) != 0;
-    if (is_null) {
-      row->SetTuple(i, NULL);
-    } else {
-      row->SetTuple(i, reinterpret_cast<Tuple*>(data));
-      data += tuple_descs[i]->byte_size();
-    }
-  }
-}