You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/03/16 22:52:41 UTC

[1/3] incubator-impala git commit: IMPALA-4674: Part 1: port BufferedTupleStream to BufferPool

Repository: incubator-impala
Updated Branches:
  refs/heads/master 0ff1e6e8d -> 663285244


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 0d7f262..c7b916b 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -32,14 +32,16 @@
 #include "exprs/expr.h"
 #include "exprs/scalar-fn-call.h"
 #include "runtime/buffered-block-mgr.h"
-#include "runtime/exec-env.h"
-#include "runtime/descriptors.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/data-stream-recvr.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/query-state.h"
 #include "runtime/runtime-filter-bank.h"
 #include "runtime/timestamp-value.h"
-#include "runtime/query-state.h"
+#include "util/auth-util.h" // for GetEffectiveUser()
 #include "util/bitmap.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
@@ -48,7 +50,6 @@
 #include "util/jni-util.h"
 #include "util/mem-info.h"
 #include "util/pretty-printer.h"
-#include "util/auth-util.h" // for GetEffectiveUser()
 
 #include "common/names.h"
 
@@ -83,6 +84,7 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag
     exec_env_(exec_env),
     profile_(obj_pool_.get(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)),
     query_mem_tracker_(query_state_->query_mem_tracker()),
+    instance_buffer_reservation_(nullptr),
     is_cancelled_(false),
     root_node_id_(-1) {
   Init();
@@ -100,6 +102,7 @@ RuntimeState::RuntimeState(
     profile_(obj_pool_.get(), "<unnamed>"),
     query_mem_tracker_(MemTracker::CreateQueryMemTracker(
         query_id(), query_options(), request_pool, obj_pool_.get())),
+    instance_buffer_reservation_(nullptr),
     is_cancelled_(false),
     root_node_id_(-1) {
   Init();
@@ -113,10 +116,8 @@ void RuntimeState::Init() {
   SCOPED_TIMER(profile_.total_time_counter());
 
   // Register with the thread mgr
-  if (exec_env_ != NULL) {
-    resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
-    DCHECK(resource_pool_ != NULL);
-  }
+  resource_pool_ = exec_env_->thread_mgr()->RegisterPool();
+  DCHECK(resource_pool_ != NULL);
 
   total_thread_statistics_ = ADD_THREAD_COUNTERS(runtime_profile(), "TotalThreads");
   total_storage_wait_timer_ = ADD_TIMER(runtime_profile(), "TotalStorageWaitTime");
@@ -125,6 +126,13 @@ void RuntimeState::Init() {
 
   instance_mem_tracker_.reset(new MemTracker(
       runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_));
+
+  if (query_state_ != nullptr && exec_env_->buffer_pool() != nullptr) {
+    instance_buffer_reservation_ = obj_pool_->Add(new ReservationTracker);
+    instance_buffer_reservation_->InitChildTracker(&profile_,
+        query_state_->buffer_reservation(), instance_mem_tracker_.get(),
+        numeric_limits<int64_t>::max());
+  }
 }
 
 void RuntimeState::InitFilterBank() {
@@ -291,6 +299,9 @@ void RuntimeState::ReleaseResources() {
   block_mgr_.reset(); // Release any block mgr memory, if this is the last reference.
   codegen_.reset(); // Release any memory associated with codegen.
 
+  // Release the reservation, which should be unused at this point.
+  if (instance_buffer_reservation_ != nullptr) instance_buffer_reservation_->Close();
+
   // 'query_mem_tracker_' must be valid as long as 'instance_mem_tracker_' is so
   // delete 'instance_mem_tracker_' first.
   // LogUsage() walks the MemTracker tree top-down when the memory limit is exceeded, so

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index 97014aa..009fee5 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -41,6 +41,7 @@ class Expr;
 class LlvmCodeGen;
 class MemTracker;
 class ObjectPool;
+class ReservationTracker;
 class RuntimeFilterBank;
 class ScalarFnCall;
 class Status;
@@ -128,6 +129,9 @@ class RuntimeState {
   DiskIoMgr* io_mgr();
   MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); }
   MemTracker* query_mem_tracker() { return query_mem_tracker_; }
+  ReservationTracker* instance_buffer_reservation() {
+    return instance_buffer_reservation_;
+  }
   ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; }
 
   FileMoveMap* hdfs_files_to_move() { return &hdfs_files_to_move_; }
@@ -385,6 +389,11 @@ class RuntimeState {
   /// Memory usage of this fragment instance, a child of 'query_mem_tracker_'.
   boost::scoped_ptr<MemTracker> instance_mem_tracker_;
 
+  /// Buffer reservation for this fragment instance - a child of the query buffer
+  /// reservation. Non-NULL if 'query_state_' is not NULL and ExecEnv::buffer_pool_
+  /// was created by a backend test. Owned by 'obj_pool_'.
+  ReservationTracker* instance_buffer_reservation_;
+
   /// if true, execution should stop with a CANCELLED status
   bool is_cancelled_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 1c28acd..026b2ee 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -17,7 +17,11 @@
 
 #include "runtime/test-env.h"
 
+#include <limits>
+
+#include "runtime/buffered-block-mgr.h"
 #include "runtime/query-exec-mgr.h"
+#include "runtime/tmp-file-mgr.h"
 #include "util/disk-info.h"
 #include "util/impalad-metrics.h"
 
@@ -28,46 +32,56 @@
 #include "common/names.h"
 
 using boost::scoped_ptr;
+using std::numeric_limits;
 
 namespace impala {
 
 scoped_ptr<MetricGroup> TestEnv::static_metrics_;
 
-TestEnv::TestEnv() {
+TestEnv::TestEnv()
+  : have_tmp_file_mgr_args_(false),
+    buffer_pool_min_buffer_len_(1024),
+    buffer_pool_capacity_(0) {}
+
+Status TestEnv::Init() {
   if (static_metrics_ == NULL) {
     static_metrics_.reset(new MetricGroup("test-env-static-metrics"));
     ImpaladMetrics::CreateMetrics(static_metrics_.get());
   }
+
   exec_env_.reset(new ExecEnv);
-  exec_env_->InitForFeTests();
-  io_mgr_tracker_.reset(new MemTracker(-1));
-  Status status = exec_env_->disk_io_mgr()->Init(io_mgr_tracker_.get());
-  CHECK(status.ok()) << status.msg().msg();
-  InitMetrics();
-  tmp_file_mgr_.reset(new TmpFileMgr);
-  status = tmp_file_mgr_->Init(metrics_.get());
-  CHECK(status.ok()) << status.msg().msg();
+  // Populate the ExecEnv state that the backend tests need.
+  exec_env_->mem_tracker_.reset(new MemTracker(-1, "Process"));
+  RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init(exec_env_->process_mem_tracker()));
+  exec_env_->metrics_.reset(new MetricGroup("test-env-metrics"));
+  exec_env_->tmp_file_mgr_.reset(new TmpFileMgr);
+  if (have_tmp_file_mgr_args_) {
+    RETURN_IF_ERROR(
+        tmp_file_mgr()->InitCustom(tmp_dirs_, one_tmp_dir_per_device_, metrics()));
+  } else {
+    RETURN_IF_ERROR(tmp_file_mgr()->Init(metrics()));
+  }
+  exec_env_->InitBufferPool(buffer_pool_min_buffer_len_, buffer_pool_capacity_);
+  return Status::OK();
 }
 
-void TestEnv::InitMetrics() {
-  metrics_.reset(new MetricGroup("test-env-metrics"));
+void TestEnv::SetTmpFileMgrArgs(
+    const std::vector<std::string>& tmp_dirs, bool one_dir_per_device) {
+  have_tmp_file_mgr_args_ = true;
+  tmp_dirs_ = tmp_dirs;
+  one_tmp_dir_per_device_ = one_dir_per_device;
 }
 
-void TestEnv::InitTmpFileMgr(const vector<string>& tmp_dirs, bool one_dir_per_device) {
-  // Need to recreate metrics to avoid error when registering metric twice.
-  InitMetrics();
-  tmp_file_mgr_.reset(new TmpFileMgr);
-  Status status = tmp_file_mgr_->InitCustom(tmp_dirs, one_dir_per_device, metrics_.get());
-  CHECK(status.ok()) << status.msg().msg();
+void TestEnv::SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity) {
+  buffer_pool_min_buffer_len_ = min_buffer_len;
+  buffer_pool_capacity_ = capacity;
 }
 
 TestEnv::~TestEnv() {
   // Queries must be torn down first since they are dependent on global state.
   TearDownQueries();
+  exec_env_->disk_io_mgr_.reset();
   exec_env_.reset();
-  io_mgr_tracker_.reset();
-  tmp_file_mgr_.reset();
-  metrics_.reset();
 }
 
 void TestEnv::TearDownQueries() {
@@ -93,8 +107,8 @@ int64_t TestEnv::TotalQueryMemoryConsumption() {
   return total;
 }
 
-Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size,
-    const TQueryOptions* query_options, RuntimeState** runtime_state) {
+Status TestEnv::CreateQueryState(
+    int64_t query_id, const TQueryOptions* query_options, RuntimeState** runtime_state) {
   TQueryCtx query_ctx;
   if (query_options != nullptr) query_ctx.client_request.query_options = *query_options;
   query_ctx.query_id.hi = 0;
@@ -110,13 +124,20 @@ Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_si
       new RuntimeState(qs, fis->fragment_ctx(), fis->instance_ctx(), exec_env_.get()));
   runtime_states_.push_back(rs);
 
+  *runtime_state = rs;
+  return Status::OK();
+}
+
+Status TestEnv::CreateQueryStateWithBlockMgr(int64_t query_id, int max_buffers,
+    int block_size, const TQueryOptions* query_options, RuntimeState** runtime_state) {
+  RETURN_IF_ERROR(CreateQueryState(query_id, query_options, runtime_state));
+
   shared_ptr<BufferedBlockMgr> mgr;
-  RETURN_IF_ERROR(BufferedBlockMgr::Create(rs, qs->query_mem_tracker(),
-      rs->runtime_profile(), tmp_file_mgr_.get(),
+  RETURN_IF_ERROR(BufferedBlockMgr::Create(*runtime_state,
+      (*runtime_state)->query_state()->query_mem_tracker(),
+      (*runtime_state)->runtime_profile(), tmp_file_mgr(),
       CalculateMemLimit(max_buffers, block_size), block_size, &mgr));
-  rs->set_block_mgr(mgr);
-
-  if (runtime_state != nullptr) *runtime_state = rs;
+  (*runtime_state)->set_block_mgr(mgr);
   return Status::OK();
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/test-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.h b/be/src/runtime/test-env.h
index 3f2eaec..30e9309 100644
--- a/be/src/runtime/test-env.h
+++ b/be/src/runtime/test-env.h
@@ -18,7 +18,6 @@
 #ifndef IMPALA_RUNTIME_TEST_ENV
 #define IMPALA_RUNTIME_TEST_ENV
 
-#include "runtime/buffered-block-mgr.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
@@ -28,52 +27,69 @@
 
 namespace impala {
 
-/// Helper testing class that creates an environment with a buffered-block-mgr similar
-/// to the one Impala's runtime is using. Only one TestEnv can be active at a time,
-/// because it replaces the global ExecEnv singleton.
+/// Helper testing class that creates an environment with runtime memory management
+/// similar to the one used by the Impala runtime. Only one TestEnv can be active at a
+/// time, because it modifies the global ExecEnv singleton.
 class TestEnv {
  public:
   TestEnv();
   ~TestEnv();
 
-  /// Reinitialize tmp_file_mgr with custom configuration. Only valid to call before
-  /// query states have been created.
-  void InitTmpFileMgr(const std::vector<std::string>& tmp_dirs, bool one_dir_per_device);
+  /// Set custom configuration for TmpFileMgr. Only has effect if called before Init().
+  /// If not called, the default configuration is used.
+  void SetTmpFileMgrArgs(
+      const std::vector<std::string>& tmp_dirs, bool one_dir_per_device);
 
-  /// Create a QueryState and a RuntimeState for a query with a new block manager and
-  /// the given query options. The states are owned by the TestEnv. Returns an error if
-  /// CreateQueryState() has been called with the same query ID already.
-  /// If non-null, 'runtime_state' are set to the newly created RuntimeState. The
-  /// QueryState can be obtained via 'runtime_state'.
-  Status CreateQueryState(int64_t query_id, int max_buffers, int block_size,
-      const TQueryOptions* query_options, RuntimeState** runtime_state);
+  /// Set configuration for BufferPool. Only has effect if called before Init().
+  /// If not called, a buffer pool with no capacity is created.
+  void SetBufferPoolArgs(int64_t min_buffer_len, int64_t capacity);
+
+  /// Initialize the TestEnv with the specified arguments.
+  Status Init();
+
+  /// Create a QueryState and a RuntimeState for a query with the given query options.
+  /// The states are owned by the TestEnv. Returns an error if CreateQueryState() has
+  /// been called with the same query ID already. 'runtime_state' is set to the newly
+  /// created RuntimeState. The QueryState can be obtained via 'runtime_state'.
+  Status CreateQueryState(
+      int64_t query_id, const TQueryOptions* query_options, RuntimeState** runtime_state);
 
+  /// Same as CreateQueryState() but also creates a BufferedBlockMgr with the provided
+  /// parameters. If 'max_buffers' is -1, there is no limit, otherwise the limit is
+  /// max_buffers * block_size.
+  Status CreateQueryStateWithBlockMgr(int64_t query_id, int max_buffers, int block_size,
+      const TQueryOptions* query_options, RuntimeState** runtime_state);
   /// Destroy all query states and associated RuntimeStates, BufferedBlockMgrs,
   /// etc, that were created since the last TearDownQueries() call.
   void TearDownQueries();
 
   /// Calculate memory limit accounting for overflow and negative values.
   /// If max_buffers is -1, no memory limit will apply.
-  int64_t CalculateMemLimit(int max_buffers, int block_size);
+  int64_t CalculateMemLimit(int max_buffers, int page_len);
 
   /// Return total of mem tracker consumption for all queries.
   int64_t TotalQueryMemoryConsumption();
 
   ExecEnv* exec_env() { return exec_env_.get(); }
-  MemTracker* io_mgr_tracker() { return io_mgr_tracker_.get(); }
-  MetricGroup* metrics() { return metrics_.get(); }
-  TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); }
+  MetricGroup* metrics() { return exec_env_->metrics(); }
+  TmpFileMgr* tmp_file_mgr() { return exec_env_->tmp_file_mgr(); }
 
  private:
   /// Recreate global metric groups.
   void InitMetrics();
 
+  /// Arguments for TmpFileMgr, used in Init().
+  bool have_tmp_file_mgr_args_;
+  std::vector<std::string> tmp_dirs_;
+  bool one_tmp_dir_per_device_;
+
+  /// Arguments for BufferPool, used in Init().
+  int64_t buffer_pool_min_buffer_len_;
+  int64_t buffer_pool_capacity_;
+
   /// Global state for test environment.
   static boost::scoped_ptr<MetricGroup> static_metrics_;
   boost::scoped_ptr<ExecEnv> exec_env_;
-  boost::scoped_ptr<MemTracker> io_mgr_tracker_;
-  boost::scoped_ptr<MetricGroup> metrics_;
-  boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;
 
   /// Per-query states. TestEnv holds 1 refcount per QueryState in this map.
   std::vector<QueryState*> query_states_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 59b5af4..b220fff 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -52,6 +52,7 @@ class TmpFileMgrTest : public ::testing::Test {
     metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
     profile_ = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "tmp-file-mgr-test"));
     test_env_.reset(new TestEnv);
+    ASSERT_OK(test_env_->Init());
     cb_counter_ = 0;
 
     // Reset query options that are modified by tests.


[3/3] incubator-impala git commit: IMPALA-4674: Part 1: port BufferedTupleStream to BufferPool

Posted by ta...@apache.org.
IMPALA-4674: Part 1: port BufferedTupleStream to BufferPool

Add a copy of BufferedTupleStream that allocates memory from BufferPool.
This will replace the original implementation when IMPALA-3200 is
completed.

The major changes are:
* Terminology is updated from "blocks" to "pages"
* No small buffer support is needed (hooray!).
* BufferedTupleStream needs to do its own tracking of the number of
  rows per page, etc., instead of relying on BufferedBlockMgr to do
  it. A wrapper around PageHandle is used.
* Profile counters (unpin, pin, get new block time) are removed -
  similar counters in the BufferPool client are more useful.
* Changed the tuple null indicators so that they are allocated
  before each tuple, rather than in a block at the start of the
  page. This slightly reduces the memory density, but greatly
  simplifies the logic. In particular, it avoids problems with
  RowIdx and larger pages with offsets that don't fit in 16 bits.
* Buffer management of read/write streams uses the new pin-counting
  support to separate pinning of the read and write pages.
  This means that the reservation usage of an unpinned read/write
  stream does not fluctuate and the read/write iterators can always
  advance without requiring additional reservation.

Testing this required some further changes. TestEnv was refactored
so it can set up either BufferPool or BufferedBlockMgr. Some
BufferPool-related state is added to ExecEnv, QueryState and
RuntimeState, but is only created for backend tests that explicitly
create a BufferPool.

The following is left for future work:
* IMPALA-3808 (large row support) is not added. I've added
  TODOs in the code at the places that will require changes.
* IMPALA-4179 (remove MarkNeedsDeepCopy()) is not fixed, since
  it requires global changes to operators that accumulate memory.

Testing:
All of the BufferedTupleStream unit tests are ported to the new
implementation, except for ones specifically testing the small
buffer functionality.

Change-Id: I7bb47a818564ac19a6be4ca88db0578f4ea0b709
Reviewed-on: http://gerrit.cloudera.org:8080/5811
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/66328524
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/66328524
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/66328524

Branch: refs/heads/master
Commit: 663285244e413cfccb03a281d80403b15fdae663
Parents: 0ff1e6e
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Thu Jan 26 16:41:27 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 16 21:07:58 2017 +0000

----------------------------------------------------------------------
 be/src/benchmarks/hash-benchmark.cc             |    7 +-
 be/src/codegen/llvm-codegen-test.cc             |    4 +-
 be/src/exec/hash-table-test.cc                  |    5 +-
 be/src/exprs/expr-codegen-test.cc               |    4 +-
 be/src/runtime/CMakeLists.txt                   |    2 +
 be/src/runtime/buffered-block-mgr-test.cc       |   11 +-
 be/src/runtime/buffered-tuple-stream-test.cc    |    3 +-
 be/src/runtime/buffered-tuple-stream-v2-test.cc | 1240 ++++++++++++++++++
 be/src/runtime/buffered-tuple-stream-v2.cc      |  812 ++++++++++++
 be/src/runtime/buffered-tuple-stream-v2.h       |  592 +++++++++
 .../runtime/buffered-tuple-stream-v2.inline.h   |   62 +
 be/src/runtime/bufferpool/buffer-pool-test.cc   |    5 +-
 be/src/runtime/exec-env.cc                      |   30 +-
 be/src/runtime/exec-env.h                       |   18 +-
 be/src/runtime/query-state.cc                   |   70 +-
 be/src/runtime/query-state.h                    |   18 +
 be/src/runtime/row-batch.cc                     |   31 +-
 be/src/runtime/row-batch.h                      |   31 +-
 be/src/runtime/runtime-state.cc                 |   27 +-
 be/src/runtime/runtime-state.h                  |    9 +
 be/src/runtime/test-env.cc                      |   75 +-
 be/src/runtime/test-env.h                       |   58 +-
 be/src/runtime/tmp-file-mgr-test.cc             |    1 +
 23 files changed, 3011 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/benchmarks/hash-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/hash-benchmark.cc b/be/src/benchmarks/hash-benchmark.cc
index fb37bed..5f3a8d8 100644
--- a/be/src/benchmarks/hash-benchmark.cc
+++ b/be/src/benchmarks/hash-benchmark.cc
@@ -423,7 +423,12 @@ int main(int argc, char **argv) {
   Status status;
   RuntimeState* state;
   TestEnv test_env;
-  status = test_env.CreateQueryState(0, 0, 0, nullptr, &state);
+  status = test_env.Init();
+  if (!status.ok()) {
+    cout << "Could not init TestEnv";
+    return -1;
+  }
+  status = test_env.CreateQueryState(0, nullptr, &state);
   if (!status.ok()) {
     cout << "Could not create RuntimeState";
     return -1;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/codegen/llvm-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/codegen/llvm-codegen-test.cc b/be/src/codegen/llvm-codegen-test.cc
index 59c66f4..719a84e 100644
--- a/be/src/codegen/llvm-codegen-test.cc
+++ b/be/src/codegen/llvm-codegen-test.cc
@@ -45,8 +45,8 @@ class LlvmCodeGenTest : public testing:: Test {
 
   virtual void SetUp() {
     test_env_.reset(new TestEnv());
-    EXPECT_OK(test_env_->CreateQueryState(0, 1, 8 * 1024 * 1024, nullptr,
-        &runtime_state_));
+    ASSERT_OK(test_env_->Init());
+    ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
   }
 
   virtual void TearDown() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/exec/hash-table-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-table-test.cc b/be/src/exec/hash-table-test.cc
index 230ebb5..e776a68 100644
--- a/be/src/exec/hash-table-test.cc
+++ b/be/src/exec/hash-table-test.cc
@@ -62,6 +62,7 @@ class HashTableTest : public testing::Test {
 
   virtual void SetUp() {
     test_env_.reset(new TestEnv());
+    ASSERT_OK(test_env_->Init());
 
     RowDescriptor desc;
     Status status;
@@ -185,7 +186,7 @@ class HashTableTest : public testing::Test {
   bool CreateHashTable(bool quadratic, int64_t initial_num_buckets,
       scoped_ptr<HashTable>* table, int block_size = 8 * 1024 * 1024,
       int max_num_blocks = 100, int reserved_blocks = 10) {
-    EXPECT_OK(test_env_->CreateQueryState(
+    EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr(
         next_query_id_++, max_num_blocks, block_size, nullptr, &runtime_state_));
     MemTracker* client_tracker =
         pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
@@ -603,7 +604,7 @@ TEST_F(HashTableTest, QuadraticInsertFullTest) {
 
 // Test that hashing empty string updates hash value.
 TEST_F(HashTableTest, HashEmpty) {
-  EXPECT_OK(test_env_->CreateQueryState(
+  EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr(
       0, 100, 8 * 1024 * 1024, nullptr, &runtime_state_));
   scoped_ptr<HashTableCtx> ht_ctx;
   Status status = HashTableCtx::Create(runtime_state_, build_expr_ctxs_, probe_expr_ctxs_,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/exprs/expr-codegen-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-codegen-test.cc b/be/src/exprs/expr-codegen-test.cc
index c73c89e..c027492 100644
--- a/be/src/exprs/expr-codegen-test.cc
+++ b/be/src/exprs/expr-codegen-test.cc
@@ -110,8 +110,8 @@ class ExprCodegenTest : public ::testing::Test {
     TQueryOptions query_options;
     query_options.__set_decimal_v2(true);
     test_env_.reset(new TestEnv());
-    EXPECT_OK(test_env_->CreateQueryState(0, 1, 8 * 1024 * 1024, &query_options,
-        &runtime_state_));
+    ASSERT_OK(test_env_->Init());
+    ASSERT_OK(test_env_->CreateQueryState(0, &query_options, &runtime_state_));
 
     FunctionContext::TypeDesc return_type;
     return_type.type = FunctionContext::TYPE_DECIMAL;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 640ab39..773192d 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -26,6 +26,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 add_library(Runtime
   buffered-block-mgr.cc
   buffered-tuple-stream.cc
+  buffered-tuple-stream-v2.cc
   client-cache.cc
   coordinator.cc
   data-stream-mgr.cc
@@ -92,6 +93,7 @@ ADD_BE_TEST(mem-tracker-test)
 ADD_BE_TEST(multi-precision-test)
 ADD_BE_TEST(decimal-test)
 ADD_BE_TEST(buffered-tuple-stream-test)
+ADD_BE_TEST(buffered-tuple-stream-v2-test)
 ADD_BE_TEST(hdfs-fs-cache-test)
 ADD_BE_TEST(tmp-file-mgr-test)
 ADD_BE_TEST(row-batch-serialize-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 48b2f6a..bd60a2c 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -77,6 +77,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
 
   virtual void SetUp() {
     test_env_.reset(new TestEnv());
+    ASSERT_OK(test_env_->Init());
   }
 
   virtual void TearDown() {
@@ -103,7 +104,9 @@ class BufferedBlockMgrTest : public ::testing::Test {
       tmp_dirs.push_back(dir);
       created_tmp_dirs_.push_back(dir);
     }
-    test_env_->InitTmpFileMgr(tmp_dirs, false);
+    test_env_.reset(new TestEnv);
+    test_env_->SetTmpFileMgrArgs(tmp_dirs, false);
+    EXPECT_OK(test_env_->Init());
     EXPECT_EQ(num_dirs, test_env_->tmp_file_mgr()->NumActiveTmpDevices());
     return tmp_dirs;
   }
@@ -146,7 +149,7 @@ class BufferedBlockMgrTest : public ::testing::Test {
   BufferedBlockMgr* CreateMgr(int64_t query_id, int max_buffers, int block_size,
       RuntimeState** query_state = NULL, TQueryOptions* query_options = NULL) {
     RuntimeState* state;
-    EXPECT_OK(test_env_->CreateQueryState(
+    EXPECT_OK(test_env_->CreateQueryStateWithBlockMgr(
         query_id, max_buffers, block_size, query_options, &state));
     if (query_state != NULL) *query_state = state;
     return state->block_mgr();
@@ -1271,8 +1274,8 @@ TEST_F(BufferedBlockMgrTest, ScratchLimitZero) {
   BufferedBlockMgr::Client* client;
   TQueryOptions query_options;
   query_options.scratch_limit = 0;
-  BufferedBlockMgr* block_mgr = CreateMgrAndClient(0, max_num_buffers, block_size_,
-      0, false, &client, NULL, &query_options);
+  BufferedBlockMgr* block_mgr = CreateMgrAndClient(
+      0, max_num_buffers, block_size_, 0, false, &client, NULL, &query_options);
   vector<BufferedBlockMgr::Block*> blocks;
   AllocateBlocks(block_mgr, client, max_num_buffers, &blocks);
   DeleteBlocks(blocks);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 862a9a2..7793767 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -69,6 +69,7 @@ class SimpleTupleStreamTest : public testing::Test {
  protected:
   virtual void SetUp() {
     test_env_.reset(new TestEnv());
+    ASSERT_OK(test_env_->Init());
 
     CreateDescriptors();
 
@@ -101,7 +102,7 @@ class SimpleTupleStreamTest : public testing::Test {
   /// Setup a block manager with the provided settings and client with no reservation,
   /// tracked by tracker_.
   void InitBlockMgr(int64_t limit, int block_size) {
-    ASSERT_OK(test_env_->CreateQueryState(
+    ASSERT_OK(test_env_->CreateQueryStateWithBlockMgr(
         0, limit, block_size, nullptr, &runtime_state_));
     MemTracker* client_tracker =
         pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-v2-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2-test.cc b/be/src/runtime/buffered-tuple-stream-v2-test.cc
new file mode 100644
index 0000000..5ff6471
--- /dev/null
+++ b/be/src/runtime/buffered-tuple-stream-v2-test.cc
@@ -0,0 +1,1240 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <boost/bind.hpp>
+#include <boost/filesystem.hpp>
+#include <boost/scoped_ptr.hpp>
+
+#include <limits> // for std::numeric_limits<int>::max()
+#include <set>
+#include <string>
+
+#include "codegen/llvm-codegen.h"
+#include "gutil/gscoped_ptr.h"
+#include "runtime/buffered-tuple-stream-v2.inline.h"
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/collection-value-builder.h"
+#include "runtime/collection-value.h"
+#include "runtime/raw-value.h"
+#include "runtime/row-batch.h"
+#include "runtime/string-value.inline.h"
+#include "runtime/test-env.h"
+#include "runtime/tmp-file-mgr.h"
+#include "service/fe-support.h"
+#include "testutil/desc-tbl-builder.h"
+#include "testutil/gtest-util.h"
+#include "util/test-info.h"
+
+#include "gen-cpp/ImpalaInternalService_types.h"
+#include "gen-cpp/Types_types.h"
+
+#include "common/names.h"
+
+using base::FreeDeleter;
+using std::numeric_limits;
+
+static const int BATCH_SIZE = 250;
+// Allow arbitrarily small pages in our test buffer pool.
+static const int MIN_PAGE_LEN = 1;
+// Limit the size of the buffer pool to bound memory consumption.
+static const int64_t BUFFER_POOL_LIMIT = 1024L * 1024L * 1024L;
+
+// The page length to use for the streams.
+static const int PAGE_LEN = 2 * 1024 * 1024;
+static const uint32_t PRIME = 479001599;
+
+namespace impala {
+
+static const StringValue STRINGS[] = {
+    StringValue("ABC"), StringValue("HELLO"), StringValue("123456789"),
+    StringValue("FOOBAR"), StringValue("ONE"), StringValue("THREE"),
+    StringValue("abcdefghijklmno"), StringValue("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
+    StringValue("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
+};
+
+static const int NUM_STRINGS = sizeof(STRINGS) / sizeof(StringValue);
+
+class SimpleTupleStreamTest : public testing::Test {
+ protected:
+  virtual void SetUp() {}
+
+  virtual void CreateDescriptors() {
+    vector<bool> nullable_tuples(1, false);
+    vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0));
+
+    DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_);
+    int_builder.DeclareTuple() << TYPE_INT;
+    int_desc_ =
+        pool_.Add(new RowDescriptor(*int_builder.Build(), tuple_ids, nullable_tuples));
+
+    DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_);
+    string_builder.DeclareTuple() << TYPE_STRING;
+    string_desc_ =
+        pool_.Add(new RowDescriptor(*string_builder.Build(), tuple_ids, nullable_tuples));
+  }
+
+  /// Release everything Init() set up. Safe to run even if the test never called
+  /// Init() or if Init() bailed out on an early ASSERT.
+  virtual void TearDown() {
+    // The client is only ever registered by Init(), after test_env_ was created.
+    if (client_.is_registered()) {
+      test_env_->exec_env()->buffer_pool()->DeregisterClient(&client_);
+    }
+    runtime_state_ = nullptr;
+    pool_.Clear();
+    // mem_pool_ is only allocated in Init(); don't dereference it if that never ran.
+    if (mem_pool_.get() != nullptr) mem_pool_->FreeAll();
+    test_env_.reset();
+  }
+
+  /// Set up all of the test state: the buffer pool, a query state, a client with no
+  /// reservation and any other descriptors, etc.
+  /// The buffer pool's capacity is limited to 'buffer_pool_limit'.
+  void Init(int64_t buffer_pool_limit) {
+    test_env_.reset(new TestEnv());
+    test_env_->SetBufferPoolArgs(MIN_PAGE_LEN, buffer_pool_limit);
+    ASSERT_OK(test_env_->Init());
+
+    CreateDescriptors();
+    mem_pool_.reset(new MemPool(&tracker_));
+
+    ASSERT_OK(test_env_->CreateQueryState(0, nullptr, &runtime_state_));
+    query_state_ = runtime_state_->query_state();
+
+    RuntimeProfile* client_profile = pool_.Add(new RuntimeProfile(&pool_, "client"));
+    MemTracker* client_tracker =
+        pool_.Add(new MemTracker(-1, "client", runtime_state_->instance_mem_tracker()));
+    ASSERT_OK(test_env_->exec_env()->buffer_pool()->RegisterClient("",
+        query_state_->file_group(), runtime_state_->instance_buffer_reservation(),
+        client_tracker, numeric_limits<int>::max(), client_profile, &client_));
+  }
+
+  /// Generate the ith element of a sequence of int values.
+  int GenIntValue(int i) {
+    // Multiply by large prime to get varied bit patterns.
+    return i * PRIME;
+  }
+
+  /// Generate the ith element of a sequence of bool values.
+  bool GenBoolValue(int i) {
+    // Use a middle bit of the int value.
+    return ((GenIntValue(i) >> 8) & 0x1) != 0;
+  }
+
+  /// Count the total number of slots per row based on the given row descriptor.
+  int CountSlotsPerRow(const RowDescriptor& row_desc) {
+    int slots_per_row = 0;
+    for (int i = 0; i < row_desc.tuple_descriptors().size(); ++i) {
+      TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[i];
+      slots_per_row += tuple_desc->slots().size();
+    }
+    return slots_per_row;
+  }
+
+  /// Allocate a row batch with 'num_rows' of rows with layout described by 'row_desc'.
+  /// 'offset' is used to account for rows occupied by any previous row batches. This is
+  /// needed to match the values generated in VerifyResults(). If 'gen_null' is true,
+  /// some tuples will be set to NULL.
+  virtual RowBatch* CreateBatch(
+      const RowDescriptor& row_desc, int offset, int num_rows, bool gen_null) {
+    RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, &tracker_));
+    int num_tuples = row_desc.tuple_descriptors().size();
+
+    int idx = offset * CountSlotsPerRow(row_desc);
+    for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
+      TupleRow* row = batch->GetRow(row_idx);
+      for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
+        TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[tuple_idx];
+        Tuple* tuple = Tuple::Create(tuple_desc->byte_size(), batch->tuple_data_pool());
+        bool is_null = gen_null && !GenBoolValue(idx);
+        for (int slot_idx = 0; slot_idx < tuple_desc->slots().size(); ++slot_idx, ++idx) {
+          SlotDescriptor* slot_desc = tuple_desc->slots()[slot_idx];
+          void* slot = tuple->GetSlot(slot_desc->tuple_offset());
+          switch (slot_desc->type().type) {
+            case TYPE_INT:
+              *reinterpret_cast<int*>(slot) = GenIntValue(idx);
+              break;
+            case TYPE_STRING:
+              *reinterpret_cast<StringValue*>(slot) = STRINGS[idx % NUM_STRINGS];
+              break;
+            default:
+              // The memory has been zero'ed out already by Tuple::Create().
+              break;
+          }
+        }
+        if (is_null) {
+          row->SetTuple(tuple_idx, nullptr);
+        } else {
+          row->SetTuple(tuple_idx, tuple);
+        }
+      }
+      batch->CommitLastRow();
+    }
+    return batch;
+  }
+
+  virtual RowBatch* CreateIntBatch(int offset, int num_rows, bool gen_null) {
+    return CreateBatch(*int_desc_, offset, num_rows, gen_null);
+  }
+
+  virtual RowBatch* CreateStringBatch(int offset, int num_rows, bool gen_null) {
+    return CreateBatch(*string_desc_, offset, num_rows, gen_null);
+  }
+
+  void AppendValue(uint8_t* ptr, vector<int>* results) {
+    if (ptr == nullptr) {
+      // For the tests indicate null-ability using the max int value
+      results->push_back(numeric_limits<int>::max());
+    } else {
+      results->push_back(*reinterpret_cast<int*>(ptr));
+    }
+  }
+
+  void AppendValue(uint8_t* ptr, vector<StringValue>* results) {
+    if (ptr == nullptr) {
+      results->push_back(StringValue());
+    } else {
+      StringValue sv = *reinterpret_cast<StringValue*>(ptr);
+      uint8_t* copy = mem_pool_->Allocate(sv.len);
+      memcpy(copy, sv.ptr, sv.len);
+      sv.ptr = reinterpret_cast<char*>(copy);
+      results->push_back(sv);
+    }
+  }
+
+  template <typename T>
+  void AppendRowTuples(TupleRow* row, RowDescriptor* row_desc, vector<T>* results) {
+    DCHECK(row != nullptr);
+    const int num_tuples = row_desc->tuple_descriptors().size();
+
+    for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
+      TupleDescriptor* tuple_desc = row_desc->tuple_descriptors()[tuple_idx];
+      Tuple* tuple = row->GetTuple(tuple_idx);
+      const int num_slots = tuple_desc->slots().size();
+      for (int slot_idx = 0; slot_idx < num_slots; ++slot_idx) {
+        SlotDescriptor* slot_desc = tuple_desc->slots()[slot_idx];
+        if (tuple == nullptr) {
+          AppendValue(nullptr, results);
+        } else {
+          void* slot = tuple->GetSlot(slot_desc->tuple_offset());
+          AppendValue(reinterpret_cast<uint8_t*>(slot), results);
+        }
+      }
+    }
+  }
+
+  /// Read rows from 'stream' and append every slot value to 'results'. If
+  /// 'num_batches' is negative, reads until the stream reports eos. Otherwise stops
+  /// after at most 'num_batches' batches or at eos, whichever comes first. At least
+  /// one batch is always read. Stops early if GetNext() returns an error.
+  template <typename T>
+  void ReadValues(BufferedTupleStreamV2* stream, RowDescriptor* desc, vector<T>* results,
+      int num_batches = -1) {
+    bool eos = false;
+    RowBatch batch(*desc, BATCH_SIZE, &tracker_);
+    int batches_read = 0;
+    do {
+      batch.Reset();
+      Status status = stream->GetNext(&batch, &eos);
+      EXPECT_OK(status);
+      // EXPECT_OK is non-fatal: bail out explicitly, otherwise a persistent error
+      // would leave 'eos' false and spin forever.
+      if (!status.ok()) break;
+      ++batches_read;
+      for (int i = 0; i < batch.num_rows(); ++i) {
+        AppendRowTuples(batch.GetRow(i), desc, results);
+      }
+      // 'batches_read' already counts the batch just read, so the comparison must be
+      // strict; '<=' would read one batch more than requested.
+    } while (!eos && (num_batches < 0 || batches_read < num_batches));
+  }
+
+  void GetExpectedValue(int idx, bool is_null, int* val) {
+    if (is_null) {
+      *val = numeric_limits<int>::max();
+    } else {
+      *val = GenIntValue(idx);
+    }
+  }
+
+  void GetExpectedValue(int idx, bool is_null, StringValue* val) {
+    if (is_null) {
+      *val = StringValue();
+    } else {
+      *val = STRINGS[idx % NUM_STRINGS];
+    }
+  }
+
+  /// Check that 'results' contains exactly the slot values expected for 'num_rows'
+  /// rows with the layout of 'row_desc', in order. Expected values come from
+  /// GetExpectedValue(); when 'gen_null' is true a tuple is expected to be NULL
+  /// whenever GenBoolValue() is false for the index of its first slot.
+  template <typename T>
+  void VerifyResults(const RowDescriptor& row_desc, const vector<T>& results,
+      int num_rows, bool gen_null) {
+    int idx = 0;
+    for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
+      const int num_tuples = row_desc.tuple_descriptors().size();
+      for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
+        const TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[tuple_idx];
+        const int num_slots = tuple_desc->slots().size();
+        bool is_null = gen_null && !GenBoolValue(idx);
+        for (int slot_idx = 0; slot_idx < num_slots; ++slot_idx, ++idx) {
+          // Fail cleanly instead of reading past the end if values are missing.
+          ASSERT_LT(static_cast<size_t>(idx), results.size())
+              << "too few results: row_idx=" << row_idx << " tuple_idx=" << tuple_idx
+              << " slot_idx=" << slot_idx;
+          T expected_val;
+          GetExpectedValue(idx, is_null, &expected_val);
+          ASSERT_EQ(results[idx], expected_val)
+              << "results[" << idx << "] " << results[idx] << " != " << expected_val
+              << " row_idx=" << row_idx << " tuple_idx=" << tuple_idx
+              << " slot_idx=" << slot_idx << " gen_null=" << gen_null;
+        }
+      }
+    }
+    // Use a gtest assertion rather than DCHECK so surplus values are also caught in
+    // release builds.
+    ASSERT_EQ(results.size(), static_cast<size_t>(idx));
+  }
+
+  // Test adding num_batches of ints to the stream and reading them back.
+  // If unpin_stream is true, operate the stream in unpinned mode.
+  // Assumes that enough buffers are available to read and write the stream.
+  template <typename T>
+  void TestValues(int num_batches, RowDescriptor* desc, bool gen_null, bool unpin_stream,
+      int64_t page_len = PAGE_LEN, int num_rows = BATCH_SIZE) {
+    BufferedTupleStreamV2 stream(runtime_state_, *desc, &client_, page_len);
+    ASSERT_OK(stream.Init(-1, true));
+    bool got_write_reservation;
+    ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+    ASSERT_TRUE(got_write_reservation);
+
+    if (unpin_stream) {
+      stream.UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+    }
+    // Add rows to the stream
+    int offset = 0;
+    for (int i = 0; i < num_batches; ++i) {
+      RowBatch* batch = nullptr;
+
+      Status status;
+      ASSERT_TRUE(sizeof(T) == sizeof(int) || sizeof(T) == sizeof(StringValue));
+      batch = CreateBatch(*desc, offset, num_rows, gen_null);
+      for (int j = 0; j < batch->num_rows(); ++j) {
+        // TODO: test that AddRow succeeds after freeing memory.
+        bool b = stream.AddRow(batch->GetRow(j), &status);
+        ASSERT_OK(status);
+        ASSERT_TRUE(b);
+      }
+      offset += batch->num_rows();
+      // Reset the batch to make sure the stream handles the memory correctly.
+      batch->Reset();
+    }
+
+    bool got_read_reservation;
+    ASSERT_OK(stream.PrepareForRead(false, &got_read_reservation));
+    ASSERT_TRUE(got_read_reservation);
+
+    // Read all the rows back
+    vector<T> results;
+    ReadValues(&stream, desc, &results);
+
+    // Verify result
+    VerifyResults<T>(*desc, results, num_rows * num_batches, gen_null);
+
+    stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+
+  void TestIntValuesInterleaved(int num_batches, int num_batches_before_read,
+      bool unpin_stream, int64_t page_len = PAGE_LEN) {
+    BufferedTupleStreamV2 stream(runtime_state_, *int_desc_, &client_, page_len);
+    ASSERT_OK(stream.Init(-1, true));
+    bool got_reservation;
+    ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
+    ASSERT_TRUE(got_reservation);
+    if (unpin_stream) {
+      stream.UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+    }
+
+    vector<int> results;
+    for (int i = 0; i < num_batches; ++i) {
+      RowBatch* batch = CreateIntBatch(i * BATCH_SIZE, BATCH_SIZE, false);
+      for (int j = 0; j < batch->num_rows(); ++j) {
+        Status status;
+        bool b = stream.AddRow(batch->GetRow(j), &status);
+        ASSERT_TRUE(b);
+        ASSERT_OK(status);
+      }
+      // Reset the batch to make sure the stream handles the memory correctly.
+      batch->Reset();
+      if (i % num_batches_before_read == 0) {
+        ReadValues(&stream, int_desc_, &results, (rand() % num_batches_before_read) + 1);
+      }
+    }
+    ReadValues(&stream, int_desc_, &results);
+
+    VerifyResults<int>(*int_desc_, results, BATCH_SIZE * num_batches, false);
+
+    stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+
+  void TestUnpinPin(bool varlen_data, bool read_write);
+
+  void TestTransferMemory(bool pinned_stream, bool read_write);
+
+  // The temporary runtime environment used for the test.
+  scoped_ptr<TestEnv> test_env_;
+  RuntimeState* runtime_state_;
+  QueryState* query_state_;
+
+  // Buffer pool client - automatically deregistered in TearDown().
+  BufferPool::ClientHandle client_;
+
+  // Dummy MemTracker used for miscellaneous memory.
+  MemTracker tracker_;
+  ObjectPool pool_;
+  RowDescriptor* int_desc_;
+  RowDescriptor* string_desc_;
+  scoped_ptr<MemPool> mem_pool_;
+};
+
+// Tests with a NULLable tuple per row.
+class SimpleNullStreamTest : public SimpleTupleStreamTest {
+ protected:
+  virtual void CreateDescriptors() {
+    vector<bool> nullable_tuples(1, true);
+    vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0));
+
+    DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_);
+    int_builder.DeclareTuple() << TYPE_INT;
+    int_desc_ =
+        pool_.Add(new RowDescriptor(*int_builder.Build(), tuple_ids, nullable_tuples));
+
+    DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_);
+    string_builder.DeclareTuple() << TYPE_STRING;
+    string_desc_ =
+        pool_.Add(new RowDescriptor(*string_builder.Build(), tuple_ids, nullable_tuples));
+  }
+}; // SimpleNullStreamTest
+
+// Tests with multiple non-NULLable tuples per row.
+class MultiTupleStreamTest : public SimpleTupleStreamTest {
+ protected:
+  virtual void CreateDescriptors() {
+    vector<bool> nullable_tuples;
+    nullable_tuples.push_back(false);
+    nullable_tuples.push_back(false);
+    nullable_tuples.push_back(false);
+
+    vector<TTupleId> tuple_ids;
+    tuple_ids.push_back(static_cast<TTupleId>(0));
+    tuple_ids.push_back(static_cast<TTupleId>(1));
+    tuple_ids.push_back(static_cast<TTupleId>(2));
+
+    DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_);
+    int_builder.DeclareTuple() << TYPE_INT;
+    int_builder.DeclareTuple() << TYPE_INT;
+    int_builder.DeclareTuple() << TYPE_INT;
+    int_desc_ =
+        pool_.Add(new RowDescriptor(*int_builder.Build(), tuple_ids, nullable_tuples));
+
+    DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_);
+    string_builder.DeclareTuple() << TYPE_STRING;
+    string_builder.DeclareTuple() << TYPE_STRING;
+    string_builder.DeclareTuple() << TYPE_STRING;
+    string_desc_ =
+        pool_.Add(new RowDescriptor(*string_builder.Build(), tuple_ids, nullable_tuples));
+  }
+};
+
+// Tests with multiple NULLable tuples per row.
+class MultiNullableTupleStreamTest : public SimpleTupleStreamTest {
+ protected:
+  virtual void CreateDescriptors() {
+    vector<bool> nullable_tuples;
+    nullable_tuples.push_back(false);
+    nullable_tuples.push_back(true);
+    nullable_tuples.push_back(true);
+
+    vector<TTupleId> tuple_ids;
+    tuple_ids.push_back(static_cast<TTupleId>(0));
+    tuple_ids.push_back(static_cast<TTupleId>(1));
+    tuple_ids.push_back(static_cast<TTupleId>(2));
+
+    DescriptorTblBuilder int_builder(test_env_->exec_env()->frontend(), &pool_);
+    int_builder.DeclareTuple() << TYPE_INT;
+    int_builder.DeclareTuple() << TYPE_INT;
+    int_builder.DeclareTuple() << TYPE_INT;
+    int_desc_ =
+        pool_.Add(new RowDescriptor(*int_builder.Build(), tuple_ids, nullable_tuples));
+
+    DescriptorTblBuilder string_builder(test_env_->exec_env()->frontend(), &pool_);
+    string_builder.DeclareTuple() << TYPE_STRING;
+    string_builder.DeclareTuple() << TYPE_STRING;
+    string_builder.DeclareTuple() << TYPE_STRING;
+    string_desc_ =
+        pool_.Add(new RowDescriptor(*string_builder.Build(), tuple_ids, nullable_tuples));
+  }
+};
+
+/// Tests with collection types.
+class ArrayTupleStreamTest : public SimpleTupleStreamTest {
+ protected:
+  /// Row layout with collection-typed slots; built by CreateDescriptors().
+  RowDescriptor* array_desc_;
+
+  /// Build 'array_desc_': two nullable tuples, the first with an array<string> slot
+  /// and an array<array<int>> slot, the second with a single array<int> slot.
+  virtual void CreateDescriptors() {
+    // tuples: (array<string>, array<array<int>>) (array<int>)
+    vector<bool> nullable_tuples(2, true);
+    vector<TTupleId> tuple_ids;
+    tuple_ids.push_back(static_cast<TTupleId>(0));
+    tuple_ids.push_back(static_cast<TTupleId>(1));
+    ColumnType string_array_type;
+    string_array_type.type = TYPE_ARRAY;
+    string_array_type.children.push_back(TYPE_STRING);
+
+    // The item type must be INT to match the array<int> layout described above.
+    ColumnType int_array_type;
+    int_array_type.type = TYPE_ARRAY;
+    int_array_type.children.push_back(TYPE_INT);
+
+    ColumnType nested_array_type;
+    nested_array_type.type = TYPE_ARRAY;
+    nested_array_type.children.push_back(int_array_type);
+
+    DescriptorTblBuilder builder(test_env_->exec_env()->frontend(), &pool_);
+    builder.DeclareTuple() << string_array_type << nested_array_type;
+    builder.DeclareTuple() << int_array_type;
+    array_desc_ =
+        pool_.Add(new RowDescriptor(*builder.Build(), tuple_ids, nullable_tuples));
+  }
+};
+
+// Basic API test. No data should be going to disk.
+TEST_F(SimpleTupleStreamTest, Basic) {
+  Init(numeric_limits<int64_t>::max());
+  TestValues<int>(1, int_desc_, false, true);
+  TestValues<int>(10, int_desc_, false, true);
+  TestValues<int>(100, int_desc_, false, true);
+  TestValues<int>(1, int_desc_, false, false);
+  TestValues<int>(10, int_desc_, false, false);
+  TestValues<int>(100, int_desc_, false, false);
+
+  TestValues<StringValue>(1, string_desc_, false, true);
+  TestValues<StringValue>(10, string_desc_, false, true);
+  TestValues<StringValue>(100, string_desc_, false, true);
+  TestValues<StringValue>(1, string_desc_, false, false);
+  TestValues<StringValue>(10, string_desc_, false, false);
+  TestValues<StringValue>(100, string_desc_, false, false);
+
+  TestIntValuesInterleaved(1, 1, true);
+  TestIntValuesInterleaved(10, 5, true);
+  TestIntValuesInterleaved(100, 15, true);
+  TestIntValuesInterleaved(1, 1, false);
+  TestIntValuesInterleaved(10, 5, false);
+  TestIntValuesInterleaved(100, 15, false);
+}
+
+// Test with only 1 buffer.
+TEST_F(SimpleTupleStreamTest, OneBufferSpill) {
+  // Each buffer can only hold 128 ints, so this spills quite often.
+  int buffer_size = 128 * sizeof(int);
+  Init(buffer_size);
+  TestValues<int>(1, int_desc_, false, true, buffer_size);
+  TestValues<int>(10, int_desc_, false, true, buffer_size);
+
+  TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
+}
+
+// Test with a few buffers.
+TEST_F(SimpleTupleStreamTest, ManyBufferSpill) {
+  int buffer_size = 128 * sizeof(int);
+  Init(10 * buffer_size);
+
+  TestValues<int>(1, int_desc_, false, true, buffer_size);
+  TestValues<int>(10, int_desc_, false, true, buffer_size);
+  TestValues<int>(100, int_desc_, false, true, buffer_size);
+  TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(100, string_desc_, false, true, buffer_size);
+
+  TestIntValuesInterleaved(1, 1, true, buffer_size);
+  TestIntValuesInterleaved(10, 5, true, buffer_size);
+  TestIntValuesInterleaved(100, 15, true, buffer_size);
+}
+
+/// Fill a pinned stream until AddRow() reports that it is full, switching between
+/// unpinned and pinned while writing, then read the contents back several times and
+/// verify them. 'varlen_data' selects string rows instead of int rows. 'read_write'
+/// prepares the stream with PrepareForReadWrite() instead of PrepareForWrite().
+void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
+  int buffer_size = 128 * sizeof(int);
+  int num_buffers = 10;
+  Init(num_buffers * buffer_size);
+  RowDescriptor* row_desc = varlen_data ? string_desc_ : int_desc_;
+
+  BufferedTupleStreamV2 stream(runtime_state_, *row_desc, &client_, buffer_size);
+  ASSERT_OK(stream.Init(-1, true));
+  if (read_write) {
+    bool got_reservation = false;
+    ASSERT_OK(stream.PrepareForReadWrite(false, &got_reservation));
+    ASSERT_TRUE(got_reservation);
+  } else {
+    bool got_write_reservation;
+    ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+    ASSERT_TRUE(got_write_reservation);
+  }
+
+  int offset = 0;
+  bool full = false;
+  int num_batches = 0;
+  while (!full) {
+    // Make sure we can switch between pinned and unpinned states while writing.
+    if (num_batches % 10 == 0) {
+      bool pinned = false;
+      stream.UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+      ASSERT_OK(stream.PinStream(&pinned));
+      // Use a gtest assertion so this is also checked in release builds.
+      ASSERT_TRUE(pinned);
+    }
+
+    RowBatch* batch = varlen_data ? CreateStringBatch(offset, BATCH_SIZE, false) :
+                                    CreateIntBatch(offset, BATCH_SIZE, false);
+    int j = 0;
+    for (; j < batch->num_rows(); ++j) {
+      Status status;
+      full = !stream.AddRow(batch->GetRow(j), &status);
+      ASSERT_OK(status);
+      if (full) break;
+    }
+    // Row 'j' was rejected when the stream filled up, so exactly 'j' rows were added.
+    offset += j;
+    ++num_batches;
+  }
+
+  stream.UnpinStream(BufferedTupleStreamV2::UNPIN_ALL_EXCEPT_CURRENT);
+
+  bool pinned = false;
+  ASSERT_OK(stream.PinStream(&pinned));
+  ASSERT_TRUE(pinned);
+
+  // Read and verify result a few times. We should be able to reread the stream if
+  // we don't use delete on read mode.
+  int read_iters = 3;
+  for (int i = 0; i < read_iters; ++i) {
+    // Only the final pass deletes pages as it reads them.
+    bool delete_on_read = i == read_iters - 1;
+    if (i > 0 || !read_write) {
+      bool got_read_reservation;
+      ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+      ASSERT_TRUE(got_read_reservation);
+    }
+
+    if (varlen_data) {
+      vector<StringValue> results;
+      ReadValues(&stream, row_desc, &results);
+      VerifyResults<StringValue>(*string_desc_, results, offset, false);
+    } else {
+      vector<int> results;
+      ReadValues(&stream, row_desc, &results);
+      VerifyResults<int>(*int_desc_, results, offset, false);
+    }
+  }
+
+  // After delete_on_read, all blocks aside from the last should be deleted.
+  // Note: this should really be 0, but the BufferedTupleStreamV2 returns eos before
+  // deleting the last block, rather than after, so the last block isn't deleted
+  // until the stream is closed.
+  ASSERT_EQ(stream.BytesPinned(false), buffer_size);
+
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+
+  ASSERT_EQ(stream.BytesPinned(false), 0);
+}
+
+TEST_F(SimpleTupleStreamTest, UnpinPin) {
+  TestUnpinPin(false, false);
+}
+
+TEST_F(SimpleTupleStreamTest, UnpinPinReadWrite) {
+  TestUnpinPin(false, true);
+}
+
+TEST_F(SimpleTupleStreamTest, UnpinPinVarlen) {
+  // varlen_data must be true here; otherwise this test just repeats UnpinPin.
+  TestUnpinPin(true, false);
+}
+
+/// Write four buffers' worth of int rows to a stream, close it with FLUSH_RESOURCES
+/// and check how many buffers end up attached to the output batch. 'pin_stream'
+/// is passed to the stream's Init(); 'read_write' prepares the stream with
+/// PrepareForReadWrite() instead of PrepareForWrite().
+void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write) {
+  // Use smaller buffers so that the explicit FLUSH_RESOURCES flag is required to
+  // make the batch at capacity.
+  int buffer_size = 4 * 1024;
+  Init(100 * buffer_size);
+
+  BufferedTupleStreamV2 stream(runtime_state_, *int_desc_, &client_, buffer_size);
+  ASSERT_OK(stream.Init(-1, pin_stream));
+  if (read_write) {
+    bool got_reservation;
+    ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
+    ASSERT_TRUE(got_reservation);
+  } else {
+    bool got_write_reservation;
+    ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+    ASSERT_TRUE(got_write_reservation);
+  }
+  RowBatch* batch = CreateIntBatch(0, 1024, false);
+
+  // Construct a stream with 4 blocks.
+  const int total_num_buffers = 4;
+  while (stream.byte_size() < total_num_buffers * buffer_size) {
+    Status status;
+    for (int i = 0; i < batch->num_rows(); ++i) {
+      bool ret = stream.AddRow(batch->GetRow(i), &status);
+      ASSERT_OK(status);
+      // Must be fatal: if rows stop being accepted the stream never grows and the
+      // enclosing while loop would never terminate.
+      ASSERT_TRUE(ret);
+    }
+  }
+
+  batch->Reset();
+  stream.Close(batch, RowBatch::FlushMode::FLUSH_RESOURCES);
+  // Use gtest checks rather than DCHECKs so these are verified in release builds.
+  // They are non-fatal so that the final batch->Reset() below always runs.
+  if (pin_stream) {
+    EXPECT_EQ(total_num_buffers, batch->num_buffers());
+  } else if (read_write) {
+    // Read and write block should be attached.
+    EXPECT_EQ(2, batch->num_buffers());
+  } else {
+    // Read block should be attached.
+    EXPECT_EQ(1, batch->num_buffers());
+  }
+  EXPECT_TRUE(batch->AtCapacity()); // Flush resources flag should have been set.
+  batch->Reset();
+  EXPECT_EQ(0, batch->num_buffers());
+}
+
+/// Test attaching memory to a row batch from a pinned stream.
+TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamReadWrite) {
+  TestTransferMemory(true, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TransferMemoryFromPinnedStreamNoReadWrite) {
+  TestTransferMemory(true, false);
+}
+
+/// Test attaching memory to a row batch from an unpinned stream.
+TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamReadWrite) {
+  TestTransferMemory(false, true);
+}
+
+TEST_F(SimpleTupleStreamTest, TransferMemoryFromUnpinnedStreamNoReadWrite) {
+  TestTransferMemory(false, false);
+}
+
+// Test that tuple stream functions if it references strings outside stream. The
+// aggregation node relies on this since it updates tuples in-place.
+TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
+  int buffer_size = 8 * 1024 * 1024;
+  Init(2 * buffer_size);
+  Status status = Status::OK();
+
+  int num_batches = 100;
+  int rows_added = 0;
+  DCHECK_EQ(string_desc_->tuple_descriptors().size(), 1);
+  TupleDescriptor& tuple_desc = *string_desc_->tuple_descriptors()[0];
+
+  // Mark every string slot as external; the set is handed to the stream constructor.
+  set<SlotId> external_slots;
+  for (int i = 0; i < tuple_desc.string_slots().size(); ++i) {
+    external_slots.insert(tuple_desc.string_slots()[i]->id());
+  }
+
+  BufferedTupleStreamV2 stream(
+      runtime_state_, *string_desc_, &client_, buffer_size, external_slots);
+  ASSERT_OK(stream.Init(0, false));
+  // Prepare the stream for writing before allocating rows, as every other test in
+  // this file that writes to a stream does.
+  bool got_write_reservation;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+  ASSERT_TRUE(got_write_reservation);
+
+  for (int i = 0; i < num_batches; ++i) {
+    RowBatch* batch = CreateStringBatch(rows_added, BATCH_SIZE, false);
+    for (int j = 0; j < batch->num_rows(); ++j) {
+      uint8_t* varlen_data;
+      int fixed_size = tuple_desc.byte_size();
+      // Request 0 bytes of var-len space; only the fixed-size part is copied below.
+      uint8_t* tuple = stream.AllocateRow(fixed_size, 0, &varlen_data, &status);
+      ASSERT_TRUE(tuple != nullptr);
+      ASSERT_TRUE(status.ok());
+      // Copy fixed portion in, but leave it pointing to row batch's varlen data.
+      memcpy(tuple, batch->GetRow(j)->GetTuple(0), fixed_size);
+    }
+    rows_added += batch->num_rows();
+  }
+
+  DCHECK_EQ(rows_added, stream.num_rows());
+
+  // Read the whole stream twice: first without, then with delete-on-read.
+  for (int delete_on_read = 0; delete_on_read <= 1; ++delete_on_read) {
+    // Keep stream in memory and test we can read ok.
+    vector<StringValue> results;
+    bool got_read_reservation;
+    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+    ASSERT_TRUE(got_read_reservation);
+    ReadValues(&stream, string_desc_, &results);
+    VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
+  }
+
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
+// Construct a big row by stitching together many tuples so the total row size
+// will be close to the page length (PAGE_LEN). With null indicators, preparing the
+// stream for writing will fail; without null indicators, things should work fine.
+TEST_F(SimpleTupleStreamTest, BigRow) {
+  Init(2 * PAGE_LEN);
+  vector<TupleId> tuple_ids;
+  vector<bool> nullable_tuples;
+  vector<bool> non_nullable_tuples;
+
+  DescriptorTblBuilder big_row_builder(test_env_->exec_env()->frontend(), &pool_);
+  // Each tuple contains 8 slots of TYPE_INT and a single byte for null indicator.
+  const int num_tuples = PAGE_LEN / (8 * sizeof(int) + 1);
+  for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
+    big_row_builder.DeclareTuple() << TYPE_INT << TYPE_INT << TYPE_INT << TYPE_INT
+                                   << TYPE_INT << TYPE_INT << TYPE_INT << TYPE_INT;
+    tuple_ids.push_back(static_cast<TTupleId>(tuple_idx));
+    nullable_tuples.push_back(true);
+    non_nullable_tuples.push_back(false);
+  }
+  DescriptorTbl* desc = big_row_builder.Build();
+
+  // Construct a big row with all non-nullable tuples.
+  RowDescriptor* row_desc =
+      pool_.Add(new RowDescriptor(*desc, tuple_ids, non_nullable_tuples));
+  ASSERT_FALSE(row_desc->IsAnyTupleNullable());
+  // Test writing this row into the stream and then reading it back.
+  TestValues<int>(1, row_desc, false, false, PAGE_LEN, 1);
+  TestValues<int>(1, row_desc, false, true, PAGE_LEN, 1);
+
+  // Construct a big row with nullable tuples. This requires extra space for null
+  // indicators in the stream so adding the row will fail.
+  RowDescriptor* nullable_row_desc =
+      pool_.Add(new RowDescriptor(*desc, tuple_ids, nullable_tuples));
+  ASSERT_TRUE(nullable_row_desc->IsAnyTupleNullable());
+  BufferedTupleStreamV2 nullable_stream(
+      runtime_state_, *nullable_row_desc, &client_, PAGE_LEN);
+  ASSERT_OK(nullable_stream.Init(-1, true));
+  bool got_reservation;
+  Status status = nullable_stream.PrepareForWrite(&got_reservation);
+  EXPECT_EQ(TErrorCode::BTS_BLOCK_OVERFLOW, status.code());
+  nullable_stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
+// Test for IMPALA-3923: overflow of 32-bit int in GetRows().
+TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) {
+  Init(BUFFER_POOL_LIMIT);
+  BufferedTupleStreamV2 stream(runtime_state_, *int_desc_, &client_, PAGE_LEN);
+  ASSERT_OK(stream.Init(-1, true));
+
+  Status status;
+  // Add more rows than can be fit in a RowBatch (limited by its 32-bit row count).
+  // Actually adding the rows would take a very long time, so just set num_rows_.
+  // This puts the stream in an inconsistent state, but exercises the right code path.
+  stream.num_rows_ = 1L << 33;
+  bool got_rows;
+  scoped_ptr<RowBatch> overflow_batch;
+  ASSERT_FALSE(stream.GetRows(&tracker_, &overflow_batch, &got_rows).ok());
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
+// Basic API test with int, string and interleaved int rows. No data should go to disk.
+TEST_F(SimpleNullStreamTest, Basic) {
+  Init(BUFFER_POOL_LIMIT);
+  TestValues<int>(1, int_desc_, false, true);
+  TestValues<int>(10, int_desc_, false, true);
+  TestValues<int>(100, int_desc_, false, true);
+  TestValues<int>(1, int_desc_, true, true);
+  TestValues<int>(10, int_desc_, true, true);
+  TestValues<int>(100, int_desc_, true, true);
+  TestValues<int>(1, int_desc_, false, false);
+  TestValues<int>(10, int_desc_, false, false);
+  TestValues<int>(100, int_desc_, false, false);
+  TestValues<int>(1, int_desc_, true, false);
+  TestValues<int>(10, int_desc_, true, false);
+  TestValues<int>(100, int_desc_, true, false);
+
+  TestValues<StringValue>(1, string_desc_, false, true);
+  TestValues<StringValue>(10, string_desc_, false, true);
+  TestValues<StringValue>(100, string_desc_, false, true);
+  TestValues<StringValue>(1, string_desc_, true, true);
+  TestValues<StringValue>(10, string_desc_, true, true);
+  TestValues<StringValue>(100, string_desc_, true, true);
+  TestValues<StringValue>(1, string_desc_, false, false);
+  TestValues<StringValue>(10, string_desc_, false, false);
+  TestValues<StringValue>(100, string_desc_, false, false);
+  TestValues<StringValue>(1, string_desc_, true, false);
+  TestValues<StringValue>(10, string_desc_, true, false);
+  TestValues<StringValue>(100, string_desc_, true, false);
+
+  TestIntValuesInterleaved(1, 1, true);
+  TestIntValuesInterleaved(10, 5, true);
+  TestIntValuesInterleaved(100, 15, true);
+  TestIntValuesInterleaved(1, 1, false);
+  TestIntValuesInterleaved(10, 5, false);
+  TestIntValuesInterleaved(100, 15, false);
+}
+
+// Test tuple stream limited to a single buffer, with rows made of multiple tuples.
+TEST_F(MultiTupleStreamTest, MultiTupleOneBufferSpill) {
+  // Each buffer can only hold 128 ints, so this spills quite often.
+  int buffer_size = 128 * sizeof(int);
+  Init(buffer_size);
+  TestValues<int>(1, int_desc_, false, true, buffer_size);
+  TestValues<int>(10, int_desc_, false, true, buffer_size);
+
+  TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
+}
+
+// Test with room for 10 buffers and rows with multiple tuples.
+TEST_F(MultiTupleStreamTest, MultiTupleManyBufferSpill) {
+  int buffer_size = 128 * sizeof(int);
+  Init(10 * buffer_size);
+
+  TestValues<int>(1, int_desc_, false, true, buffer_size);
+  TestValues<int>(10, int_desc_, false, true, buffer_size);
+  TestValues<int>(100, int_desc_, false, true, buffer_size);
+
+  TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(100, string_desc_, false, true, buffer_size);
+
+  TestIntValuesInterleaved(1, 1, true, buffer_size);
+  TestIntValuesInterleaved(10, 5, true, buffer_size);
+  TestIntValuesInterleaved(100, 15, true, buffer_size);
+}
+
+// Test that we can allocate a row in the stream and copy in multiple tuples then
+// read it back from the stream.
+TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
+  // Use small buffers so it will be flushed to disk.
+  int buffer_size = 4 * 1024;
+  Init(2 * buffer_size);
+  Status status = Status::OK();
+
+  int num_batches = 1;
+  int rows_added = 0;
+  BufferedTupleStreamV2 stream(runtime_state_, *string_desc_, &client_, buffer_size);
+  ASSERT_OK(stream.Init(-1, false)); // false: start the stream unpinned
+  bool got_write_reservation;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+  ASSERT_TRUE(got_write_reservation);
+
+  for (int i = 0; i < num_batches; ++i) {
+    RowBatch* batch = CreateStringBatch(rows_added, 1, false);
+    for (int j = 0; j < batch->num_rows(); ++j) {
+      TupleRow* row = batch->GetRow(j);
+      int64_t fixed_size = 0; // total fixed-length bytes of the row's tuples
+      int64_t varlen_size = 0; // total var-len bytes of the row's tuples
+      for (int k = 0; k < string_desc_->tuple_descriptors().size(); k++) {
+        TupleDescriptor* tuple_desc = string_desc_->tuple_descriptors()[k];
+        fixed_size += tuple_desc->byte_size();
+        varlen_size += row->GetTuple(k)->VarlenByteSize(*tuple_desc);
+      }
+      uint8_t* varlen_data;
+      uint8_t* fixed_data =
+          stream.AllocateRow(fixed_size, varlen_size, &varlen_data, &status);
+      ASSERT_TRUE(fixed_data != nullptr);
+      ASSERT_TRUE(status.ok());
+      uint8_t* varlen_write_ptr = varlen_data; // next free var-len byte
+      for (int k = 0; k < string_desc_->tuple_descriptors().size(); k++) {
+        TupleDescriptor* tuple_desc = string_desc_->tuple_descriptors()[k];
+        Tuple* src = row->GetTuple(k);
+        Tuple* dst = reinterpret_cast<Tuple*>(fixed_data);
+        fixed_data += tuple_desc->byte_size(); // advance past this tuple
+        memcpy(dst, src, tuple_desc->byte_size()); // shallow copy; ptrs fixed up below
+        for (int l = 0; l < tuple_desc->slots().size(); l++) {
+          SlotDescriptor* slot = tuple_desc->slots()[l];
+          StringValue* src_string = src->GetStringSlot(slot->tuple_offset());
+          StringValue* dst_string = dst->GetStringSlot(slot->tuple_offset());
+          dst_string->ptr = reinterpret_cast<char*>(varlen_write_ptr);
+          memcpy(dst_string->ptr, src_string->ptr, src_string->len);
+          varlen_write_ptr += src_string->len;
+        }
+      }
+      ASSERT_EQ(varlen_data + varlen_size, varlen_write_ptr); // all var-len space used
+    }
+    rows_added += batch->num_rows();
+  }
+
+  for (int i = 0; i < 3; ++i) {
+    bool delete_on_read = i == 2; // only the last pass deletes as it reads
+    vector<StringValue> results;
+    bool got_read_reservation;
+    ASSERT_OK(stream.PrepareForRead(delete_on_read, &got_read_reservation));
+    ASSERT_TRUE(got_read_reservation);
+    ReadValues(&stream, string_desc_, &results);
+    VerifyResults<StringValue>(*string_desc_, results, rows_added, false);
+  }
+
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
+// Test with rows with multiple nullable tuples and a single buffer.
+TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleOneBufferSpill) {
+  // Each buffer can only hold 128 ints, so this spills quite often.
+  int buffer_size = 128 * sizeof(int);
+  Init(buffer_size);
+  TestValues<int>(1, int_desc_, false, true, buffer_size);
+  TestValues<int>(10, int_desc_, false, true, buffer_size);
+  TestValues<int>(1, int_desc_, true, true, buffer_size);
+  TestValues<int>(10, int_desc_, true, true, buffer_size);
+
+  TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(1, string_desc_, true, true, buffer_size);
+  TestValues<StringValue>(10, string_desc_, true, true, buffer_size);
+}
+
+// Test with rows with multiple nullable tuples and room for 10 buffers.
+TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleManyBufferSpill) {
+  int buffer_size = 128 * sizeof(int);
+  Init(10 * buffer_size);
+
+  TestValues<int>(1, int_desc_, false, true, buffer_size);
+  TestValues<int>(10, int_desc_, false, true, buffer_size);
+  TestValues<int>(100, int_desc_, false, true, buffer_size);
+  TestValues<int>(1, int_desc_, true, true, buffer_size);
+  TestValues<int>(10, int_desc_, true, true, buffer_size);
+  TestValues<int>(100, int_desc_, true, true, buffer_size);
+
+  TestValues<StringValue>(1, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(10, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(100, string_desc_, false, true, buffer_size);
+  TestValues<StringValue>(1, string_desc_, true, true, buffer_size);
+  TestValues<StringValue>(10, string_desc_, true, true, buffer_size);
+  TestValues<StringValue>(100, string_desc_, true, true, buffer_size);
+
+  TestIntValuesInterleaved(1, 1, true, buffer_size);
+  TestIntValuesInterleaved(10, 5, true, buffer_size);
+  TestIntValuesInterleaved(100, 15, true, buffer_size);
+}
+
+/// Test that ComputeRowSize handles NULL tuples, var-len strings and external slots.
+TEST_F(MultiNullableTupleStreamTest, TestComputeRowSize) {
+  Init(BUFFER_POOL_LIMIT);
+  const vector<TupleDescriptor*>& tuple_descs = string_desc_->tuple_descriptors();
+  // String in second tuple is stored externally.
+  set<SlotId> external_slots;
+  const SlotDescriptor* external_string_slot = tuple_descs[1]->slots()[0];
+  external_slots.insert(external_string_slot->id());
+
+  BufferedTupleStreamV2 stream(
+      runtime_state_, *string_desc_, &client_, PAGE_LEN, external_slots);
+  gscoped_ptr<TupleRow, FreeDeleter> row(
+      reinterpret_cast<TupleRow*>(malloc(tuple_descs.size() * sizeof(Tuple*))));
+  gscoped_ptr<Tuple, FreeDeleter> tuple0(
+      reinterpret_cast<Tuple*>(malloc(tuple_descs[0]->byte_size())));
+  gscoped_ptr<Tuple, FreeDeleter> tuple1(
+      reinterpret_cast<Tuple*>(malloc(tuple_descs[1]->byte_size())));
+  gscoped_ptr<Tuple, FreeDeleter> tuple2(
+      reinterpret_cast<Tuple*>(malloc(tuple_descs[2]->byte_size())));
+  memset(tuple0.get(), 0, tuple_descs[0]->byte_size());
+  memset(tuple1.get(), 0, tuple_descs[1]->byte_size());
+  memset(tuple2.get(), 0, tuple_descs[2]->byte_size());
+  const int tuple_null_indicator_bytes = 1; // Need 1 byte for 3 tuples.
+
+  // All nullable tuples are NULL.
+  row->SetTuple(0, tuple0.get());
+  row->SetTuple(1, nullptr);
+  row->SetTuple(2, nullptr);
+  EXPECT_EQ(tuple_null_indicator_bytes + tuple_descs[0]->byte_size(),
+      stream.ComputeRowSize(row.get()));
+
+  // Tuples are initialized to empty and have no var-len data.
+  row->SetTuple(1, tuple1.get());
+  row->SetTuple(2, tuple2.get());
+  EXPECT_EQ(tuple_null_indicator_bytes + string_desc_->GetRowSize(),
+      stream.ComputeRowSize(row.get()));
+
+  // Tuple 0 has some data.
+  const SlotDescriptor* string_slot = tuple_descs[0]->slots()[0];
+  StringValue* sv = tuple0->GetStringSlot(string_slot->tuple_offset());
+  *sv = STRINGS[0];
+  int64_t expected_len =
+      tuple_null_indicator_bytes + string_desc_->GetRowSize() + sv->len;
+  EXPECT_EQ(expected_len, stream.ComputeRowSize(row.get()));
+
+  // Check that external slots aren't included in count.
+  sv = tuple1->GetStringSlot(external_string_slot->tuple_offset());
+  sv->ptr = reinterpret_cast<char*>(1234); // bogus pointer: must not be dereferenced
+  sv->len = 1234;
+  EXPECT_EQ(expected_len, stream.ComputeRowSize(row.get()));
+
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
+/// Test that deep copy works with arrays by copying into a BufferedTupleStream, freeing
+/// the original rows, then reading back the rows and verifying the contents.
+TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
+  Status status;
+  Init(BUFFER_POOL_LIMIT);
+  const int NUM_ROWS = 4000;
+  BufferedTupleStreamV2 stream(runtime_state_, *array_desc_, &client_, PAGE_LEN);
+  const vector<TupleDescriptor*>& tuple_descs = array_desc_->tuple_descriptors();
+  // Write out a predictable pattern of data by iterating over arrays of constants.
+  int strings_index = 0; // we take the mod of this as index into STRINGS.
+  int array_lens[] = {0, 1, 5, 10, 1000, 2, 49, 20};
+  int num_array_lens = sizeof(array_lens) / sizeof(array_lens[0]);
+  int array_len_index = 0;
+  ASSERT_OK(stream.Init(-1, false));
+  bool got_write_reservation;
+  ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
+  ASSERT_TRUE(got_write_reservation);
+
+  for (int i = 0; i < NUM_ROWS; ++i) {
+    const int tuple_null_indicator_bytes = 1; // Need 1 byte for 2 tuples.
+    int expected_row_size = tuple_null_indicator_bytes + tuple_descs[0]->byte_size()
+        + tuple_descs[1]->byte_size();
+    gscoped_ptr<TupleRow, FreeDeleter> row(
+        reinterpret_cast<TupleRow*>(malloc(tuple_descs.size() * sizeof(Tuple*))));
+    gscoped_ptr<Tuple, FreeDeleter> tuple0(
+        reinterpret_cast<Tuple*>(malloc(tuple_descs[0]->byte_size())));
+    gscoped_ptr<Tuple, FreeDeleter> tuple1(
+        reinterpret_cast<Tuple*>(malloc(tuple_descs[1]->byte_size())));
+    memset(tuple0.get(), 0, tuple_descs[0]->byte_size());
+    memset(tuple1.get(), 0, tuple_descs[1]->byte_size());
+    row->SetTuple(0, tuple0.get());
+    row->SetTuple(1, tuple1.get());
+
+    // Only array<string> is non-null.
+    tuple0->SetNull(tuple_descs[0]->slots()[1]->null_indicator_offset());
+    tuple1->SetNull(tuple_descs[1]->slots()[0]->null_indicator_offset());
+    const SlotDescriptor* array_slot_desc = tuple_descs[0]->slots()[0];
+    const TupleDescriptor* item_desc = array_slot_desc->collection_item_descriptor();
+
+    int array_len = array_lens[array_len_index++ % num_array_lens];
+    CollectionValue* cv = tuple0->GetCollectionSlot(array_slot_desc->tuple_offset());
+    cv->ptr = nullptr;
+    cv->num_tuples = 0;
+    CollectionValueBuilder builder(
+        cv, *item_desc, mem_pool_.get(), runtime_state_, array_len);
+    Tuple* array_data;
+    int num_rows; // out-param of GetFreeMemory(); not checked afterwards
+    builder.GetFreeMemory(&array_data, &num_rows);
+    expected_row_size += item_desc->byte_size() * array_len;
+
+    // Fill the array with pointers to our constant strings.
+    for (int j = 0; j < array_len; ++j) {
+      const StringValue* string = &STRINGS[strings_index++ % NUM_STRINGS];
+      array_data->SetNotNull(item_desc->slots()[0]->null_indicator_offset());
+      RawValue::Write(string, array_data, item_desc->slots()[0], mem_pool_.get());
+      array_data += item_desc->byte_size();
+      expected_row_size += string->len;
+    }
+    builder.CommitTuples(array_len);
+
+    // Check that internal row size computation gives correct result.
+    EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get()));
+    bool b = stream.AddRow(row.get(), &status);
+    ASSERT_TRUE(b);
+    ASSERT_OK(status);
+    mem_pool_->FreeAll(); // Free data as soon as possible to smoke out issues.
+  }
+
+  // Read back and verify data.
+  bool got_read_reservation;
+  ASSERT_OK(stream.PrepareForRead(false, &got_read_reservation));
+  ASSERT_TRUE(got_read_reservation);
+  strings_index = 0; // replay the same string sequence used when writing
+  array_len_index = 0; // replay the same array-length sequence used when writing
+  bool eos = false;
+  int rows_read = 0;
+  RowBatch batch(*array_desc_, BATCH_SIZE, &tracker_);
+  do {
+    batch.Reset();
+    ASSERT_OK(stream.GetNext(&batch, &eos));
+    for (int i = 0; i < batch.num_rows(); ++i) {
+      TupleRow* row = batch.GetRow(i);
+      Tuple* tuple0 = row->GetTuple(0);
+      Tuple* tuple1 = row->GetTuple(1);
+      ASSERT_TRUE(tuple0 != nullptr);
+      ASSERT_TRUE(tuple1 != nullptr);
+      const SlotDescriptor* array_slot_desc = tuple_descs[0]->slots()[0];
+      ASSERT_FALSE(tuple0->IsNull(array_slot_desc->null_indicator_offset()));
+      ASSERT_TRUE(tuple0->IsNull(tuple_descs[0]->slots()[1]->null_indicator_offset()));
+      ASSERT_TRUE(tuple1->IsNull(tuple_descs[1]->slots()[0]->null_indicator_offset()));
+
+      const TupleDescriptor* item_desc = array_slot_desc->collection_item_descriptor();
+      int expected_array_len = array_lens[array_len_index++ % num_array_lens];
+      CollectionValue* cv = tuple0->GetCollectionSlot(array_slot_desc->tuple_offset());
+      ASSERT_EQ(expected_array_len, cv->num_tuples);
+      for (int j = 0; j < cv->num_tuples; ++j) {
+        Tuple* item = reinterpret_cast<Tuple*>(cv->ptr + j * item_desc->byte_size());
+        const SlotDescriptor* string_desc = item_desc->slots()[0];
+        ASSERT_FALSE(item->IsNull(string_desc->null_indicator_offset()));
+        const StringValue* expected = &STRINGS[strings_index++ % NUM_STRINGS];
+        const StringValue* actual = item->GetStringSlot(string_desc->tuple_offset());
+        ASSERT_EQ(*expected, *actual);
+      }
+    }
+    rows_read += batch.num_rows();
+  } while (!eos);
+  ASSERT_EQ(NUM_ROWS, rows_read);
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+
+/// Test that ComputeRowSize handles NULL tuples, arrays, external and NULL array slots.
+TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
+  Init(BUFFER_POOL_LIMIT);
+  const vector<TupleDescriptor*>& tuple_descs = array_desc_->tuple_descriptors();
+  set<SlotId> external_slots;
+  // Second array slot in first tuple is stored externally.
+  const SlotDescriptor* external_array_slot = tuple_descs[0]->slots()[1];
+  external_slots.insert(external_array_slot->id());
+
+  BufferedTupleStreamV2 stream(
+      runtime_state_, *array_desc_, &client_, PAGE_LEN, external_slots);
+  gscoped_ptr<TupleRow, FreeDeleter> row(
+      reinterpret_cast<TupleRow*>(malloc(tuple_descs.size() * sizeof(Tuple*))));
+  gscoped_ptr<Tuple, FreeDeleter> tuple0(
+      reinterpret_cast<Tuple*>(malloc(tuple_descs[0]->byte_size())));
+  gscoped_ptr<Tuple, FreeDeleter> tuple1(
+      reinterpret_cast<Tuple*>(malloc(tuple_descs[1]->byte_size())));
+  memset(tuple0.get(), 0, tuple_descs[0]->byte_size());
+  memset(tuple1.get(), 0, tuple_descs[1]->byte_size());
+
+  const int tuple_null_indicator_bytes = 1; // Need 1 byte for 2 tuples.
+
+  // All tuples are NULL - only need null indicators.
+  row->SetTuple(0, nullptr);
+  row->SetTuple(1, nullptr);
+  EXPECT_EQ(tuple_null_indicator_bytes, stream.ComputeRowSize(row.get()));
+
+  // Tuples are initialized to empty and have no var-len data.
+  row->SetTuple(0, tuple0.get());
+  row->SetTuple(1, tuple1.get());
+  EXPECT_EQ(tuple_null_indicator_bytes + array_desc_->GetRowSize(),
+      stream.ComputeRowSize(row.get()));
+
+  // Tuple 0 has an array.
+  int expected_row_size = tuple_null_indicator_bytes + array_desc_->GetRowSize();
+  const SlotDescriptor* array_slot = tuple_descs[0]->slots()[0];
+  const TupleDescriptor* item_desc = array_slot->collection_item_descriptor();
+  int array_len = 128;
+  CollectionValue* cv = tuple0->GetCollectionSlot(array_slot->tuple_offset());
+  CollectionValueBuilder builder(
+      cv, *item_desc, mem_pool_.get(), runtime_state_, array_len);
+  Tuple* array_data;
+  int num_rows; // out-param of GetFreeMemory(); not checked afterwards
+  builder.GetFreeMemory(&array_data, &num_rows);
+  expected_row_size += item_desc->byte_size() * array_len;
+
+  // Fill the array with pointers to our constant strings.
+  for (int i = 0; i < array_len; ++i) {
+    const StringValue* str = &STRINGS[i % NUM_STRINGS];
+    array_data->SetNotNull(item_desc->slots()[0]->null_indicator_offset());
+    RawValue::Write(str, array_data, item_desc->slots()[0], mem_pool_.get());
+    array_data += item_desc->byte_size();
+    expected_row_size += str->len;
+  }
+  builder.CommitTuples(array_len);
+  EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get()));
+
+  // Check that the external slot isn't included in size.
+  cv = tuple0->GetCollectionSlot(external_array_slot->tuple_offset());
+  // ptr of external slot shouldn't be dereferenced when computing size.
+  cv->ptr = reinterpret_cast<uint8_t*>(1234);
+  cv->num_tuples = 1234;
+  EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get()));
+
+  // Check that the array is excluded if tuple 0's array has its null indicator set.
+  tuple0->SetNull(array_slot->null_indicator_offset());
+  EXPECT_EQ(tuple_null_indicator_bytes + array_desc_->GetRowSize(),
+      stream.ComputeRowSize(row.get()));
+
+  stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+}
+}
+
+int main(int argc, char** argv) { // Test binary entry point.
+  ::testing::InitGoogleTest(&argc, argv); // parses gtest command-line flags
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+  impala::LlvmCodeGen::InitializeLlvm();
+  return RUN_ALL_TESTS(); // gtest result becomes the process exit code
+}


[2/3] incubator-impala git commit: IMPALA-4674: Part 1: port BufferedTupleStream to BufferPool

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc
new file mode 100644
index 0000000..083b59e
--- /dev/null
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -0,0 +1,812 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/buffered-tuple-stream-v2.inline.h"
+
+#include <boost/bind.hpp>
+#include <gutil/strings/substitute.h>
+
+#include "runtime/bufferpool/reservation-tracker.h"
+#include "runtime/collection-value.h"
+#include "runtime/descriptors.h"
+#include "runtime/exec-env.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/string-value.h"
+#include "runtime/tuple-row.h"
+#include "util/bit-util.h"
+#include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+#ifdef NDEBUG
+#define CHECK_CONSISTENCY()
+#else
+#define CHECK_CONSISTENCY() CheckConsistency()
+#endif
+
+using namespace impala;
+using namespace strings;
+
+BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
+    const RowDescriptor& row_desc, BufferPool::ClientHandle* buffer_pool_client,
+    int64_t page_len, const set<SlotId>& ext_varlen_slots)
+  : state_(state),
+    desc_(row_desc),
+    buffer_pool_(state->exec_env()->buffer_pool()),
+    buffer_pool_client_(buffer_pool_client),
+    total_byte_size_(0),
+    read_page_rows_returned_(-1),
+    read_ptr_(nullptr),
+    write_ptr_(nullptr),
+    write_end_ptr_(nullptr),
+    rows_returned_(0),
+    write_page_(nullptr),
+    bytes_pinned_(0),
+    num_rows_(0),
+    page_len_(page_len),
+    has_nullable_tuple_(row_desc.IsAnyTupleNullable()),
+    delete_on_read_(false),
+    closed_(false),
+    pinned_(true) { // streams start out pinned; Init() may unpin them
+  read_page_ = pages_.end(); // end() is used to mean "no read iterator"
+  fixed_tuple_row_size_ = 0; // accumulated over all tuples in the loop below
+  for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
+    const TupleDescriptor* tuple_desc = desc_.tuple_descriptors()[i];
+    const int tuple_byte_size = tuple_desc->byte_size();
+    fixed_tuple_sizes_.push_back(tuple_byte_size);
+    fixed_tuple_row_size_ += tuple_byte_size;
+
+    vector<SlotDescriptor*> tuple_string_slots; // non-external string slots of tuple i
+    vector<SlotDescriptor*> tuple_coll_slots; // non-external collection slots of tuple i
+    for (int j = 0; j < tuple_desc->slots().size(); ++j) {
+      SlotDescriptor* slot = tuple_desc->slots()[j];
+      if (!slot->type().IsVarLenType()) continue; // only var-len slots are recorded
+      if (ext_varlen_slots.find(slot->id()) == ext_varlen_slots.end()) { // not external
+        if (slot->type().IsVarLenStringType()) {
+          tuple_string_slots.push_back(slot);
+        } else {
+          DCHECK(slot->type().IsCollectionType());
+          tuple_coll_slots.push_back(slot);
+        }
+      }
+    }
+    if (!tuple_string_slots.empty()) {
+      inlined_string_slots_.push_back(make_pair(i, tuple_string_slots));
+    }
+
+    if (!tuple_coll_slots.empty()) {
+      inlined_coll_slots_.push_back(make_pair(i, tuple_coll_slots));
+    }
+  }
+  if (has_nullable_tuple_) fixed_tuple_row_size_ += NullIndicatorBytesPerRow();
+}
+
+BufferedTupleStreamV2::~BufferedTupleStreamV2() {
+  DCHECK(closed_); // Close() must have been called before destruction.
+}
+
+void BufferedTupleStreamV2::CheckConsistency() const { // DCHECKs stream invariants.
+  DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString();
+  for (const Page& page : pages_) { // every page must hold exactly the expected pins
+    DCHECK_EQ(ExpectedPinCount(pinned_, &page), page.pin_count()) << DebugString();
+  }
+  if (has_write_iterator()) { // write pointers must lie within the pinned write page
+    DCHECK(write_page_->is_pinned());
+    DCHECK_GE(write_ptr_, write_page_->data());
+    DCHECK_EQ(write_end_ptr_, write_page_->data() + write_page_->len());
+    DCHECK_GE(write_end_ptr_, write_ptr_);
+  }
+  if (has_read_iterator()) { // read pointer must lie within the pinned read page
+    DCHECK(read_page_->is_pinned());
+    uint8_t* read_end_ptr = read_page_->data() + read_page_->len();
+    DCHECK_GE(read_ptr_, read_page_->data());
+    DCHECK_GE(read_end_ptr, read_ptr_);
+  }
+}
+
+string BufferedTupleStreamV2::DebugString() const { // Dumps stream state and all pages.
+  stringstream ss;
+  ss << "BufferedTupleStreamV2 num_rows=" << num_rows_
+     << " rows_returned=" << rows_returned_ << " pinned=" << pinned_
+     << " delete_on_read=" << delete_on_read_ << " closed=" << closed_
+     << " bytes_pinned=" << bytes_pinned_ << " write_page=" << write_page_
+     << " read_page=";
+  if (!has_read_iterator()) {
+    ss << "<end>";
+  } else {
+    ss << &*read_page_; // address of the Page the read iterator points at
+  }
+  ss << " pages=[\n";
+  for (const Page& page : pages_) {
+    ss << "{" << page.DebugString() << "}";
+    if (&page != &pages_.back()) ss << ",\n"; // separator after all but the last page
+  }
+  ss << "]";
+  return ss.str();
+}
+
+string BufferedTupleStreamV2::Page::DebugString() const { // handle dump + row count
+  return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows);
+}
+
+Status BufferedTupleStreamV2::Init(int node_id, bool pinned) { // NOTE: node_id unused
+  if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT); // the stream starts out pinned
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::PrepareForWrite(bool* got_reservation) {
+  // This must be the first iterator created.
+  DCHECK(pages_.empty());
+  DCHECK(!delete_on_read_);
+  DCHECK(!has_write_iterator());
+  DCHECK(!has_read_iterator());
+  CHECK_CONSISTENCY();
+
+  RETURN_IF_ERROR(CheckPageSizeForRow(fixed_tuple_row_size_));
+  *got_reservation = buffer_pool_client_->IncreaseReservationToFit(page_len_);
+  if (!*got_reservation) return Status::OK(); // not an error: caller checks the flag
+  RETURN_IF_ERROR(NewWritePage());
+  CHECK_CONSISTENCY();
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::PrepareForReadWrite(
+    bool delete_on_read, bool* got_reservation) {
+  // These must be the first iterators created; reservation for two pages is requested.
+  DCHECK(pages_.empty());
+  DCHECK(!delete_on_read_);
+  DCHECK(!has_write_iterator());
+  DCHECK(!has_read_iterator());
+  CHECK_CONSISTENCY();
+
+  RETURN_IF_ERROR(CheckPageSizeForRow(fixed_tuple_row_size_));
+  *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * page_len_);
+  if (!*got_reservation) return Status::OK(); // not an error: caller checks the flag
+  RETURN_IF_ERROR(NewWritePage());
+  RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read));
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::Close(RowBatch* batch, RowBatch::FlushMode flush) {
+  for (Page& page : pages_) {
+    if (batch != nullptr && page.is_pinned()) { // hand the pinned buffer to 'batch'
+      BufferPool::BufferHandle buffer;
+      buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
+      batch->AddBuffer(buffer_pool_client_, move(buffer), flush);
+    } else { // no batch to attach to, or the page is unpinned: destroy it
+      buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
+    }
+  }
+  pages_.clear();
+  bytes_pinned_ = 0;
+  closed_ = true; // checked by the destructor's DCHECK
+}
+
+int64_t BufferedTupleStreamV2::CalcBytesPinned() const {
+  int64_t result = 0; // each pin held on a page counts that page's full length
+  for (const Page& page : pages_) result += page.pin_count() * page.len();
+  return result;
+}
+
+Status BufferedTupleStreamV2::PinPage(Page* page) {
+  RETURN_IF_ERROR(buffer_pool_->Pin(buffer_pool_client_, &page->handle));
+  bytes_pinned_ += page->len(); // keep the running total in sync with the new pin
+  return Status::OK();
+}
+
+int BufferedTupleStreamV2::ExpectedPinCount(bool stream_pinned, const Page* page) const {
+  int pin_count = 0;
+  if (stream_pinned && has_write_iterator() && has_read_iterator()) {
+    // The stream is pinned, so all pages have a pin for that (and this pin will be used
+    // as the read iterator when the stream is unpinned)
+    pin_count++;
+    // The write iterator gets its own pin so that we can unpin the stream without
+    // needing additional reservation.
+    if (is_write_page(page)) pin_count++;
+  } else if (stream_pinned) {
+    // The stream is pinned and only has one iterator. When it's unpinned, either the read
+    // or write iterator can use this pin count.
+    pin_count++;
+  } else {
+    // The stream is unpinned. Each iterator gets a pin count.
+    if (is_read_page(page)) pin_count++;
+    if (is_write_page(page)) pin_count++;
+  }
+  return pin_count;
+}
+
+Status BufferedTupleStreamV2::PinPageIfNeeded(Page* page, bool stream_pinned) {
+  int new_pin_count = ExpectedPinCount(stream_pinned, page);
+  if (new_pin_count != page->pin_count()) {
+    DCHECK_EQ(new_pin_count, page->pin_count() + 1); // only ever adds a single pin
+    RETURN_IF_ERROR(PinPage(page));
+  }
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::UnpinPageIfNeeded(Page* page, bool stream_pinned) {
+  int new_pin_count = ExpectedPinCount(stream_pinned, page);
+  if (new_pin_count != page->pin_count()) {
+    DCHECK_EQ(new_pin_count, page->pin_count() - 1); // only ever drops a single pin
+    buffer_pool_->Unpin(buffer_pool_client_, &page->handle);
+    bytes_pinned_ -= page->len();
+  }
+}
+
+Status BufferedTupleStreamV2::NewWritePage() noexcept {
+  DCHECK(!closed_);
+  DCHECK(!has_write_iterator());
+
+  Page new_page;
+  RETURN_IF_ERROR(
+      buffer_pool_->CreatePage(buffer_pool_client_, page_len_, &new_page.handle));
+  bytes_pinned_ += page_len_; // the new page is accounted for as pinned once
+  total_byte_size_ += page_len_;
+
+  pages_.push_back(std::move(new_page));
+  write_page_ = &pages_.back(); // the new page becomes the write iterator's page
+  DCHECK_EQ(write_page_->num_rows, 0);
+  write_ptr_ = write_page_->data();
+  write_end_ptr_ = write_page_->data() + page_len_;
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::CheckPageSizeForRow(int64_t row_size) {
+  // TODO: IMPALA-3208: need to rework this logic to support large pages - should pick
+  // next power-of-two size.
+  if (UNLIKELY(row_size > page_len_)) {
+    // TODO: IMPALA-3208: change the message to reference the query option controlling
+    // max row size.
+    return Status(TErrorCode::BTS_BLOCK_OVERFLOW,
+        PrettyPrinter::Print(row_size, TUnit::BYTES),
+        PrettyPrinter::Print(0, TUnit::BYTES)); // NOTE(review): hard-coded 0 bytes
+  }
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::AdvanceWritePage(
+    int64_t row_size, bool* got_reservation) noexcept {
+  CHECK_CONSISTENCY();
+
+  // Get ready to move to the next write page by unsetting 'write_page_' and
+  // potentially (depending on the mode of this stream) freeing up reservation for the
+  // next write page.
+  ResetWritePage();
+
+  RETURN_IF_ERROR(CheckPageSizeForRow(row_size)); // NOTE: write page is already reset
+  // May need to pin the new page for both reading and writing. See ExpectedPinCount();
+  bool pin_for_read = has_read_iterator() && pinned_;
+  int64_t new_page_reservation = pin_for_read ? 2 * page_len_ : page_len_;
+  if (!buffer_pool_client_->IncreaseReservationToFit(new_page_reservation)) {
+    *got_reservation = false; // not an error: caller checks the flag
+    return Status::OK();
+  }
+  RETURN_IF_ERROR(NewWritePage());
+  // We may need to pin the page for reading also.
+  if (pin_for_read) RETURN_IF_ERROR(PinPage(write_page_));
+
+  CHECK_CONSISTENCY();
+  *got_reservation = true;
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::ResetWritePage() {
+  if (!has_write_iterator()) return;
+  // Clear the write iterator first so the expected pin count is computed without it.
+  Page* prev_write_page = write_page_;
+  write_page_ = nullptr;
+
+  // May need to decrement pin count now that it's not the write page, depending on
+  // the stream's mode.
+  UnpinPageIfNeeded(prev_write_page, pinned_);
+}
+
+Status BufferedTupleStreamV2::NextReadPage() {
+  DCHECK(!closed_);
+  CHECK_CONSISTENCY();
+
+  if (delete_on_read_) { // destroy the page that was just read
+    DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " " << DebugString();
+    DCHECK_NE(&*read_page_, write_page_);
+    bytes_pinned_ -= pages_.front().len();
+    buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle);
+    pages_.pop_front();
+    read_page_ = pages_.begin(); // the former second page, or end() if none is left
+  } else {
+    // Unpin pages after reading them if needed.
+    Page* prev_read_page = &*read_page_;
+    ++read_page_;
+    UnpinPageIfNeeded(prev_read_page, pinned_);
+  }
+
+  if (!has_read_iterator()) { // no further page to read
+    CHECK_CONSISTENCY();
+    return Status::OK();
+  }
+
+  // Ensure the next page is pinned for reading. If the stream is unpinned, we freed up
+  // enough reservation by deleting or unpinning the previous page.
+  // TODO: IMPALA-3208: this page may be larger than the previous, so this could
+  // actually fail once we have variable-length pages.
+  RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
+
+  read_page_rows_returned_ = 0;
+  read_ptr_ = read_page_->data();
+
+  CHECK_CONSISTENCY();
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::ResetReadPage() {
+  if (!has_read_iterator()) return;
+  // Clear the read iterator first so the expected pin count is computed without it.
+  Page* prev_read_page = &*read_page_;
+  read_page_ = pages_.end();
+
+  // May need to decrement pin count after destroying read iterator.
+  UnpinPageIfNeeded(prev_read_page, pinned_);
+}
+
+Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* got_reservation) {
+  CHECK_CONSISTENCY();
+  // Start from a clean slate: drop any existing write and read iterators.
+  ResetWritePage();
+  ResetReadPage();
+
+  if (pinned_) {
+    // If already pinned, no additional pin is needed (see ExpectedPinCount()).
+    *got_reservation = true;
+  } else {
+    // Otherwise we need reservation for one page to pin the read page.
+    *got_reservation = buffer_pool_client_->IncreaseReservationToFit(page_len_);
+  }
+  if (*got_reservation) return PrepareForReadInternal(delete_on_read);
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) {
+  // Preconditions: an open, non-empty stream with no read iterator that has not
+  // already been put into delete-on-read mode.
+  DCHECK(!closed_);
+  DCHECK(!delete_on_read_);
+  DCHECK(!pages_.empty());
+  DCHECK(!has_read_iterator());
+
+  // Point the read iterator at the first page, incrementing its pin count if needed.
+  read_page_ = pages_.begin();
+  RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
+  DCHECK(has_read_iterator());
+  DCHECK(read_page_->is_pinned());
+
+  // Rewind all read-side bookkeeping to the start of the stream.
+  rows_returned_ = 0;
+  read_page_rows_returned_ = 0;
+  read_ptr_ = read_page_->data();
+  delete_on_read_ = delete_on_read;
+
+  CHECK_CONSISTENCY();
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::PinStream(bool* pinned) {
+  DCHECK(!closed_);
+  CHECK_CONSISTENCY();
+  // Nothing to do for a stream that is already pinned.
+  *pinned = pinned_;
+  if (pinned_) return Status::OK();
+
+  // Work out how many extra bytes of reservation are required to bring every page up
+  // to the pin count it is expected to have in a pinned stream.
+  int64_t extra_bytes_needed = 0;
+  for (Page& page : pages_) {
+    const auto missing_pins = ExpectedPinCount(true, &page) - page.pin_count();
+    extra_bytes_needed += missing_pins * page.len();
+  }
+  if (!buffer_pool_client_->IncreaseReservationToFit(extra_bytes_needed)) {
+    // Not enough reservation: leave the stream unpinned and report it via '*pinned'.
+    return Status::OK();
+  }
+
+  // The reservation is in place, so go through and pin the pages that need it.
+  for (Page& page : pages_) RETURN_IF_ERROR(PinPageIfNeeded(&page, true));
+
+  pinned_ = true;
+  *pinned = true;
+  CHECK_CONSISTENCY();
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) {
+  DCHECK(!closed_);
+  const bool drop_iterators = (mode == UNPIN_ALL);
+  if (drop_iterators) {
+    // The iterators would otherwise keep their pages pinned, so invalidate them.
+    ResetWritePage();
+    ResetReadPage();
+  }
+
+  if (!pinned_) {
+    CHECK_CONSISTENCY();
+    return;
+  }
+  // The stream was pinned: unpin whichever pages no longer need to stay pinned in
+  // unpinned mode, then record the mode switch.
+  for (Page& page : pages_) UnpinPageIfNeeded(&page, false);
+  pinned_ = false;
+  CHECK_CONSISTENCY();
+}
+
+Status BufferedTupleStreamV2::GetRows(
+    MemTracker* tracker, scoped_ptr<RowBatch>* batch, bool* got_rows) {
+  // RowBatch::num_rows_ is a 32-bit int, so refuse streams that would overflow it.
+  const int max_batch_rows = numeric_limits<int>::max();
+  if (num_rows() > max_batch_rows) {
+    return Status(Substitute("Trying to read $0 rows into in-memory batch failed. Limit "
+                             "is $1",
+        num_rows(), numeric_limits<int>::max()));
+  }
+
+  // The whole stream must be pinned; '*got_rows' reports whether that succeeded.
+  RETURN_IF_ERROR(PinStream(got_rows));
+  if (!*got_rows) return Status::OK();
+
+  bool got_reservation;
+  RETURN_IF_ERROR(PrepareForRead(false, &got_reservation));
+  DCHECK(got_reservation) << "Stream was pinned";
+
+  batch->reset(new RowBatch(desc_, num_rows(), tracker));
+  // Keep calling GetNext() until the batch holds every row. Each call can stop at a
+  // page boundary. We generally want it to stop there so that pages can be freed as we
+  // read, but continuing is safe here because the entire stream is pinned.
+  bool eos = false;
+  do {
+    RETURN_IF_ERROR(GetNext(batch->get(), &eos));
+  } while (!eos);
+  return Status::OK();
+}
+
+Status BufferedTupleStreamV2::GetNext(RowBatch* batch, bool* eos) {
+  // Plain read: no FlatRowPtrs are collected, so no output vector is passed.
+  constexpr bool kFillFlatRows = false;
+  return GetNextInternal<kFillFlatRows>(batch, eos, nullptr);
+}
+
+Status BufferedTupleStreamV2::GetNext(
+    RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
+  // Same as the two-argument overload, but also records a FlatRowPtr for each row
+  // returned in 'flat_rows'.
+  constexpr bool kFillFlatRows = true;
+  return GetNextInternal<kFillFlatRows>(batch, eos, flat_rows);
+}
+
+template <bool FILL_FLAT_ROWS>
+Status BufferedTupleStreamV2::GetNextInternal(
+    RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
+  // Turn the runtime 'has_nullable_tuple_' flag into a template argument so that the
+  // row loop is instantiated separately for each case.
+  return has_nullable_tuple_ ?
+      GetNextInternal<FILL_FLAT_ROWS, true>(batch, eos, flat_rows) :
+      GetNextInternal<FILL_FLAT_ROWS, false>(batch, eos, flat_rows);
+}
+
+template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE>
+Status BufferedTupleStreamV2::GetNextInternal(
+    RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
+  DCHECK(!closed_);
+  DCHECK(batch->row_desc().Equals(desc_));
+  DCHECK(is_pinned() || !FILL_FLAT_ROWS)
+      << "FlatRowPtrs are only valid for pinned streams";
+  // Nothing left to return: report end-of-stream without touching 'batch'.
+  if (rows_returned_ == num_rows_) {
+    *eos = true;
+    return Status::OK();
+  }
+  *eos = false;
+
+  // Advancing to the next page is deferred to the start of this call rather than done
+  // at the end of the previous one: NextReadPage() may unpin or delete the buffer
+  // backing the rows returned from the *previous* call to GetNext().
+  if (UNLIKELY(read_page_rows_returned_ == read_page_->num_rows)) {
+    RETURN_IF_ERROR(NextReadPage());
+  }
+
+  DCHECK(has_read_iterator());
+  DCHECK(read_page_->is_pinned()) << DebugString();
+  DCHECK_GE(read_page_rows_returned_, 0);
+
+  // Return as many rows as fit in the batch without crossing a page boundary.
+  int rows_left_in_page = read_page_->num_rows - read_page_rows_returned_;
+  int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), rows_left_in_page);
+  DCHECK_GE(rows_to_fill, 1);
+
+  if (FILL_FLAT_ROWS) {
+    DCHECK(flat_rows != nullptr);
+    DCHECK(!delete_on_read_);
+    DCHECK_EQ(batch->num_rows(), 0);
+    flat_rows->clear();
+    flat_rows->reserve(rows_to_fill);
+  }
+
+  // Output rows are arrays of Tuple pointers laid out back to back in the batch.
+  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  uint8_t* next_output_row = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows()));
+
+  // Produce tuple rows starting at the current position in 'read_page_'.
+  for (int row_idx = 0; row_idx < rows_to_fill; ++row_idx) {
+    if (FILL_FLAT_ROWS) {
+      // The flat row starts wherever the read pointer currently is.
+      flat_rows->push_back(read_ptr_);
+      DCHECK_EQ(flat_rows->size(), row_idx + 1);
+    }
+    // Set the output row's Tuple pointers from the flattened row in the page.
+    TupleRow* output_row = reinterpret_cast<TupleRow*>(next_output_row);
+    next_output_row += sizeof(Tuple*) * tuples_per_row;
+    UnflattenTupleRow<HAS_NULLABLE_TUPLE>(&read_ptr_, output_row);
+
+    // Update string slot ptrs, skipping external strings.
+    for (const auto& tuple_slots : inlined_string_slots_) {
+      Tuple* tuple = output_row->GetTuple(tuple_slots.first);
+      if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
+      FixUpStringsForRead(tuple_slots.second, tuple);
+    }
+
+    // Update collection slot ptrs, skipping external collections. The collection
+    // structure is traversed in the same order as it was written to the stream, which
+    // lets the data layout be inferred from the lengths of collections and strings.
+    for (const auto& tuple_slots : inlined_coll_slots_) {
+      Tuple* tuple = output_row->GetTuple(tuple_slots.first);
+      if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
+      FixUpCollectionsForRead(tuple_slots.second, tuple);
+    }
+  }
+
+  batch->CommitRows(rows_to_fill);
+  rows_returned_ += rows_to_fill;
+  read_page_rows_returned_ += rows_to_fill;
+  *eos = (rows_returned_ == num_rows_);
+
+  const bool page_exhausted = (read_page_rows_returned_ == read_page_->num_rows);
+  if (page_exhausted && (!pinned_ || delete_on_read_)) {
+    // No more data in this page. The batch must be immediately returned up the operator
+    // tree and deep copied so that NextReadPage() can reuse the read page's buffer.
+    // TODO: IMPALA-4179 - instead attach the buffer and flush the resources.
+    batch->MarkNeedsDeepCopy();
+  }
+  if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
+  DCHECK_LE(read_ptr_, read_page_->data() + read_page_->len());
+  return Status::OK();
+}
+
+void BufferedTupleStreamV2::FixUpStringsForRead(
+    const vector<SlotDescriptor*>& string_slots, Tuple* tuple) {
+  DCHECK(tuple != nullptr);
+  // Points each non-NULL string slot of 'tuple' at its bytes in the read page. The
+  // strings are consumed from 'read_ptr_' in slot order, each occupying 'len' bytes.
+  for (const SlotDescriptor* slot_desc : string_slots) {
+    const bool slot_is_null = tuple->IsNull(slot_desc->null_indicator_offset());
+    if (slot_is_null) continue;
+
+    StringValue* str = tuple->GetStringSlot(slot_desc->tuple_offset());
+    DCHECK_LE(read_ptr_ + str->len, read_page_->data() + read_page_->len());
+    str->ptr = reinterpret_cast<char*>(read_ptr_);
+    read_ptr_ += str->len;
+  }
+}
+
+void BufferedTupleStreamV2::FixUpCollectionsForRead(
+    const vector<SlotDescriptor*>& collection_slots, Tuple* tuple) {
+  DCHECK(tuple != nullptr);
+  // Points each non-NULL collection slot of 'tuple' at its item tuples in the read
+  // page, then recursively fixes up the var-len slots inside those items. Data is
+  // consumed from 'read_ptr_' in the order visited here.
+  for (const SlotDescriptor* slot_desc : collection_slots) {
+    if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+
+    CollectionValue* coll = tuple->GetCollectionSlot(slot_desc->tuple_offset());
+    const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
+
+    // The fixed-length part of all items comes first.
+    int coll_byte_size = coll->num_tuples * item_desc.byte_size();
+    DCHECK_LE(read_ptr_ + coll_byte_size, read_page_->data() + read_page_->len());
+    coll->ptr = reinterpret_cast<uint8_t*>(read_ptr_);
+    read_ptr_ += coll_byte_size;
+
+    // It is followed by the items' own var-len data, if they have any.
+    if (item_desc.HasVarlenSlots()) {
+      uint8_t* item_mem = coll->ptr;
+      for (int i = 0; i < coll->num_tuples; ++i) {
+        Tuple* item = reinterpret_cast<Tuple*>(item_mem);
+        FixUpStringsForRead(item_desc.string_slots(), item);
+        FixUpCollectionsForRead(item_desc.collection_slots(), item);
+        item_mem += item_desc.byte_size();
+      }
+    }
+  }
+}
+
+int64_t BufferedTupleStreamV2::ComputeRowSize(TupleRow* row) const noexcept {
+  // Returns the number of bytes 'row' occupies when flattened into the stream: the
+  // fixed-length part (plus null indicators if the stream has nullable tuples) followed
+  // by the var-len data of all inlined string and collection slots.
+  int64_t size = 0;
+  if (has_nullable_tuple_) {
+    // NULL tuples take no space beyond their bit in the null indicator bitstring.
+    size += NullIndicatorBytesPerRow();
+    for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) {
+      if (row->GetTuple(i) != nullptr) size += fixed_tuple_sizes_[i];
+    }
+  } else {
+    size = fixed_tuple_row_size_;
+  }
+
+  // Inlined strings: each non-NULL slot contributes its length.
+  for (const auto& tuple_slots : inlined_string_slots_) {
+    Tuple* tuple = row->GetTuple(tuple_slots.first);
+    if (tuple == nullptr) continue;
+    for (const SlotDescriptor* slot_desc : tuple_slots.second) {
+      if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+      size += tuple->GetStringSlot(slot_desc->tuple_offset())->len;
+    }
+  }
+
+  // Inlined collections: the items' fixed-length parts plus any var-len data they own.
+  for (const auto& tuple_slots : inlined_coll_slots_) {
+    Tuple* tuple = row->GetTuple(tuple_slots.first);
+    if (tuple == nullptr) continue;
+    for (const SlotDescriptor* slot_desc : tuple_slots.second) {
+      if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+      CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
+      const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
+      // Widen before multiplying so that a very large collection cannot overflow
+      // 32-bit arithmetic before the result is added to the 64-bit total.
+      const int64_t item_size = item_desc.byte_size();
+      size += static_cast<int64_t>(cv->num_tuples) * item_size;
+
+      if (!item_desc.HasVarlenSlots()) continue;
+      for (int64_t j = 0; j < cv->num_tuples; ++j) {
+        Tuple* item = reinterpret_cast<Tuple*>(cv->ptr + j * item_size);
+        size += item->VarlenByteSize(item_desc);
+      }
+    }
+  }
+  return size;
+}
+
+bool BufferedTupleStreamV2::AddRowSlow(TupleRow* row, Status* status) noexcept {
+  // Slow path: advance to a write page sized for this row, then copy the row into it.
+  const int64_t row_size = ComputeRowSize(row);
+  bool got_reservation;
+  *status = AdvanceWritePage(row_size, &got_reservation);
+  // Returns false with an error in '*status', or with '*status' OK if the reservation
+  // for the new page could not be obtained.
+  if (status->ok() && got_reservation) return DeepCopy(row);
+  return false;
+}
+
+uint8_t* BufferedTupleStreamV2::AllocateRowSlow(
+    int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) noexcept {
+  // Slow path: advance to a write page with room for the whole row, then retry the
+  // allocation. The sum is computed in 64 bits.
+  const int64_t needed_bytes = static_cast<int64_t>(fixed_size) + varlen_size;
+  bool got_reservation;
+  *status = AdvanceWritePage(needed_bytes, &got_reservation);
+  if (status->ok() && got_reservation) {
+    // We have a large-enough page so now success is guaranteed.
+    uint8_t* row_mem = AllocateRow(fixed_size, varlen_size, varlen_data, status);
+    DCHECK(row_mem != nullptr);
+    return row_mem;
+  }
+  return nullptr;
+}
+
+bool BufferedTupleStreamV2::DeepCopy(TupleRow* row) noexcept {
+  // Select the DeepCopyInternal() instantiation matching the stream's nullability.
+  return has_nullable_tuple_ ? DeepCopyInternal<true>(row) : DeepCopyInternal<false>(row);
+}
+
+// Appends a deep copy of 'row' to the current write page. Returns true on success.
+// Returns false, leaving the write position unchanged, if there is no write page or
+// the row does not fit in the space remaining in the write page.
+// TODO: consider codegening this.
+// TODO: in case of duplicate tuples, this can redundantly serialize data.
+template <bool HAS_NULLABLE_TUPLE>
+bool BufferedTupleStreamV2::DeepCopyInternal(TupleRow* row) noexcept {
+  if (UNLIKELY(write_page_ == nullptr)) return false;
+  DCHECK(write_page_->is_pinned()) << DebugString() << std::endl
+                                   << write_page_->DebugString();
+
+  const int tuples_per_row = desc_.tuple_descriptors().size();
+  // Use a 64-bit count so that the remaining space is never truncated.
+  const int64_t bytes_remaining = write_end_ptr_ - write_ptr_;
+
+  // Give up now if there is not enough space left in the page for the fixed-length
+  // part of the row; the caller will then move on to the next page.
+  if (UNLIKELY(bytes_remaining < fixed_tuple_row_size_)) return false;
+
+  // Remember where this row starts so that a failed copy of the var-len data can be
+  // rolled back. Otherwise a later append to the same page would be written after the
+  // partial row and the reader would misinterpret the page's contents.
+  uint8_t* const row_start = write_ptr_;
+
+  // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple
+  // indicator.
+  if (HAS_NULLABLE_TUPLE) {
+    uint8_t* null_indicators = write_ptr_;
+    const int null_indicator_bytes = NullIndicatorBytesPerRow();
+    memset(null_indicators, 0, null_indicator_bytes);
+    write_ptr_ += null_indicator_bytes;
+    for (int i = 0; i < tuples_per_row; ++i) {
+      // Tuple i is tracked by bit (7 - i % 8) of byte i / 8; a set bit means NULL.
+      uint8_t* null_word = null_indicators + (i >> 3);
+      const uint32_t null_pos = i & 7;
+      const int tuple_size = fixed_tuple_sizes_[i];
+      Tuple* t = row->GetTuple(i);
+      const uint8_t mask = 1 << (7 - null_pos);
+      if (t != nullptr) {
+        memcpy(write_ptr_, t, tuple_size);
+        write_ptr_ += tuple_size;
+      } else {
+        *null_word |= mask;
+      }
+    }
+  } else {
+    // If we know that there are no nullable tuples no need to set the nullability flags.
+    for (int i = 0; i < tuples_per_row; ++i) {
+      const int tuple_size = fixed_tuple_sizes_[i];
+      Tuple* t = row->GetTuple(i);
+      // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots)
+      // is delivered, the check below should become DCHECK(t != nullptr).
+      DCHECK(t != nullptr || tuple_size == 0);
+      memcpy(write_ptr_, t, tuple_size);
+      write_ptr_ += tuple_size;
+    }
+  }
+
+  // Copy inlined string slots. Note: we do not need to convert the string ptrs to offsets
+  // on the write path, only on the read. The tuple data is immediately followed
+  // by the string data so only the len information is necessary.
+  for (int i = 0; i < inlined_string_slots_.size(); ++i) {
+    const Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first);
+    if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
+    if (UNLIKELY(!CopyStrings(tuple, inlined_string_slots_[i].second))) {
+      write_ptr_ = row_start;
+      return false;
+    }
+  }
+
+  // Copy inlined collection slots. We copy collection data in a well-defined order so
+  // we do not need to convert pointers to offsets on the write path.
+  for (int i = 0; i < inlined_coll_slots_.size(); ++i) {
+    const Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first);
+    if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
+    if (UNLIKELY(!CopyCollections(tuple, inlined_coll_slots_[i].second))) {
+      write_ptr_ = row_start;
+      return false;
+    }
+  }
+
+  // The row is now complete, so make it visible in the stream's and page's row counts.
+  ++num_rows_;
+  ++write_page_->num_rows;
+  return true;
+}
+
+bool BufferedTupleStreamV2::CopyStrings(
+    const Tuple* tuple, const vector<SlotDescriptor*>& string_slots) {
+  // Appends the bytes of every non-NULL, non-empty string slot of 'tuple' at the write
+  // position, in slot order. Returns false as soon as a string does not fit in the
+  // space left before 'write_end_ptr_'.
+  for (const SlotDescriptor* slot_desc : string_slots) {
+    if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+    const StringValue* str = tuple->GetStringSlot(slot_desc->tuple_offset());
+    if (UNLIKELY(!(str->len > 0))) continue;
+    if (UNLIKELY(write_ptr_ + str->len > write_end_ptr_)) return false;
+
+    memcpy(write_ptr_, str->ptr, str->len);
+    write_ptr_ += str->len;
+  }
+  return true;
+}
+
+bool BufferedTupleStreamV2::CopyCollections(
+    const Tuple* tuple, const vector<SlotDescriptor*>& collection_slots) {
+  // Appends the contents of every non-NULL, non-empty collection slot of 'tuple' at the
+  // write position: first the fixed-length part of all items, then (recursively) the
+  // items' own var-len data. Returns false as soon as something does not fit in the
+  // space left before 'write_end_ptr_'.
+  for (const SlotDescriptor* slot_desc : collection_slots) {
+    if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
+    const CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
+    const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
+    if (UNLIKELY(cv->num_tuples <= 0)) continue;
+
+    // Compute the size in 64 bits so that a very large collection cannot overflow and
+    // slip past the bounds check. Compare against the bytes remaining rather than
+    // forming a pointer beyond the end of the page.
+    const int64_t item_size = item_desc.byte_size();
+    const int64_t coll_byte_size = static_cast<int64_t>(cv->num_tuples) * item_size;
+    if (UNLIKELY(coll_byte_size > write_end_ptr_ - write_ptr_)) return false;
+    uint8_t* coll_data = write_ptr_;
+    memcpy(coll_data, cv->ptr, coll_byte_size);
+    write_ptr_ += coll_byte_size;
+
+    if (!item_desc.HasVarlenSlots()) continue;
+    // Copy variable length data when present in collection items. The slots are read
+    // from the copies of the items that were just written to the page.
+    for (int i = 0; i < cv->num_tuples; ++i) {
+      const Tuple* item = reinterpret_cast<Tuple*>(coll_data);
+      if (UNLIKELY(!CopyStrings(item, item_desc.string_slots()))) return false;
+      if (UNLIKELY(!CopyCollections(item, item_desc.collection_slots()))) return false;
+      coll_data += item_size;
+    }
+  }
+  return true;
+}
+
+void BufferedTupleStreamV2::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const {
+  DCHECK(row != nullptr);
+  DCHECK(!closed_);
+  DCHECK(is_pinned());
+  DCHECK(!delete_on_read_);
+  // UnflattenTupleRow() advances the pointer it is given, so hand it a local copy.
+  uint8_t* cursor = flat_row;
+  if (has_nullable_tuple_) {
+    UnflattenTupleRow<true>(&cursor, row);
+  } else {
+    UnflattenTupleRow<false>(&cursor, row);
+  }
+}
+
+// Sets the Tuple pointers of 'row' to point at the fixed-length tuples of the flattened
+// row starting at '*data', and advances '*data' past the row's fixed-length part
+// (including the null indicator bitstring, if present). Var-len slot pointers are not
+// touched. HAS_NULLABLE_TUPLE must match the stream's 'has_nullable_tuple_'.
+template <bool HAS_NULLABLE_TUPLE>
+void BufferedTupleStreamV2::UnflattenTupleRow(uint8_t** data, TupleRow* row) const {
+  // Branch on the template argument so the choice is made at compile time; the member
+  // is only consulted to verify that the caller picked the right instantiation.
+  DCHECK_EQ(HAS_NULLABLE_TUPLE, has_nullable_tuple_);
+  const int tuples_per_row = desc_.tuple_descriptors().size();
+  uint8_t* ptr = *data;
+  if (HAS_NULLABLE_TUPLE) {
+    // Stitch together the tuples from the page and the NULL ones.
+    const uint8_t* null_indicators = ptr;
+    ptr += NullIndicatorBytesPerRow();
+    for (int i = 0; i < tuples_per_row; ++i) {
+      // Tuple i is tracked by bit (7 - i % 8) of byte i / 8; a set bit means NULL.
+      const uint8_t* null_word = null_indicators + (i >> 3);
+      const uint32_t null_pos = i & 7;
+      const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
+      // Branch-free: multiplying by 'is_not_null' yields a null Tuple pointer and a
+      // zero-byte advance for NULL tuples.
+      row->SetTuple(
+          i, reinterpret_cast<Tuple*>(reinterpret_cast<uint64_t>(ptr) * is_not_null));
+      ptr += fixed_tuple_sizes_[i] * is_not_null;
+    }
+  } else {
+    for (int i = 0; i < tuples_per_row; ++i) {
+      row->SetTuple(i, reinterpret_cast<Tuple*>(ptr));
+      ptr += fixed_tuple_sizes_[i];
+    }
+  }
+  *data = ptr;
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h
new file mode 100644
index 0000000..d707604
--- /dev/null
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -0,0 +1,592 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_H
+#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_H
+
+#include <set>
+#include <vector>
+#include <boost/scoped_ptr.hpp>
+
+#include "common/global-types.h"
+#include "common/status.h"
+#include "gutil/macros.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/row-batch.h"
+
+namespace impala {
+
+class MemTracker;
+class RuntimeState;
+class RowDescriptor;
+class SlotDescriptor;
+class Tuple;
+class TupleRow;
+
+/// Class that provides an abstraction for a stream of tuple rows backed by BufferPool
+/// Pages. Rows can be added to the stream and read back. Rows are returned in the order
+/// they are added.
+///
+/// The BufferedTupleStream is *not* thread safe from the caller's point of view.
+/// Different threads should not concurrently call methods of the same BufferedTupleStream
+/// object.
+///
+/// Reading and writing the stream:
+/// The stream supports two modes of reading/writing, depending on whether
+/// PrepareForWrite() is called to initialize a write iterator only or
+/// PrepareForReadWrite() is called to initialize both read and write iterators to enable
+/// interleaved reads and writes.
+///
+/// To use write-only mode, PrepareForWrite() is called once and AddRow()/AllocateRow()
+/// are called repeatedly to initialize then advance a write iterator through the stream.
+/// Once the stream is fully written, it can be read back by calling PrepareForRead()
+/// then GetNext() repeatedly to advance a read iterator through the stream, or by
+/// calling GetRows() to get all of the rows at once.
+///
+/// To use read/write mode, PrepareForReadWrite() is called once to initialize the read
+/// and write iterators. AddRow()/AllocateRow() then advance a write iterator through the
+/// stream, and GetNext() advances a trailing read iterator through the stream.
+///
+/// Buffer management:
+/// The tuple stream is backed by a sequence of BufferPool Pages. The tuple stream uses
+/// the client's reservation to pin pages in memory. It will automatically try to
+/// increase the client's reservation whenever it needs to do so to make progress.
+///
+/// The stream has both pinned and unpinned modes. In the pinned mode all pages are
+/// pinned for reading. The pinned mode avoids I/O by keeping all pages pinned in memory
+/// and allows clients to save pointers to rows in the stream and randomly access them.
+/// E.g. hash tables can be backed by a BufferedTupleStream. In the unpinned mode, only
+/// pages currently being read and written are pinned and other pages are unpinned and
+/// therefore do not use the client's reservation and can be spilled to disk.
+///
+/// When the stream is in read/write mode, the stream always uses one buffer's worth
+/// of reservation for writing and at least one buffer's worth of reservation for reading,
+/// even if the same page is currently being read and written. This means that
+/// UnpinStream() always succeeds, and moving to the next write page or read page on an
+/// unpinned stream does not require additional reservation.
+/// TODO: IMPALA-3208: variable-length pages will add a caveat here.
+///
+/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag
+/// to PrepareForRead() which deletes the stream's pages as it does a final read
+/// pass over the stream.
+///
+/// TODO: IMPALA-4179: the buffer management can be simplified once we can attach
+/// buffers to RowBatches.
+///
+/// Page layout:
+/// Rows are stored back to back starting at the first byte of each page's buffer, with
+/// no interleaving of data from different rows. There is no padding or alignment
+/// between rows.
+///
+/// Tuple row layout:
+/// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), there is a
+/// bitstring at the start of each row with null indicators for all tuples in each row
+/// (including non-nullable tuples). The bitstring occupies ceil(num_tuples_per_row / 8)
+/// bytes. A 1 indicates the tuple is null.
+///
+/// The fixed length parts of the row's tuples are stored first, followed by var len data
+/// for inlined_string_slots_ and inlined_coll_slots_. Other "external" var len slots can
+/// point to var len data outside the stream. When reading the stream, the length of each
+/// row's var len data in the stream must be computed to find the next row's start.
+///
+/// The tuple stream supports reading from the stream into RowBatches without copying
+/// out any data: the RowBatches' Tuple pointers will point directly into the stream's
+/// pages' buffers. The fixed length parts follow Impala's internal tuple format, so for
+/// the tuple to be valid, we only need to update pointers to point to the var len data
+/// in the stream. These pointers need to be updated by the stream because a spilled
+/// page's data may be relocated to a different buffer. The pointers are updated lazily
+/// upon reading the stream via GetNext() or GetRows().
+///
+/// Example layout for a row with two non-nullable tuples ((1, "hello"), (2, "world"))
+/// with all var len data stored in the stream:
+///  <---- tuple 1 -----> <------ tuple 2 ------> <- var len -> <- next row ...
+/// +--------+-----------+-----------+-----------+-------------+
+/// | IntVal | StringVal | BigIntVal | StringVal |             | ...
+/// +--------+-----------+-----------+-----------+-------------+
+/// | val: 1 | len: 5    | val: 2    | len: 5    | helloworld  | ...
+/// |        | ptr: 0x.. |           | ptr: 0x.. |             | ...
+/// +--------+-----------+-----------+-----------+-------------+
+///  <--4b--> <---12b---> <----8b---> <---12b---> <----10b---->
+///
+/// Example layout for a row with the second tuple nullable ((1, "hello"), NULL)
+/// with all var len data stored in the stream:
+/// <- null tuple bitstring -> <---- tuple 1 -----> <- var len -> <- next row ...
+/// +-------------------------+--------+-----------+------------+
+/// |                         | IntVal | StringVal |            | ...
+/// +-------------------------+--------+-----------+------------+
+/// | 0000 0010               | val: 1 | len: 5    | hello      | ...
+/// |                         |        | ptr: 0x.. |            | ...
+/// +-------------------------+--------+-----------+------------+
+///  <---------1b------------> <--4b--> <---12b---> <----5b---->
+///
+/// Example layout for a row with a single non-nullable tuple (("hello", "world")) with
+/// the second string slot stored externally to the stream:
+///  <------ tuple 1 ------> <- var len ->  <- next row ...
+/// +-----------+-----------+-------------+
+/// | StringVal | StringVal |             | ...
+/// +-----------+-----------+-------------+
+/// | len: 5    | len: 5    |  hello      | ...
+/// | ptr: 0x.. | ptr: 0x.. |             | ...
+/// +-----------+-----------+-------------+
+///  <---12b---> <---12b---> <-----5b---->
+///
+/// The behavior of reads and writes is as follows:
+/// Read:
+///   1. Unpinned: Only a single read page is pinned at a time. This means that only
+///     enough reservation to pin a single page is needed to read the stream, regardless
+///     of the stream's size. Each page is deleted or unpinned (if delete on read is true
+///     or false respectively) before advancing to the next page.
+///   2. Pinned: All pages in the stream are pinned so do not need to be pinned or
+///     unpinned when reading from the stream. If delete on read is true, pages are
+///     deleted after being read.
+/// Write:
+///   1. Unpinned: Unpin pages as they fill up. This means that only enough reservation
+///     to pin a single write page is required to write to the stream, regardless of the
+///     stream's size.
+///   2. Pinned: Pages are left pinned. If the next page in the stream cannot be pinned
+///     because the caller's reservation is insufficient (and could not be increased by
+///     the stream), the write call will fail and the caller can either unpin the stream
+///     or free up other memory before retrying.
+///
+/// Memory lifetime of rows read from stream:
+/// If the stream is pinned and delete on read is false, it is valid to access any tuples
+/// returned via GetNext() or GetRows() until the stream is unpinned. If the stream is
+/// unpinned or delete on read is true, then the batch returned from GetNext() may have
+/// the needs_deep_copy flag set, which means that any tuple memory returned so far from
+/// the stream may be freed on the next call to GetNext().
+/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch.
+///
+/// Manual construction of rows with AllocateRow():
+/// The BufferedTupleStream supports allocation of uninitialized rows with AllocateRow().
+/// AllocateRow() is called instead of AddRow() if the caller wants to manually construct
+/// a row. The caller of AllocateRow() is responsible for writing the row with exactly the
+/// layout described above.
+///
+/// If a caller constructs a tuple in this way, the caller can set the pointers and they
+/// will not be modified until the stream is read via GetNext() or GetRows().
+/// TODO: IMPALA-5007: try to remove AllocateRow() by unifying with AddRow().
+///
+/// TODO: we need to be able to do read ahead for pages. We need some way to indicate a
+/// page will need to be pinned soon.
+class BufferedTupleStreamV2 {
+ public:
+  /// A pointer to the start of a flattened TupleRow in the stream.
+  typedef uint8_t* FlatRowPtr;
+
+  /// row_desc: description of rows stored in the stream. This is the desc for rows
+  /// that are added and the rows being returned.
+  /// page_len: the size of pages to use in the stream
+  /// TODO:IMPALA-3208: support a default and maximum page length
+  /// ext_varlen_slots: set of varlen slots with data stored externally to the stream
+  BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor& row_desc,
+      BufferPool::ClientHandle* buffer_pool_client, int64_t page_len,
+      const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
+
+  virtual ~BufferedTupleStreamV2();
+
+  /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called
+  /// once before any of the other APIs.
+  /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is unpinned.
+  /// 'node_id' is only used for error reporting.
+  Status Init(int node_id, bool pinned) WARN_UNUSED_RESULT;
+
+  /// Prepares the stream for writing by attempting to allocate a write buffer. Tries to
+  /// increase reservation if there is not enough unused reservation for the buffer.
+  /// Called after Init() and before the first AddRow() or AllocateRow() call.
+  /// 'got_reservation': set to true if there was enough reservation to initialize the
+  ///     first write page and false if there was not enough reservation and no other
+  ///     error was encountered. Undefined if an error status is returned.
+  Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT;
+
+  /// Prepares the stream for interleaved reads and writes by allocating read and write
+  /// buffers. Called after Init() and before the first AddRow() or AllocateRow() call.
+  /// delete_on_read: Pages are deleted after they are read.
+  /// 'got_reservation': set to true if there was enough reservation to initialize the
+  ///     read and write pages and false if there was not enough reservation and no other
+  ///     error was encountered. Undefined if an error status is returned.
+  Status PrepareForReadWrite(
+      bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+
+  /// Prepares the stream for reading, invalidating the write iterator (if there is one).
+  /// Therefore must be called after the last AddRow() or AllocateRow() and before
+  /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes
+  /// over the stream, unless PrepareForRead() or PrepareForReadWrite() was previously
+  /// called with delete_on_read = true.
+  /// delete_on_read: Pages are deleted after they are read.
+  /// 'got_reservation': set to true if there was enough reservation to initialize the
+  ///     first read page and false if there was not enough reservation and no other
+  ///     error was encountered. Undefined if an error status is returned.
+  Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
+
+  /// Adds a single row to the stream. There are four possible outcomes:
+  /// a) The append succeeds. True is returned.
+  /// b) The append fails because the unused reservation was not sufficient to add
+  ///   a new page to the stream and the stream could not increase the reservation
+  ///   sufficiently. Returns false and sets 'status' to OK. The append can be retried
+  ///   after freeing up memory or unpinning the stream.
+  /// c) The append fails with a runtime error. Returns false and sets 'status' to an
+  ///   error.
+  /// d) The append fails because the row is too large to fit in a page of a stream.
+  ///   Returns false and sets 'status' to an error.
+  ///
+  /// Unpinned streams avoid case b) because memory is automatically freed up by
+  /// unpinning the current write page.
+  /// TODO: IMPALA-3808: update to reflect behaviour with variable-length pages
+  ///
+  /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow()
+  /// returns an error, it should not be called again.
+  bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT;
+
+  /// Allocates space to store a row with fixed length 'fixed_size' and variable
+  /// length data 'varlen_size'. If successful, returns the pointer where fixed length
+  /// data should be stored and assigns 'varlen_data' to where var-len data should
+  /// be stored.  AllocateRow does not currently support nullable tuples.
+  ///
+  /// The meaning of the return values is the same as AddRow(), except failure is
+  /// indicated by returning NULL instead of false.
+  uint8_t* AllocateRow(
+      int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status);
+
+  /// Unflattens 'flat_row' into a regular TupleRow 'row'. Only valid to call if the
+  /// stream is pinned. The row must have been allocated with the stream's row desc.
+  /// The returned 'row' is backed by memory from the stream so is only valid as long
+  /// as the stream is pinned.
+  void GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const;
+
+  /// Pins all pages in this stream and switches to pinned mode. Has no effect if the
+  /// stream is already pinned.
+  /// If the current unused reservation is not sufficient to pin the stream in memory,
+  /// this will try to increase the reservation. If that fails, 'pinned' is set to false
+  /// and the stream is left unpinned. Otherwise 'pinned' is set to true.
+  Status PinStream(bool* pinned) WARN_UNUSED_RESULT;
+
+  /// Modes for UnpinStream().
+  enum UnpinMode {
+    /// All pages in the stream are unpinned and the read/write positions in the stream
+    /// are reset. No more rows can be written to the stream after this. The stream can
+    /// be re-read from the beginning by calling PrepareForRead().
+    UNPIN_ALL,
+    /// All pages are unpinned aside from the current read and write pages (if any),
+    /// which are left in the same state. The unpinned stream can continue being read
+    /// or written from the current read or write positions.
+    UNPIN_ALL_EXCEPT_CURRENT,
+  };
+
+  /// Unpins stream with the given 'mode' as described above.
+  void UnpinStream(UnpinMode mode);
+
+  /// Get the next batch of output rows, which are backed by the stream's memory.
+  /// If the stream is unpinned or 'delete_on_read' is true, the 'needs_deep_copy'
+  /// flag may be set on 'batch' to signal that memory will be freed on the next
+  /// call to GetNext() and that the caller should copy out any data it needs from
+  /// rows in 'batch' or in previous batches returned from GetNext().
+  ///
+  /// If the stream is pinned and 'delete_on_read' is false, the memory backing the
+  /// rows will remain valid until the stream is unpinned, destroyed, etc.
+  /// TODO: IMPALA-4179: update when we simplify the memory transfer model.
+  Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
+
+  /// Same as above, but populate 'flat_rows' with a pointer to the flat version of
+  /// each returned row in the pinned stream. The pointers in 'flat_rows' are only
+  /// valid as long as the stream remains pinned.
+  Status GetNext(
+      RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows) WARN_UNUSED_RESULT;
+
+  /// Returns all the rows in the stream in batch. This pins the entire stream in the
+  /// process. If the current unused reservation is not sufficient to pin the stream in
+  /// memory, this will try to increase the reservation. If that fails, 'got_rows' is set
+  /// to false.
+  Status GetRows(MemTracker* tracker, boost::scoped_ptr<RowBatch>* batch,
+      bool* got_rows) WARN_UNUSED_RESULT;
+
+  /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL,
+  /// attaches buffers from any pinned pages to the batch and deletes unpinned
+  /// pages. Otherwise deletes all pages. Does nothing if the stream was already
+  /// closed. The 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching
+  /// buffers.
+  void Close(RowBatch* batch, RowBatch::FlushMode flush);
+
+  /// Number of rows in the stream.
+  int64_t num_rows() const { return num_rows_; }
+
+  /// Number of rows returned via GetNext().
+  int64_t rows_returned() const { return rows_returned_; }
+
+  /// Returns the byte size necessary to store the entire stream in memory.
+  int64_t byte_size() const { return total_byte_size_; }
+
+  /// Returns the number of bytes currently pinned in memory by the stream.
+  /// If ignore_current is true, the write_page_ memory is not included.
+  int64_t BytesPinned(bool ignore_current) const {
+    if (ignore_current && write_page_ != nullptr && write_page_->is_pinned()) {
+      return bytes_pinned_ - write_page_->len();
+    }
+    return bytes_pinned_;
+  }
+
+  bool is_closed() const { return closed_; }
+  bool is_pinned() const { return pinned_; }
+  bool has_read_iterator() const { return read_page_ != pages_.end(); }
+  bool has_write_iterator() const { return write_page_ != nullptr; }
+
+  std::string DebugString() const;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(BufferedTupleStreamV2);
+  friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
+  friend class ArrayTupleStreamTest_TestComputeRowSize_Test;
+  friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test;
+  friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test;
+
+  /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
+  struct Page {
+    Page() : num_rows(0) {}
+
+    inline int len() const { return handle.len(); }
+    inline uint8_t* data() const { return handle.data(); }
+    inline bool is_pinned() const { return handle.is_pinned(); }
+    inline int pin_count() const { return handle.pin_count(); }
+    std::string DebugString() const;
+
+    BufferPool::PageHandle handle;
+
+    /// Number of rows written to the page.
+    int num_rows;
+  };
+
+  /// Runtime state instance used to check for cancellation. Not owned.
+  RuntimeState* const state_;
+
+  /// Description of rows stored in the stream.
+  const RowDescriptor& desc_;
+
+  /// Sum of the fixed length portion of all the tuples in desc_, including any null
+  /// indicators.
+  int fixed_tuple_row_size_;
+
+  /// The size of the fixed length portion for each tuple in the row.
+  std::vector<int> fixed_tuple_sizes_;
+
+  /// Vectors of all the string slots that have their varlen data stored in the stream
+  /// grouped by tuple_idx.
+  std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_string_slots_;
+
+  /// Vectors of all the collection slots that have their varlen data stored in the
+  /// stream, grouped by tuple_idx.
+  std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_coll_slots_;
+
+  /// Buffer pool and client used to allocate, pin and release pages. Not owned.
+  BufferPool* buffer_pool_;
+  BufferPool::ClientHandle* buffer_pool_client_;
+
+  /// List of pages in the stream.
+  /// Empty before PrepareForWrite() is called or after the stream has been destructively
+  /// read in 'delete_on_read' mode. Non-empty otherwise.
+  std::list<Page> pages_;
+
+  /// Total size of pages_, including any pages already deleted in 'delete_on_read'
+  /// mode.
+  int64_t total_byte_size_;
+
+  /// Iterator pointing to the current page for reading. Equal to pages_.end() when no
+  /// read iterator is active. GetNext() does not advance this past the end of
+  /// the stream, so upon eos 'read_page_' points to the last page and
+  /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed and an
+  /// error status was returned.
+  std::list<Page>::iterator read_page_;
+
+  /// Number of rows returned from the current read_page_.
+  uint32_t read_page_rows_returned_;
+
+  /// Pointer into read_page_ to the byte after the last row read.
+  uint8_t* read_ptr_;
+
+  /// Pointer into write_page_ to the byte after the last row written.
+  uint8_t* write_ptr_;
+
+  /// Pointer to one byte past the end of write_page_. Cached to speed up computation
+  uint8_t* write_end_ptr_;
+
+  /// Number of rows returned to the caller from GetNext() since the last
+  /// PrepareForRead() call.
+  int64_t rows_returned_;
+
+  /// The current page for writing. NULL if there is no available page to write to.
+  /// Always pinned. If 'read_page_' and 'write_page_' reference the same page, then
+  /// that page is only pinned once.
+  Page* write_page_;
+
+  /// Total bytes of pinned pages in pages_, stored to avoid iterating over the list
+  /// to compute it.
+  int64_t bytes_pinned_;
+
+  /// Number of rows stored in the stream. Includes rows that were already deleted during
+  /// a destructive 'delete_on_read' pass over the stream.
+  int64_t num_rows_;
+
+  /// The length in bytes of pages used to store the stream's rows.
+  /// TODO: IMPALA-3808: support variable-length pages
+  const int64_t page_len_;
+
+  /// Whether any tuple in the rows is nullable.
+  const bool has_nullable_tuple_;
+
+  /// If true, pages are deleted after they are read.
+  bool delete_on_read_;
+
+  bool closed_; // Used for debugging.
+
+  /// If true, this stream has been explicitly pinned by the caller and all pages are
+  /// kept pinned until the caller calls UnpinStream().
+  bool pinned_;
+
+  bool is_read_page(const Page* page) const {
+    return has_read_iterator() && &*read_page_ == page;
+  }
+
+  bool is_write_page(const Page* page) const { return write_page_ == page; }
+
+  /// The slow path for AddRow() that is called if there is not sufficient space in
+  /// the current page.
+  bool AddRowSlow(TupleRow* row, Status* status) noexcept;
+
+  /// The slow path for AllocateRow() that is called if there is not sufficient space in
+  /// the current page.
+  uint8_t* AllocateRowSlow(
+      int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) noexcept;
+
+  /// Copies 'row' into write_page_. Returns false if there is not enough space in
+  /// 'write_page_'. After returning false, write_ptr_ may be left pointing to the
+  /// partially-written row, and no more data can be written to write_page_.
+  template <bool HAS_NULLABLE_TUPLE>
+  bool DeepCopyInternal(TupleRow* row) noexcept;
+
+  /// Helper function to copy strings in string_slots from tuple into write_page_.
+  /// Updates write_ptr_ to the end of the string data added. Returns false if the data
+  /// does not fit in the current write page. After returning false, write_ptr_ is left
+  /// pointing to the partially-written row, and no more data can be written to
+  /// write_page_.
+  bool CopyStrings(const Tuple* tuple, const std::vector<SlotDescriptor*>& string_slots);
+
+  /// Helper function to deep copy collections in collection_slots from tuple into
+  /// write_page_. Updates write_ptr_ to the end of the collection data added. Returns
+  /// false if the data does not fit in the current write page. After returning false,
+  /// write_ptr_ is left pointing to the partially-written row, and no more data can be
+  /// written to write_page_.
+  bool CopyCollections(
+      const Tuple* tuple, const std::vector<SlotDescriptor*>& collection_slots);
+
+  /// Wrapper of the templated DeepCopyInternal() function.
+  bool DeepCopy(TupleRow* row) noexcept;
+
+  /// Gets a new page of 'page_len_' bytes from buffer_pool_, updating write_page_,
+  /// write_ptr_ and write_end_ptr_. The caller must ensure there is sufficient unused
+  /// reservation to allocate the page. The caller must reset the write iterator (if
+  /// there is one).
+  Status NewWritePage() noexcept WARN_UNUSED_RESULT;
+
+  /// Validates that a page can fit a row of 'row_size' bytes.
+  /// Returns an error if the row cannot fit in a page.
+  Status CheckPageSizeForRow(int64_t row_size);
+
+  /// Wrapper around NewWritePage() that allocates a new write page that fits a row of
+  /// 'row_size' bytes. Increases reservation if needed to allocate the next page.
+  /// Returns OK and sets 'got_reservation' to true if the write page was successfully
+  /// allocated. Returns an error if the row cannot fit in a page. Returns OK and sets
+  /// 'got_reservation' to false if the reservation could not be increased and no other
+  /// error was encountered.
+  Status AdvanceWritePage(
+      int64_t row_size, bool* got_reservation) noexcept WARN_UNUSED_RESULT;
+
+  /// Reset the write page, if there is one, and unpin pages accordingly.
+  void ResetWritePage();
+
+  /// Same as PrepareForRead(), except the iterators are not invalidated and
+  /// the caller is assumed to have checked there is sufficient unused reservation.
+  Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT;
+
+  /// Pins the next read page. This blocks reading from disk if necessary to bring the
+  /// page's data into memory. Updates read_page_, read_ptr_, and
+  /// read_page_rows_returned_.
+  Status NextReadPage() WARN_UNUSED_RESULT;
+
+  /// Reset the read page, if there is one, and unpin pages accordingly.
+  void ResetReadPage();
+
+  /// Returns the total additional bytes that this row will consume in write_page_ if
+  /// appended to the page. This includes the row's null indicators, the fixed length
+  /// part of the row and the data for inlined_string_slots_ and inlined_coll_slots_.
+  int64_t ComputeRowSize(TupleRow* row) const noexcept;
+
+  /// Pins page and updates tracking stats.
+  Status PinPage(Page* page) WARN_UNUSED_RESULT;
+
+  /// Increment the page's pin count if this page needs a higher pin count given the
+  /// current read and write iterator positions and whether the stream will be pinned
+  /// ('stream_pinned'). Assumes that no scenarios occur when the pin count needs to
+  /// be incremented multiple times. The caller is responsible for ensuring sufficient
+  /// reservation is available.
+  Status PinPageIfNeeded(Page* page, bool stream_pinned) WARN_UNUSED_RESULT;
+
+  /// Decrement the page's pin count if this page needs a lower pin count given the
+  /// current read and write iterator positions and whether the stream will be pinned
+  /// ('stream_pinned'). Assumes that no scenarios occur when the pin count needs to
+  /// be decremented multiple times.
+  void UnpinPageIfNeeded(Page* page, bool stream_pinned);
+
+  /// Return the expected pin count for 'page' in the current stream based on the current
+  /// read and write pages and whether the stream is pinned.
+  int ExpectedPinCount(bool stream_pinned, const Page* page) const;
+
+  /// Templated GetNext implementations.
+  template <bool FILL_FLAT_ROWS>
+  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows);
+  template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE>
+  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows);
+
+  /// Helper function to convert a flattened TupleRow stored starting at '*data' into
+  /// 'row'. *data is updated to point to the first byte past the end of the row.
+  template <bool HAS_NULLABLE_TUPLE>
+  void UnflattenTupleRow(uint8_t** data, TupleRow* row) const;
+
+  /// Helper function for GetNextInternal(). For each string slot in string_slots,
+  /// update StringValue's ptr field to point to the corresponding string data stored
+  /// inline in the stream (at the current value of read_ptr_), then advance read_ptr_
+  /// by the StringValue's length field.
+  void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, Tuple* tuple);
+
+  /// Helper function for GetNextInternal(). For each collection slot in collection_slots,
+  /// recursively update any pointers in the CollectionValue to point to the corresponding
+  /// var len data stored inline in the stream, advancing read_ptr_ as data is read.
+  /// Assumes that the collection was serialized to the stream in DeepCopy()'s format.
+  void FixUpCollectionsForRead(
+      const vector<SlotDescriptor*>& collection_slots, Tuple* tuple);
+
+  /// Returns the number of null indicator bytes per row. Only valid if this stream has
+  /// nullable tuples.
+  int NullIndicatorBytesPerRow() const;
+
+  /// Returns the total bytes pinned. Only called in DCHECKs to validate bytes_pinned_.
+  int64_t CalcBytesPinned() const;
+
+  /// DCHECKs if the stream is internally inconsistent. The stream should always be in
+  /// a consistent state after returning success from a public API call.
+  void CheckConsistency() const;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-v2.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.inline.h b/be/src/runtime/buffered-tuple-stream-v2.inline.h
new file mode 100644
index 0000000..6ad4bc4
--- /dev/null
+++ b/be/src/runtime/buffered-tuple-stream-v2.inline.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_INLINE_H
+#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_INLINE_H
+
+#include "runtime/buffered-tuple-stream-v2.h"
+
+#include "runtime/descriptors.h"
+#include "runtime/tuple-row.h"
+#include "util/bit-util.h"
+
+namespace impala {
+
+inline int BufferedTupleStreamV2::NullIndicatorBytesPerRow() const {
+  DCHECK(has_nullable_tuple_);
+  return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size());
+}
+
+inline bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept {
+  DCHECK(!closed_);
+  if (LIKELY(DeepCopy(row))) return true;
+  return AddRowSlow(row, status);
+}
+
+inline uint8_t* BufferedTupleStreamV2::AllocateRow(
+    int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) {
+  DCHECK(!closed_);
+  DCHECK(!has_nullable_tuple_) << "AllocateRow does not support nullable tuples";
+  const int total_size = fixed_size + varlen_size;
+  if (UNLIKELY(write_page_ == nullptr || write_ptr_ + total_size > write_end_ptr_)) {
+    return AllocateRowSlow(fixed_size, varlen_size, varlen_data, status);
+  }
+  DCHECK(write_page_ != nullptr);
+  DCHECK(write_page_->is_pinned());
+  DCHECK_LE(write_ptr_ + total_size, write_end_ptr_);
+  ++num_rows_;
+  ++write_page_->num_rows;
+
+  uint8_t* fixed_data = write_ptr_;
+  write_ptr_ += fixed_size;
+  *varlen_data = write_ptr_;
+  write_ptr_ += varlen_size;
+  return fixed_data;
+}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 0e0d384..d4640ac 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -44,7 +44,10 @@ namespace impala {
 
 class BufferPoolTest : public ::testing::Test {
  public:
-  virtual void SetUp() { test_env_ = obj_pool_.Add(new TestEnv); }
+  virtual void SetUp() {
+    test_env_ = obj_pool_.Add(new TestEnv);
+    ASSERT_OK(test_env_->Init());
+  }
 
   virtual void TearDown() {
     for (auto entry : query_reservations_) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 850c90b..611520c 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -27,6 +27,8 @@
 #include "gen-cpp/CatalogService.h"
 #include "gen-cpp/ImpalaInternalService.h"
 #include "runtime/backend-client.h"
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
@@ -42,6 +44,7 @@
 #include "scheduling/scheduler.h"
 #include "service/frontend.h"
 #include "statestore/statestore-subscriber.h"
+#include "util/bit-util.h"
 #include "util/debug-util.h"
 #include "util/debug-util.h"
 #include "util/default-path-handlers.h"
@@ -148,6 +151,8 @@ ExecEnv::ExecEnv()
         "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
+    buffer_reservation_(nullptr),
+    buffer_pool_(nullptr),
     enable_webserver_(FLAGS_enable_webserver),
     is_fe_tests_(false),
     backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
@@ -202,6 +207,8 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
         "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
+    buffer_reservation_(nullptr),
+    buffer_pool_(NULL),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     is_fe_tests_(false),
     backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
@@ -229,7 +236,10 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
   exec_env_ = this;
 }
 
-ExecEnv::~ExecEnv() {}
+ExecEnv::~ExecEnv() {
+  if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
+  disk_io_mgr_.reset(); // Need to tear down before mem_tracker_.
+}
 
 Status ExecEnv::InitForFeTests() {
   mem_tracker_.reset(new MemTracker(-1, "Process"));
@@ -273,18 +283,6 @@ Status ExecEnv::StartServices() {
   if (bytes_limit < 0) {
     return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'.");
   }
-  // Minimal IO Buffer requirements:
-  //   IO buffer (8MB default) * number of IO buffers per thread (5) *
-  //   number of threads per core * number of cores
-  int64_t min_requirement = disk_io_mgr_->max_read_buffer_size() *
-      DiskIoMgr::DEFAULT_QUEUE_CAPACITY *
-      FLAGS_num_threads_per_core * FLAGS_num_cores;
-  if (bytes_limit < min_requirement) {
-    LOG(WARNING) << "Memory limit "
-                 << PrettyPrinter::Print(bytes_limit, TUnit::BYTES)
-                 << " does not meet minimal memory requirement of "
-                 << PrettyPrinter::Print(min_requirement, TUnit::BYTES);
-  }
 
   metrics_->Init(enable_webserver_ ? webserver_.get() : NULL);
   impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
@@ -353,4 +351,10 @@ Status ExecEnv::StartServices() {
   return Status::OK();
 }
 
+void ExecEnv::InitBufferPool(int64_t min_page_size, int64_t capacity) {
+  DCHECK(buffer_pool_ == nullptr);
+  buffer_pool_.reset(new BufferPool(min_page_size, capacity));
+  buffer_reservation_.reset(new ReservationTracker());
+  buffer_reservation_->InitRootTracker(NULL, capacity);
+}
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index bdeb4a4..a5777ef 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -28,6 +28,7 @@
 
 namespace impala {
 
+class BufferPool;
 class CallableThreadPool;
 class DataStreamMgr;
 class DiskIoMgr;
@@ -42,6 +43,7 @@ class PoolMemTrackerRegistry;
 class MetricGroup;
 class QueryResourceMgr;
 class RequestPoolService;
+class ReservationTracker;
 class Scheduler;
 class StatestoreSubscriber;
 class TestExecEnv;
@@ -65,8 +67,7 @@ class ExecEnv {
   /// we return the most recently created instance.
   static ExecEnv* GetInstance() { return exec_env_; }
 
-  /// Empty destructor because the compiler-generated one requires full
-  /// declarations for classes in scoped_ptrs.
+  /// Destructor - only used in backend tests that create new environment per test.
   virtual ~ExecEnv();
 
   void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; }
@@ -99,6 +100,8 @@ class ExecEnv {
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
   QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
   PoolMemTrackerRegistry* pool_mem_trackers() { return pool_mem_trackers_.get(); }
+  ReservationTracker* buffer_reservation() { return buffer_reservation_.get(); }
+  BufferPool* buffer_pool() { return buffer_pool_.get(); }
 
   void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
 
@@ -143,12 +146,20 @@ class ExecEnv {
   boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
   boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
 
+  /// Query-wide buffer pool and the root reservation tracker for the pool. The
+  /// reservation limit is equal to the maximum capacity of the pool.
+  /// For now this is only used by backend tests that create them via InitBufferPool().
+  boost::scoped_ptr<ReservationTracker> buffer_reservation_;
+  boost::scoped_ptr<BufferPool> buffer_pool_;
+
   /// Not owned by this class
   ImpalaServer* impala_server_;
 
   bool enable_webserver_;
 
  private:
+  friend class TestEnv;
+
   static ExecEnv* exec_env_;
   bool is_fe_tests_;
 
@@ -157,6 +168,9 @@ class ExecEnv {
 
   /// fs.defaultFs value set in core-site.xml
   std::string default_fs_;
+
+  /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity.
+  void InitBufferPool(int64_t min_page_len, int64_t capacity);
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 2b2e3a0..b931808 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -15,16 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #include "runtime/query-state.h"
 
-#include <boost/thread/locks.hpp>
 #include <boost/thread/lock_guard.hpp>
+#include <boost/thread/locks.hpp>
 
+#include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "runtime/exec-env.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-exec-mgr.h"
+#include "util/debug-util.h"
 
 #include "common/names.h"
 
@@ -41,7 +43,12 @@ QueryState::ScopedRef::~ScopedRef() {
 }
 
 QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool)
-  : query_ctx_(query_ctx), refcnt_(0), prepared_(false), released_resources_(false) {
+  : query_ctx_(query_ctx),
+    refcnt_(0),
+    prepared_(false),
+    released_resources_(false),
+    buffer_reservation_(nullptr),
+    file_group_(nullptr) {
   TQueryOptions& query_options = query_ctx_.client_request.query_options;
   // max_errors does not indicate how many errors in total have been recorded, but rather
   // how many are distinct. It is defined as the sum of the number of generic errors and
@@ -56,8 +63,12 @@ QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool)
 }
 
 void QueryState::ReleaseResources() {
+  // Clean up temporary files.
+  if (file_group_ != nullptr) file_group_->Close();
+  // Release any remaining reservation.
+  if (buffer_reservation_ != nullptr) buffer_reservation_->Close();
   // Avoid dangling reference from the parent of 'query_mem_tracker_'.
-  query_mem_tracker_->UnregisterFromParent();
+  if (query_mem_tracker_ != nullptr) query_mem_tracker_->UnregisterFromParent();
   released_resources_ = true;
 }
 
@@ -77,19 +88,27 @@ Status QueryState::Prepare() {
   // Starting a new query creates threads and consumes a non-trivial amount of memory.
   // If we are already starved for memory, fail as early as possible to avoid consuming
   // more resources.
-  MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
+  ExecEnv* exec_env = ExecEnv::GetInstance();
+  MemTracker* process_mem_tracker = exec_env->process_mem_tracker();
   if (process_mem_tracker->LimitExceeded()) {
     string msg = Substitute("Query $0 could not start because the backend Impala daemon "
                             "is over its memory limit",
         PrintId(query_id()));
-    prepare_status_ = process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
-    return prepare_status_;
+    status = process_mem_tracker->MemLimitExceeded(NULL, msg, 0);
+    goto error;
+  }
+  // Do buffer-pool-related setup if running in a backend test that explicitly created
+  // the pool.
+  if (exec_env->buffer_pool() != nullptr) {
+    status = InitBufferPoolState();
+    if (!status.ok()) goto error;
   }
-
-  // TODO: IMPALA-3748: acquire minimum buffer reservation at this point.
-
   prepared_ = true;
   return Status::OK();
+
+error:
+  prepare_status_ = status;
+  return status;
 }
 
 void QueryState::InitMemTrackers(const std::string& pool) {
@@ -103,6 +122,37 @@ void QueryState::InitMemTrackers(const std::string& pool) {
       MemTracker::CreateQueryMemTracker(query_id(), query_options(), pool, &obj_pool_);
 }
 
+Status QueryState::InitBufferPoolState() {
+  ExecEnv* exec_env = ExecEnv::GetInstance();
+  int64_t query_mem_limit = query_mem_tracker_->limit();
+  if (query_mem_limit == -1) query_mem_limit = numeric_limits<int64_t>::max();
+
+  // TODO: IMPALA-3200: add a default upper bound to buffer pool memory derived from
+  // query_mem_limit.
+  int64_t max_reservation = numeric_limits<int64_t>::max();
+  if (query_options().__isset.max_block_mgr_memory
+      && query_options().max_block_mgr_memory > 0) {
+    max_reservation = query_options().max_block_mgr_memory;
+  }
+
+  // TODO: IMPALA-3748: claim the query-wide minimum reservation.
+  // For now, rely on exec nodes to grab their minimum reservation during Prepare().
+  buffer_reservation_ = obj_pool_.Add(new ReservationTracker);
+  buffer_reservation_->InitChildTracker(
+      NULL, exec_env->buffer_reservation(), query_mem_tracker_, max_reservation);
+
+  // TODO: once there's a mechanism for reporting non-fragment-local profiles,
+  // should make sure to report this profile so it's not going into a black hole.
+  RuntimeProfile* dummy_profile = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "dummy"));
+  // Only create file group if spilling is enabled.
+  if (query_options().scratch_limit != 0 && !query_ctx_.disable_spilling) {
+    file_group_ = obj_pool_.Add(
+        new TmpFileMgr::FileGroup(exec_env->tmp_file_mgr(), exec_env->disk_io_mgr(),
+            dummy_profile, query_id(), query_options().scratch_limit));
+  }
+  return Status::OK();
+}
+
 void QueryState::RegisterFInstance(FragmentInstanceState* fis) {
   VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(fis->instance_id());
   lock_guard<SpinLock> l(fis_map_lock_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/query-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h
index c381dfe..650e8bf 100644
--- a/be/src/runtime/query-state.h
+++ b/be/src/runtime/query-state.h
@@ -26,6 +26,7 @@
 #include "common/object-pool.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gen-cpp/Types_types.h"
+#include "runtime/tmp-file-mgr.h"
 #include "util/spinlock.h"
 #include "util/uid-util.h"
 
@@ -33,6 +34,7 @@ namespace impala {
 
 class FragmentInstanceState;
 class MemTracker;
+class ReservationTracker;
 
 /// Central class for all backend execution state (example: the FragmentInstanceStates
 /// of the individual fragment instances) created for a particular query.
@@ -94,6 +96,8 @@ class QueryState {
   }
 
   MemTracker* query_mem_tracker() const { return query_mem_tracker_; }
+  ReservationTracker* buffer_reservation() const { return buffer_reservation_; }
+  TmpFileMgr::FileGroup* file_group() const { return file_group_; }
 
   /// Sets up state required for fragment execution: memory reservations, etc. Fails
   /// if resources could not be acquired. Safe to call concurrently and idempotent:
@@ -145,12 +149,26 @@ class QueryState {
   /// The top-level MemTracker for this query (owned by obj_pool_).
   MemTracker* query_mem_tracker_;
 
+  /// Buffer reservation for this query (owned by obj_pool_)
+  /// Only non-null in backend tests that explicitly enabled the new buffer pool
+  /// TODO: this will always be non-null once IMPALA-3200 is done
+  ReservationTracker* buffer_reservation_;
+
+  /// Temporary files for this query (owned by obj_pool_)
+  /// Only non-null in backend tests that explicitly enabled the new buffer pool
+  /// TODO: this will always be non-null once IMPALA-3200 is done
+  TmpFileMgr::FileGroup* file_group_;
+
   /// Create QueryState w/ copy of query_ctx and refcnt of 0.
   /// The query is associated with the resource pool named 'pool'
   QueryState(const TQueryCtx& query_ctx, const std::string& pool);
 
   /// Called from Prepare() to initialize MemTrackers.
   void InitMemTrackers(const std::string& pool);
+
+  /// Called from PrepareForExecution() to set up buffer reservations and the
+  /// file group. Fails if required resources are not available.
+  Status InitBufferPoolState();
 };
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 6daa02f..8fb0b55 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -17,17 +17,18 @@
 
 #include "runtime/row-batch.h"
 
-#include <stdint.h>  // for intptr_t
+#include <stdint.h> // for intptr_t
 #include <boost/scoped_ptr.hpp>
 
+#include "gen-cpp/Results_types.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/string-value.h"
 #include "runtime/tuple-row.h"
 #include "util/compress.h"
-#include "util/decompress.h"
 #include "util/debug-util.h"
+#include "util/decompress.h"
 #include "util/fixed-size-hash-table.h"
-#include "gen-cpp/Results_types.h"
 
 #include "common/names.h"
 
@@ -157,6 +158,10 @@ RowBatch::~RowBatch() {
   for (int i = 0; i < blocks_.size(); ++i) {
     blocks_[i]->Delete();
   }
+  for (BufferInfo& buffer_info : buffers_) {
+    ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
+        buffer_info.client, &buffer_info.buffer);
+  }
   if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {
     DCHECK(tuple_ptrs_ != NULL);
     free(tuple_ptrs_);
@@ -305,6 +310,16 @@ void RowBatch::AddBlock(BufferedBlockMgr::Block* block, FlushMode flush) {
   if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources();
 }
 
+void RowBatch::AddBuffer(
+    BufferPool::ClientHandle* client, BufferPool::BufferHandle buffer, FlushMode flush) {
+  auxiliary_mem_usage_ += buffer.len();
+  BufferInfo buffer_info;
+  buffer_info.client = client;
+  buffer_info.buffer = std::move(buffer);
+  buffers_.push_back(std::move(buffer_info));
+  if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources();
+}
+
 void RowBatch::Reset() {
   num_rows_ = 0;
   capacity_ = tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
@@ -318,6 +333,11 @@ void RowBatch::Reset() {
     blocks_[i]->Delete();
   }
   blocks_.clear();
+  for (BufferInfo& buffer_info : buffers_) {
+    ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
+        buffer_info.client, &buffer_info.buffer);
+  }
+  buffers_.clear();
   auxiliary_mem_usage_ = 0;
   if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) {
     tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_));
@@ -336,6 +356,11 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
     dest->AddBlock(blocks_[i], FlushMode::NO_FLUSH_RESOURCES);
   }
   blocks_.clear();
+  for (BufferInfo& buffer_info : buffers_) {
+    dest->AddBuffer(
+        buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES);
+  }
+  buffers_.clear();
   if (needs_deep_copy_) {
     dest->MarkNeedsDeepCopy();
   } else if (flush_ == FlushMode::FLUSH_RESOURCES) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 91433c4..0bb71d8 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -15,18 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
-
 #ifndef IMPALA_RUNTIME_ROW_BATCH_H
 #define IMPALA_RUNTIME_ROW_BATCH_H
 
-#include <vector>
 #include <cstring>
+#include <vector>
 #include <boost/scoped_ptr.hpp>
 
 #include "codegen/impala-ir.h"
 #include "common/compiler-util.h"
 #include "common/logging.h"
-#include "runtime/buffered-block-mgr.h" // for BufferedBlockMgr::Block
+#include "runtime/buffered-block-mgr.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/descriptors.h"
 #include "runtime/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
@@ -208,6 +208,7 @@ class RowBatch {
   MemPool* tuple_data_pool() { return &tuple_data_pool_; }
   int num_io_buffers() const { return io_buffers_.size(); }
   int num_blocks() const { return blocks_.size(); }
+  int num_buffers() const { return buffers_.size(); }
 
   /// Resets the row batch, returning all resources it has accumulated.
   void Reset();
@@ -220,10 +221,18 @@ class RowBatch {
   /// the original owner, even when the ownership of batches is transferred. If the
   /// original owner wants the memory to be released, it should call this with 'mode'
   /// FLUSH_RESOURCES (see MarkFlushResources() for further explanation).
-  /// TODO: after IMPALA-3200, make the ownership transfer model consistent between
-  /// Blocks and I/O buffers.
   void AddBlock(BufferedBlockMgr::Block* block, FlushMode flush);
 
+  /// Adds a buffer to this row batch. The buffer is deleted when freeing resources.
+  /// The buffer's memory remains accounted against the original owner, even when the
+  /// ownership of batches is transferred. If the original owner wants the memory to be
+  /// released, it should call this with 'flush' FLUSH_RESOURCES (see MarkFlushResources()
+  /// for further explanation).
+  /// TODO: IMPALA-4179: after IMPALA-3200, simplify the ownership transfer model and
+  /// make it consistent between buffers and I/O buffers.
+  void AddBuffer(
+      BufferPool::ClientHandle* client, BufferPool::BufferHandle buffer, FlushMode flush);
+
   /// Used by an operator to indicate that it cannot produce more rows until the
   /// resources that it has attached to the row batch are freed or acquired by an
   /// ancestor operator. After this is called, the batch is at capacity and no more rows
@@ -424,11 +433,19 @@ class RowBatch {
   /// are owned by the BufferedBlockMgr.
   std::vector<BufferedBlockMgr::Block*> blocks_;
 
+  struct BufferInfo {
+    BufferPool::ClientHandle* client;
+    BufferPool::BufferHandle buffer;
+  };
+
+  /// Buffers attached to this row batch. See AddBuffer() for ownership semantics.
+  std::vector<BufferInfo> buffers_;
+
   /// String to write compressed tuple data to in Serialize().
   /// This is a string so we can swap() with the string in the TRowBatch we're serializing
   /// to (we don't compress directly into the TRowBatch in case the compressed data is
-  /// longer than the uncompressed data). Swapping avoids copying data to the TRowBatch and
-  /// avoids excess memory allocations: since we reuse RowBatchs and TRowBatchs, and
+  /// longer than the uncompressed data). Swapping avoids copying data to the TRowBatch
+  /// and avoids excess memory allocations: since we reuse RowBatchs and TRowBatchs, and
   /// assuming all row batches are roughly the same size, all strings will eventually be
   /// allocated to the right size.
   std::string compression_scratch_;