You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/20 05:56:28 UTC

impala git commit: IMPALA-5518: Allocate KrpcDataStreamRecvr RowBatch tuples from BufferPool

Repository: impala
Updated Branches:
  refs/heads/master 8302608bd -> 62d8462e1


IMPALA-5518: Allocate KrpcDataStreamRecvr RowBatch tuples from BufferPool

Previously, tuple pointers of a row batch were allocated from
the heap via malloc() and tuple data was allocated from the
MemPool associated with the RowBatch. This change converts
the allocations of tuple pointers and tuple data to use
BufferPool for row batches allocated from KrpcDataStreamRecvr.
The primary motivation for this change is to take advantage of
the fact that buffers allocated from BufferPool always go back
to the per-core arena they came from when they are freed. This
alleviates the TCMalloc imbalance between the RPC service threads
and the fragment execution threads. As described in IMPALA-5518,
row batches are always allocated from the service threads' TCMalloc
cache and placed into the fragment execution threads' TCMalloc cache
when they're freed. This leads to underflow and overflow in those
threads' caches and high contention for the spinlock of the central
free list. With BufferPool, the memory always goes back to its
originating arena so this kind of imbalance is less likely to occur.
This also dovetails with the long term plan to put most allocations
under BufferPool and have each operator in the plan reserve an
appropriate amount of memory before execution.

Note that the proper reservation mechanism of the exchange node
hasn't yet been implemented in this change, so the buffer pool client
handle used for allocating buffers has an ad-hoc set-up with no reservation
limit and with the root reservation tracker as its parent. This needs to be
fixed as part of IMPALA-6524. The default buffer pool limit is also
bumped to 85% to account for the extra usage from the exchange nodes.
The minimum buffer size is also lowered to 8KB to reduce the amount of
wasted memory, as a row batch's tuple pointers / tuple data can sometimes be
much smaller than 64KB.

Testing done: Debug core build.

Change-Id: If4b1a45f68b9df0d3b539511e15aff15700246f2
Reviewed-on: http://gerrit.cloudera.org:8080/9344
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/62d8462e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/62d8462e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/62d8462e

Branch: refs/heads/master
Commit: 62d8462e13c69e6b2d67ad571fcd5c288514d398
Parents: 8302608
Author: Michael Ho <kw...@cloudera.com>
Authored: Wed Feb 14 17:45:58 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Feb 20 04:08:11 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   4 +-
 be/src/exec/blocking-join-node.cc               |  15 ++-
 be/src/exec/exchange-node.cc                    |  14 +-
 be/src/exec/exchange-node.h                     |   6 +
 be/src/exec/partial-sort-node.cc                |   2 +-
 be/src/runtime/data-stream-mgr-base.h           |   3 +-
 be/src/runtime/data-stream-mgr.cc               |   2 +-
 be/src/runtime/data-stream-mgr.h                |   5 +-
 be/src/runtime/data-stream-test.cc              |  37 ++++--
 be/src/runtime/exec-env.h                       |   1 +
 be/src/runtime/krpc-data-stream-mgr.cc          |   8 +-
 be/src/runtime/krpc-data-stream-mgr.h           |   5 +-
 be/src/runtime/krpc-data-stream-recvr.cc        |  32 +++--
 be/src/runtime/krpc-data-stream-recvr.h         |   8 +-
 be/src/runtime/row-batch.cc                     | 127 +++++++++++++------
 be/src/runtime/row-batch.h                      |  47 +++++--
 be/src/service/query-options-test.cc            |   6 +-
 .../queries/QueryTest/bloom_filters.test        |   4 +-
 tests/query_test/test_mem_usage_scaling.py      |  11 +-
 19 files changed, 242 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 8e81d23..3ed0a0f 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -59,7 +59,7 @@ DEFINE_string(mem_limit, "80%",  mem_limit_help_msg.c_str());
 static const string buffer_pool_limit_help_msg = "(Advanced) Limit on buffer pool size. "
      + Substitute(MEM_UNITS_HELP_MSG, "the process memory limit") + " "
     "The default value and behaviour of this flag may change between releases.";
-DEFINE_string(buffer_pool_limit, "80%", buffer_pool_limit_help_msg.c_str());
+DEFINE_string(buffer_pool_limit, "85%", buffer_pool_limit_help_msg.c_str());
 
 static const string buffer_pool_clean_pages_limit_help_msg = "(Advanced) Limit on bytes "
     "of clean pages that will be accumulated in the buffer pool. "
@@ -67,7 +67,7 @@ static const string buffer_pool_clean_pages_limit_help_msg = "(Advanced) Limit o
 DEFINE_string(buffer_pool_clean_pages_limit, "10%",
     buffer_pool_clean_pages_limit_help_msg.c_str());
 
-DEFINE_int64(min_buffer_size, 64 * 1024,
+DEFINE_int64(min_buffer_size, 8 * 1024,
     "(Advanced) The minimum buffer size to use in the buffer pool");
 
 DEFINE_bool(enable_process_lifetime_heap_profiling, false, "(Advanced) Enables heap "

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 7adea7f..e8c0948 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -158,7 +158,13 @@ void BlockingJoinNode::ProcessBuildInputAsync(
   // is safe to do because while the build may have partially completed, it will not be
   // probed. BlockingJoinNode::Open() will return failure as soon as child(0)->Open()
   // completes.
-  if (CanCloseBuildEarly() || !status->ok()) child(1)->Close(state);
+  if (CanCloseBuildEarly() || !status->ok()) {
+    // Release resources in 'build_batch_' before closing the children as some of the
+    // resources are still accounted towards the children node.
+    build_batch_.reset();
+    child(1)->Close(state);
+  }
+
   // Release the thread token as soon as possible (before the main thread joins
   // on it).  This way, if we had a chain of 10 joins using 1 additional thread,
   // we'd keep the additional thread busy the whole time.
@@ -233,7 +239,12 @@ Status BlockingJoinNode::ProcessBuildInputAndOpenProbe(
     RETURN_IF_ERROR(child(1)->Open(state));
     RETURN_IF_ERROR(AcquireResourcesForBuild(state));
     RETURN_IF_ERROR(SendBuildInputToSink<false>(state, build_sink));
-    if (CanCloseBuildEarly()) child(1)->Close(state);
+    if (CanCloseBuildEarly()) {
+      // Release resources in 'build_batch_' before closing the children as some of the
+      // resources are still accounted towards the children node.
+      build_batch_.reset();
+      child(1)->Close(state);
+    }
     RETURN_IF_ERROR(child(0)->Open(state));
   }
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index cc39382..2dc662b 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -79,11 +79,18 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
   }
 #endif
 
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->buffer_pool()->RegisterClient(
+      Substitute("Exchg Recvr (id=$0)", id_), nullptr,
+      ExecEnv::GetInstance()->buffer_reservation(), mem_tracker(),
+      numeric_limits<int64_t>::max(), runtime_profile(), &recvr_buffer_pool_client_));
+
   // TODO: figure out appropriate buffer size
   DCHECK_GT(num_senders_, 0);
-  stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(&input_row_desc_,
-      state->fragment_instance_id(), id_, num_senders_,
-      FLAGS_exchg_node_buffer_size_bytes, is_merging_, runtime_profile(), mem_tracker());
+  stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(
+      &input_row_desc_, state->fragment_instance_id(), id_, num_senders_,
+      FLAGS_exchg_node_buffer_size_bytes, is_merging_, runtime_profile(), mem_tracker(),
+      &recvr_buffer_pool_client_);
+
   if (is_merging_) {
     less_than_.reset(
         new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));
@@ -128,6 +135,7 @@ void ExchangeNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   if (less_than_.get() != nullptr) less_than_->Close(state);
   if (stream_recvr_ != nullptr) stream_recvr_->Close();
+  ExecEnv::GetInstance()->buffer_pool()->DeregisterClient(&recvr_buffer_pool_client_);
   ScalarExpr::Close(ordering_exprs_);
   ExecNode::Close(state);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/exec/exchange-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.h b/be/src/exec/exchange-node.h
index aaf44c2..64fa9fe 100644
--- a/be/src/exec/exchange-node.h
+++ b/be/src/exec/exchange-node.h
@@ -22,6 +22,8 @@
 #include <boost/scoped_ptr.hpp>
 #include "exec/exec-node.h"
 
+#include "runtime/bufferpool/buffer-pool.h"
+
 namespace impala {
 
 class DataStreamRecvrBase;
@@ -89,6 +91,10 @@ class ExchangeNode : public ExecNode {
   /// input_batch_ must be copied to the output batch in GetNext().
   int next_row_idx_;
 
+  /// The buffer pool client for allocating buffers for tuple pointers and
+  /// tuple data in row batches.
+  BufferPool::ClientHandle recvr_buffer_pool_client_;
+
   /// time spent reconstructing received rows
   RuntimeProfile::Counter* convert_row_batch_timer_;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/exec/partial-sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index 68d98a4..911f014 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -148,12 +148,12 @@ Status PartialSortNode::Reset(RuntimeState* state) {
 
 void PartialSortNode::Close(RuntimeState* state) {
   if (is_closed()) return;
+  input_batch_.reset();
   child(0)->Close(state);
   if (sorter_ != nullptr) sorter_->Close(state);
   sorter_.reset();
   ScalarExpr::Close(ordering_exprs_);
   ScalarExpr::Close(sort_tuple_exprs_);
-  input_batch_.reset();
   ExecNode::Close(state);
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h b/be/src/runtime/data-stream-mgr-base.h
index f9761cb..6f1ec78 100644
--- a/be/src/runtime/data-stream-mgr-base.h
+++ b/be/src/runtime/data-stream-mgr-base.h
@@ -20,6 +20,7 @@
 #define IMPALA_RUNTIME_DATA_STREAM_MGR_BASE_H
 
 #include "common/status.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/descriptors.h"  // for PlanNodeId
 #include "util/aligned-new.h"
 
@@ -47,7 +48,7 @@ class DataStreamMgrBase : public CacheLineAligned {
   virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
       const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
       int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-      MemTracker* parent_tracker) = 0;
+      MemTracker* parent_tracker, BufferPool::ClientHandle* client = nullptr) = 0;
 
   /// Closes all receivers registered for fragment_instance_id immediately.
   virtual void Cancel(const TUniqueId& fragment_instance_id) = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index 48a819c..3a22130 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -78,7 +78,7 @@ inline uint32_t DataStreamMgr::GetHashValue(
 shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(const RowDescriptor* row_desc,
     const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
     int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-    MemTracker* parent_tracker) {
+    MemTracker* parent_tracker, BufferPool::ClientHandle* client) {
   DCHECK(profile != nullptr);
   DCHECK(parent_tracker != nullptr);
   VLOG_FILE << "creating receiver for fragment="

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 2be6478..f37b1b1 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -74,11 +74,12 @@ class DataStreamMgr : public DataStreamMgrBase {
   /// single stream. 'parent_tracker' is the MemTracker of the exchange node which owns
   /// this receiver. It's the parent of the MemTracker of the newly created receiver.
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
-  /// caller.
+  /// caller. 'client' is the BufferPool's client handle for allocating buffers.
+  /// It's owned by the parent exchange node.
   std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
       const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
       int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-      MemTracker* parent_tracker) override;
+      MemTracker* parent_tracker, BufferPool::ClientHandle* client) override;
 
   /// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id
   /// if the recvr has not been cancelled. sender_id identifies the sender instance

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index c540d1d..11374a2 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -134,11 +134,11 @@ class ImpalaThriftTestBackend : public ImpalaInternalServiceIf {
 // this test file.
 class ImpalaKRPCTestBackend : public DataStreamServiceIf {
  public:
-  ImpalaKRPCTestBackend(RpcMgr* rpc_mgr, KrpcDataStreamMgr* stream_mgr)
+  ImpalaKRPCTestBackend(RpcMgr* rpc_mgr, KrpcDataStreamMgr* stream_mgr,
+      MemTracker* process_mem_tracker)
     : DataStreamServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
       rpc_mgr_(rpc_mgr),
       stream_mgr_(stream_mgr) {
-    MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
     bool is_percent;
     int64_t bytes_limit = ParseUtil::ParseMemSpec(FLAGS_datastream_service_queue_mem_limit,
         &is_percent, process_mem_tracker->limit());
@@ -184,7 +184,6 @@ enum KrpcSwitch {
 class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwitch>> {
  protected:
   DataStreamTest() : next_val_(0) {
-
     // Stop tests that rely on mismatched sender / receiver pairs timing out from failing.
     FLAGS_datastream_sender_timeout_ms = 250;
   }
@@ -196,9 +195,16 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
 
     exec_env_.reset(new ExecEnv());
     ABORT_IF_ERROR(exec_env_->InitForFeTests());
+    exec_env_->InitBufferPool(32 * 1024, 1024 * 1024 * 1024, 32 * 1024);
     runtime_state_.reset(new RuntimeState(TQueryCtx(), exec_env_.get()));
     mem_pool_.reset(new MemPool(&tracker_));
 
+    // Register a BufferPool client for allocating buffers for row batches.
+    ABORT_IF_ERROR(exec_env_->buffer_pool()->RegisterClient(
+        "DataStream Test Recvr", nullptr, exec_env_->buffer_reservation(), &tracker_,
+        numeric_limits<int64_t>::max(), runtime_state_->runtime_profile(),
+        &buffer_pool_client_));
+
     CreateRowDesc();
 
     is_asc_.push_back(true);
@@ -207,7 +213,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
 
     next_instance_id_.lo = 0;
     next_instance_id_.hi = 0;
-    stream_mgr_ = ExecEnv::GetInstance()->stream_mgr();
+    stream_mgr_ = exec_env_->stream_mgr();
 
     broadcast_sink_.dest_node_id = DEST_NODE_ID;
     broadcast_sink_.output_partition.type = TPartitionType::UNPARTITIONED;
@@ -274,6 +280,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     } else {
       StopKrpcBackend();
     }
+    exec_env_->buffer_pool()->DeregisterClient(&buffer_pool_client_);
   }
 
   void Reset() {
@@ -297,6 +304,9 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   // The sorting expression for the single BIGINT column.
   vector<ScalarExpr*> ordering_exprs_;
 
+  // Client for allocating buffers for row batches.
+  BufferPool::ClientHandle buffer_pool_client_;
+
   // RowBatch generation
   scoped_ptr<RowBatch> batch_;
   int next_val_;
@@ -440,7 +450,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
     ReceiverInfo& info = receiver_info_.back();
     info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
-        num_senders, buffer_size, is_merging, profile, &tracker_);
+        num_senders, buffer_size, is_merging, profile, &tracker_, &buffer_pool_client_);
     if (!is_merging) {
       info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
     } else {
@@ -553,7 +563,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     // Dynamic cast stream_mgr_ which is of type DataStreamMgrBase to derived type
     // DataStreamMgr, since ImpalaThriftTestBackend() accepts only DataStreamMgr*.
     boost::shared_ptr<ImpalaThriftTestBackend> handler(
-        new ImpalaThriftTestBackend(ExecEnv::GetInstance()->ThriftStreamMgr()));
+        new ImpalaThriftTestBackend(exec_env_->ThriftStreamMgr()));
     boost::shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler));
     ThriftServerBuilder builder("DataStreamTest backend", processor, FLAGS_port);
     ASSERT_OK(builder.Build(&server_));
@@ -561,10 +571,11 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   }
 
   void StartKrpcBackend() {
-    RpcMgr* rpc_mgr = ExecEnv::GetInstance()->rpc_mgr();
-    KrpcDataStreamMgr* krpc_stream_mgr = ExecEnv::GetInstance()->KrpcStreamMgr();
+    RpcMgr* rpc_mgr = exec_env_->rpc_mgr();
+    KrpcDataStreamMgr* krpc_stream_mgr = exec_env_->KrpcStreamMgr();
     ASSERT_OK(rpc_mgr->Init());
-    test_service_.reset(new ImpalaKRPCTestBackend(rpc_mgr, krpc_stream_mgr));
+    test_service_.reset(new ImpalaKRPCTestBackend(rpc_mgr, krpc_stream_mgr,
+        exec_env_->process_mem_tracker()));
     ASSERT_OK(test_service_->Init());
     ASSERT_OK(krpc_stream_mgr->Init(test_service_->mem_tracker()));
     ASSERT_OK(rpc_mgr->StartServices(krpc_address_));
@@ -577,7 +588,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   }
 
   void StopKrpcBackend() {
-    ExecEnv::GetInstance()->rpc_mgr()->Shutdown();
+    exec_env_->rpc_mgr()->Shutdown();
   }
 
   void StartSender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED,
@@ -714,7 +725,7 @@ class DataStreamTestShortServiceQueue : public DataStreamTest {
 };
 
 INSTANTIATE_TEST_CASE_P(ThriftOrKrpc, DataStreamTest,
-    ::testing::Values(USE_THRIFT, USE_KRPC));
+    ::testing::Values(USE_KRPC, USE_THRIFT));
 
 INSTANTIATE_TEST_CASE_P(ThriftOnly, DataStreamTestThriftOnly,
     ::testing::Values(USE_THRIFT));
@@ -796,7 +807,7 @@ TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
   shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(row_desc_,
-      instance_id, DEST_NODE_ID, 1, 1, false, profile, &tracker_);
+      instance_id, DEST_NODE_ID, 1, 1, false, profile, &tracker_, nullptr);
 
   // Perform tear down, but keep a reference to the receiver so that it is deleted last
   // (to confirm that the destructor does not access invalid state after tear-down).
@@ -860,7 +871,7 @@ TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
   receiver_info_.push_back(ReceiverInfo(TPartitionType::UNPARTITIONED, 4, 1));
   ReceiverInfo& info = receiver_info_.back();
   info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
-      4, 1024 * 1024, false, profile, &tracker_);
+      4, 1024 * 1024, false, profile, &tracker_, &buffer_pool_client_);
   info.thread_handle = new thread(
       &DataStreamTestShortDeserQueue_TestNoDeadlock_Test::ReadStream, this, &info);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index cd07f9b..3f050c9 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -215,6 +215,7 @@ class ExecEnv {
 
  private:
   friend class TestEnv;
+  friend class DataStreamTest;
 
   static ExecEnv* exec_env_;
   bool is_fe_tests_ = false;

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 4a9a91e..184ec49 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -100,13 +100,15 @@ inline uint32_t KrpcDataStreamMgr::GetHashValue(
 shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
     const RowDescriptor* row_desc, const TUniqueId& finst_id, PlanNodeId dest_node_id,
     int num_senders, int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-    MemTracker* parent_tracker) {
+    MemTracker* parent_tracker, BufferPool::ClientHandle* client) {
   DCHECK(profile != nullptr);
   DCHECK(parent_tracker != nullptr);
+  DCHECK(client != nullptr);
   VLOG_FILE << "creating receiver for fragment="<< finst_id
             << ", node=" << dest_node_id;
-  shared_ptr<KrpcDataStreamRecvr> recvr(new KrpcDataStreamRecvr(this, parent_tracker,
-      row_desc, finst_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
+  shared_ptr<KrpcDataStreamRecvr> recvr(new KrpcDataStreamRecvr(
+      this, parent_tracker, row_desc, finst_id, dest_node_id, num_senders, is_merging,
+      buffer_size, profile, client));
   uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
   EarlySendersList early_senders_for_recvr;
   {

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index f4358ea..d5f9d0e 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -241,11 +241,12 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   /// single stream. 'parent_tracker' is the MemTracker of the exchange node which owns
   /// this receiver. It's the parent of the MemTracker of the newly created receiver.
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
-  /// caller.
+  /// caller. 'client' is the BufferPool's client handle for allocating buffers.
+  /// It's owned by the parent exchange node.
   std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
       const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
       int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
-      MemTracker* parent_tracker) override;
+      MemTracker* parent_tracker, BufferPool::ClientHandle* client) override;
 
   /// Handler for TransmitData() RPC.
   ///

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 138cc8b..2f379f4 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -121,10 +121,11 @@ class KrpcDataStreamRecvr::SenderQueue {
   // the row batch is deserialized. 'batch_size' is the size in bytes of the deserialized
   // row batch. The caller is expected to have called CanEnqueue() to make sure the row
   // batch can be inserted without exceeding the soft limit of the receiver. Also notify
-  // a thread waiting on 'data_arrival_cv_'.
-  void AddBatchWork(int64_t batch_size, const RowBatchHeaderPB& header,
+  // a thread waiting on 'data_arrival_cv_'. Return error status if the row batch creation
+  // failed. Returns OK otherwise.
+  Status AddBatchWork(int64_t batch_size, const RowBatchHeaderPB& header,
       const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data,
-      unique_lock<SpinLock>* lock);
+      unique_lock<SpinLock>* lock) WARN_UNUSED_RESULT;
 
   // Respond to the TransmitData RPC passed in 'ctx' with 'status' and release the payload
   // memory from the MemTracker associated with 'recvr_'.
@@ -291,7 +292,7 @@ Status KrpcDataStreamRecvr::SenderQueue::UnpackRequest(
   return Status::OK();
 }
 
-void KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
+Status KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
     const RowBatchHeaderPB& header, const kudu::Slice& tuple_offsets,
     const kudu::Slice& tuple_data, unique_lock<SpinLock>* lock) {
   DCHECK(lock != nullptr);
@@ -308,21 +309,27 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   // Drop the lock so we can deserialize multiple batches in parallel.
   lock->unlock();
   unique_ptr<RowBatch> batch;
+  Status status;
   {
     SCOPED_TIMER(recvr_->deserialize_row_batch_timer_);
     // At this point, the row batch will be inserted into batch_queue_. Close() will
     // handle deleting any unconsumed batches from batch_queue_. Close() cannot proceed
     // until there are no pending insertion to batch_queue_.
-    batch.reset(new RowBatch(recvr_->row_desc(), header, tuple_offsets, tuple_data,
-        recvr_->mem_tracker()));
+    status = RowBatch::FromProtobuf(recvr_->row_desc(), header, tuple_offsets, tuple_data,
+        recvr_->mem_tracker(), recvr_->client(), &batch);
   }
   lock->lock();
 
   DCHECK_GT(num_pending_enqueue_, 0);
   --num_pending_enqueue_;
+  if (UNLIKELY(!status.ok())) {
+    recvr_->num_buffered_bytes_.Add(-batch_size);
+    return status;
+  }
   VLOG_ROW << "added #rows=" << batch->num_rows() << " batch_size=" << batch_size;
   batch_queue_.emplace_back(batch_size, move(batch));
   data_arrival_cv_.notify_one();
+  return Status::OK();
 }
 
 void KrpcDataStreamRecvr::SenderQueue::RespondAndReleaseRpc(const Status& status,
@@ -376,17 +383,18 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
     }
 
     // At this point, we are committed to inserting the row batch into 'batch_queue_'.
-    AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+    status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
   }
 
   // Respond to the sender to ack the insertion of the row batches.
-  Status::OK().ToProto(response->mutable_status());
+  status.ToProto(response->mutable_status());
   rpc_context->RespondSuccess();
 }
 
 void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
   // Owns the first entry of 'deferred_rpcs_' if it ends up being popped.
   std::unique_ptr<TransmitDataCtx> ctx;
+  Status status;
   {
     unique_lock<SpinLock> l(lock_);
     DCHECK_GT(num_deserialize_tasks_pending_, 0);
@@ -420,11 +428,11 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
     // Dequeues the deferred batch and adds it to 'batch_queue_'.
     deferred_rpcs_.pop();
     const RowBatchHeaderPB& header = ctx->request->row_batch_header();
-    AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
+    status = AddBatchWork(batch_size, header, tuple_offsets, tuple_data, &l);
   }
 
   // Responds to the sender to ack the insertion of the row batches.
-  RespondAndReleaseRpc(Status::OK(), ctx);
+  RespondAndReleaseRpc(status, ctx);
 }
 
 void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
@@ -519,7 +527,8 @@ void KrpcDataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
 KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
     MemTracker* parent_tracker, const RowDescriptor* row_desc,
     const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
-    bool is_merging, int64_t total_buffer_limit, RuntimeProfile* profile)
+    bool is_merging, int64_t total_buffer_limit, RuntimeProfile* profile,
+    BufferPool::ClientHandle* client)
   : mgr_(stream_mgr),
     fragment_instance_id_(fragment_instance_id),
     dest_node_id_(dest_node_id),
@@ -527,6 +536,7 @@ KrpcDataStreamRecvr::KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr,
     row_desc_(row_desc),
     is_merging_(is_merging),
     num_buffered_bytes_(0),
+    client_(client),
     profile_(profile),
     recvr_side_profile_(profile_->CreateChild("RecvrSide")),
     sender_side_profile_(profile_->CreateChild("SenderSide")) {

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/krpc-data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.h b/be/src/runtime/krpc-data-stream-recvr.h
index 8bd99cf..f4c2a5e 100644
--- a/be/src/runtime/krpc-data-stream-recvr.h
+++ b/be/src/runtime/krpc-data-stream-recvr.h
@@ -26,6 +26,7 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "gen-cpp/Types_types.h"   // for TUniqueId
+#include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/descriptors.h"
 #include "util/tuple-row-compare.h"
 
@@ -102,6 +103,7 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   PlanNodeId dest_node_id() const { return dest_node_id_; }
   const RowDescriptor* row_desc() const { return row_desc_; }
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
+  BufferPool::ClientHandle* client() const { return client_; }
 
  private:
   friend class KrpcDataStreamMgr;
@@ -110,7 +112,8 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   KrpcDataStreamRecvr(KrpcDataStreamMgr* stream_mgr, MemTracker* parent_tracker,
       const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
       PlanNodeId dest_node_id, int num_senders, bool is_merging,
-      int64_t total_buffer_limit, RuntimeProfile* profile);
+      int64_t total_buffer_limit, RuntimeProfile* profile,
+      BufferPool::ClientHandle* client);
 
   /// Adds a new row batch to the appropriate sender queue. If the row batch can be
   /// inserted, the RPC will be responded to before this function returns. If the batch
@@ -169,6 +172,9 @@ class KrpcDataStreamRecvr : public DataStreamRecvrBase {
   /// Memtracker for batches in the sender queue(s).
   boost::scoped_ptr<MemTracker> mem_tracker_;
 
+  /// The buffer pool client for allocating buffers of incoming row batches. Not owned.
+  BufferPool::ClientHandle* client_;
+
   /// One or more queues of row batches received from senders. If is_merging_ is true,
   /// there is one SenderQueue for each sender. Otherwise, row batches from all senders
   /// are placed in the same SenderQueue. The SenderQueue instances are owned by the

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index bad1f1c..dbc12c7 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -47,13 +47,13 @@ RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_
     flush_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(row_desc->tuple_descriptors().size()),
+    tuple_ptrs_size_(capacity * num_tuples_per_row_ * sizeof(Tuple*)),
     attached_buffer_bytes_(0),
     tuple_data_pool_(mem_tracker),
     row_desc_(row_desc),
     mem_tracker_(mem_tracker) {
   DCHECK(mem_tracker_ != NULL);
   DCHECK_GT(capacity, 0);
-  tuple_ptrs_size_ = capacity * num_tuples_per_row_ * sizeof(Tuple*);
   DCHECK_GT(tuple_ptrs_size_, 0);
   // TODO: switch to Init() pattern so we can check memory limit and return Status.
   mem_tracker_->Consume(tuple_ptrs_size_);
@@ -74,56 +74,58 @@ RowBatch::RowBatch(
     flush_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(input_batch.row_tuples.size()),
+    tuple_ptrs_size_(capacity_ * num_tuples_per_row_ * sizeof(Tuple*)),
     attached_buffer_bytes_(0),
     tuple_data_pool_(mem_tracker),
     row_desc_(row_desc),
     mem_tracker_(mem_tracker) {
   DCHECK(mem_tracker_ != nullptr);
-  kudu::Slice tuple_data =
+  DCHECK_EQ(num_tuples_per_row_, row_desc_->tuple_descriptors().size());
+  DCHECK_GT(tuple_ptrs_size_, 0);
+  kudu::Slice input_tuple_data =
       kudu::Slice(input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
-  kudu::Slice tuple_offsets = kudu::Slice(
+  kudu::Slice input_tuple_offsets = kudu::Slice(
       reinterpret_cast<const char*>(input_batch.tuple_offsets.data()),
       input_batch.tuple_offsets.size() * sizeof(int32_t));
   const THdfsCompression::type& compression_type = input_batch.compression_type;
   DCHECK(compression_type == THdfsCompression::NONE ||
       compression_type == THdfsCompression::LZ4)
       << "Unexpected compression type: " << input_batch.compression_type;
-  Deserialize(tuple_offsets, tuple_data, input_batch.uncompressed_size,
-      compression_type == THdfsCompression::LZ4);
+
+  mem_tracker_->Consume(tuple_ptrs_size_);
+  tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
+  DCHECK(tuple_ptrs_ != nullptr) << "Failed to allocate tuple pointers";
+
+  const uint64_t uncompressed_size = input_batch.uncompressed_size;
+  uint8_t* tuple_data = tuple_data_pool_.Allocate(uncompressed_size);
+  DCHECK(tuple_data != nullptr) << "Failed to allocate tuple data";
+
+  Deserialize(input_tuple_offsets, input_tuple_data, uncompressed_size,
+      compression_type == THdfsCompression::LZ4, tuple_data);
 }
 
 RowBatch::RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header,
-    const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data,
     MemTracker* mem_tracker)
-  : num_rows_(header.num_rows()),
-    capacity_(header.num_rows()),
+  : num_rows_(0),
+    capacity_(0),
     flush_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(header.num_tuples_per_row()),
+    tuple_ptrs_size_(header.num_rows() * num_tuples_per_row_ * sizeof(Tuple*)),
     attached_buffer_bytes_(0),
     tuple_data_pool_(mem_tracker),
     row_desc_(row_desc),
     mem_tracker_(mem_tracker) {
   DCHECK(mem_tracker_ != nullptr);
-  const CompressionType& compression_type = header.compression_type();
-  DCHECK(compression_type == CompressionType::NONE ||
-      compression_type == CompressionType::LZ4)
-      << "Unexpected compression type: " << compression_type;
-  Deserialize(tuple_offsets, tuple_data, header.uncompressed_size(),
-      compression_type == CompressionType::LZ4);
+  DCHECK_EQ(num_tuples_per_row_, row_desc_->tuple_descriptors().size());
+  DCHECK_GT(tuple_ptrs_size_, 0);
 }
 
 void RowBatch::Deserialize(const kudu::Slice& input_tuple_offsets,
-    const kudu::Slice& input_tuple_data, int64_t uncompressed_size, bool is_compressed) {
-  // TODO: switch to Init() pattern so we can check memory limit and return Status.
-  DCHECK_EQ(num_tuples_per_row_, row_desc_->tuple_descriptors().size());
-  tuple_ptrs_size_ = num_rows_ * num_tuples_per_row_ * sizeof(Tuple*);
-  DCHECK_GT(tuple_ptrs_size_, 0);
-  mem_tracker_->Consume(tuple_ptrs_size_);
-  tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
+    const kudu::Slice& input_tuple_data, int64_t uncompressed_size,
+    bool is_compressed, uint8_t* tuple_data) {
   DCHECK(tuple_ptrs_ != nullptr);
-
-  uint8_t* tuple_data;
+  DCHECK(tuple_data != nullptr);
   if (is_compressed) {
     // Decompress tuple data into data pool
     const uint8_t* compressed_data = input_tuple_data.data();
@@ -135,14 +137,13 @@ void RowBatch::Deserialize(const kudu::Slice& input_tuple_offsets,
     auto compressor_cleanup =
         MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); });
 
-    DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
-    tuple_data = tuple_data_pool_.Allocate(uncompressed_size);
     status = decompressor.ProcessBlock(
         true, compressed_size, compressed_data, &uncompressed_size, &tuple_data);
+    DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
     DCHECK(status.ok()) << "RowBatch decompression failed.";
   } else {
     // Tuple data uncompressed, copy directly into data pool
-    tuple_data = tuple_data_pool_.Allocate(input_tuple_data.size());
+    DCHECK_EQ(uncompressed_size, input_tuple_data.size());
     memcpy(tuple_data, input_tuple_data.data(), input_tuple_data.size());
   }
 
@@ -181,15 +182,50 @@ void RowBatch::Deserialize(const kudu::Slice& input_tuple_offsets,
   }
 }
 
+Status RowBatch::FromProtobuf(const RowDescriptor* row_desc,
+    const RowBatchHeaderPB& header, const kudu::Slice& input_tuple_offsets,
+    const kudu::Slice& input_tuple_data, MemTracker* mem_tracker,
+    BufferPool::ClientHandle* client, unique_ptr<RowBatch>* row_batch_ptr) {
+  unique_ptr<RowBatch> row_batch(new RowBatch(row_desc, header, mem_tracker));
+
+  DCHECK(client != nullptr);
+  row_batch->tuple_ptrs_info_.reset(new BufferInfo());
+  row_batch->tuple_ptrs_info_->client = client;
+  BufferPool::BufferHandle* tuple_ptrs_buffer = &(row_batch->tuple_ptrs_info_->buffer);
+  RETURN_IF_ERROR(
+      row_batch->AllocateBuffer(client, row_batch->tuple_ptrs_size_, tuple_ptrs_buffer));
+  row_batch->tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_ptrs_buffer->data());
+
+  const int64_t uncompressed_size = header.uncompressed_size();
+  BufferPool::BufferHandle tuple_data_buffer;
+  RETURN_IF_ERROR(
+      row_batch->AllocateBuffer(client, uncompressed_size, &tuple_data_buffer));
+  uint8_t* tuple_data = tuple_data_buffer.data();
+  row_batch->AddBuffer(client, move(tuple_data_buffer), FlushMode::NO_FLUSH_RESOURCES);
+
+  row_batch->num_rows_ = header.num_rows();
+  row_batch->capacity_ = header.num_rows();
+  const CompressionType& compression_type = header.compression_type();
+  DCHECK(compression_type == CompressionType::NONE ||
+      compression_type == CompressionType::LZ4)
+      << "Unexpected compression type: " << compression_type;
+  row_batch->Deserialize(input_tuple_offsets, input_tuple_data, uncompressed_size,
+      compression_type == CompressionType::LZ4, tuple_data);
+  *row_batch_ptr = std::move(row_batch);
+  return Status::OK();
+}
+
 RowBatch::~RowBatch() {
   tuple_data_pool_.FreeAll();
-  for (BufferInfo& buffer_info : buffers_) {
+  FreeBuffers();
+  if (tuple_ptrs_info_.get() != nullptr) {
     ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
-        buffer_info.client, &buffer_info.buffer);
+        tuple_ptrs_info_->client, &(tuple_ptrs_info_->buffer));
+  } else {
+    DCHECK(tuple_ptrs_ != nullptr);
+    free(tuple_ptrs_);
+    mem_tracker_->Release(tuple_ptrs_size_);
   }
-  DCHECK(tuple_ptrs_ != nullptr);
-  free(tuple_ptrs_);
-  mem_tracker_->Release(tuple_ptrs_size_);
   tuple_ptrs_ = nullptr;
 }
 
@@ -351,6 +387,20 @@ void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
   DCHECK_EQ(offset, size);
 }
 
+Status RowBatch::AllocateBuffer(BufferPool::ClientHandle* client, int64_t len,
+    BufferPool::BufferHandle* buffer_handle) {
+  BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool();
+  int64_t buffer_len = BitUtil::RoundUpToPowerOfTwo(len);
+  buffer_len = max(buffer_pool->min_buffer_len(), buffer_len);
+  RETURN_IF_ERROR(
+      buffer_pool->AllocateUnreservedBuffer(client, buffer_len, buffer_handle));
+  if (UNLIKELY(!buffer_handle->is_open())) {
+    return mem_tracker_->MemLimitExceeded(
+        nullptr, "Failed to allocate row batch", buffer_len);
+  }
+  return Status::OK();
+}
+
 void RowBatch::AddBuffer(BufferPool::ClientHandle* client,
     BufferPool::BufferHandle&& buffer, FlushMode flush) {
   attached_buffer_bytes_ += buffer.len();
@@ -361,16 +411,20 @@ void RowBatch::AddBuffer(BufferPool::ClientHandle* client,
   if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources();
 }
 
-void RowBatch::Reset() {
-  num_rows_ = 0;
-  capacity_ = tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
-  // TODO: Change this to Clear() and investigate the repercussions.
-  tuple_data_pool_.FreeAll();
+void RowBatch::FreeBuffers() {
   for (BufferInfo& buffer_info : buffers_) {
     ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
         buffer_info.client, &buffer_info.buffer);
   }
   buffers_.clear();
+}
+
+void RowBatch::Reset() {
+  num_rows_ = 0;
+  capacity_ = tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
+  // TODO: Change this to Clear() and investigate the repercussions.
+  tuple_data_pool_.FreeAll();
+  FreeBuffers();
   attached_buffer_bytes_ = 0;
   flush_ = FlushMode::NO_FLUSH_RESOURCES;
   needs_deep_copy_ = false;
@@ -430,6 +484,7 @@ void RowBatch::AcquireState(RowBatch* src) {
   num_rows_ = src->num_rows_;
   capacity_ = src->capacity_;
   // tuple_ptrs_ were allocated with malloc so can be swapped between batches.
+  DCHECK(tuple_ptrs_info_.get() == nullptr);
   std::swap(tuple_ptrs_, src->tuple_ptrs_);
   src->TransferResourceOwnership(this);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 2c08f30..aad5ebe 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -143,13 +143,16 @@ class RowBatch {
   RowBatch(const RowDescriptor* row_desc, const TRowBatch& input_batch,
       MemTracker* tracker);
 
-  /// Populate a row batch from the serialized row batch header, decompress / copy
-  /// the tuple's data into a buffer and convert all offsets in 'tuple_offsets' back
-  /// into pointers into the tuple data's buffer. The tuple data's buffer is allocated
-  /// from the row batch's MemPool tracked by 'mem_tracker'.
-  RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header,
-      const kudu::Slice& input_tuple_data, const kudu::Slice& input_tuple_offsets,
-      MemTracker* mem_tracker);
+  /// Creates a row batch from the protobuf row batch header, decompresses / copies
+  /// 'input_tuple_data' into a buffer and converts all offsets in 'input_tuple_offsets'
+  /// back into pointers. The tuple pointers and data's buffers are allocated from the
+  /// buffer pool with 'client' as client handle. The newly created row batch is
+  /// stored in 'row_batch_ptr'. Returns error status on failure. Returns ok otherwise.
+  static Status FromProtobuf(const RowDescriptor* row_desc,
+      const RowBatchHeaderPB& header, const kudu::Slice& input_tuple_data,
+      const kudu::Slice& input_tuple_offsets, MemTracker* mem_tracker,
+      BufferPool::ClientHandle* client, std::unique_ptr<RowBatch>* row_batch_ptr)
+      WARN_UNUSED_RESULT;
 
   /// Releases all resources accumulated at this row batch.  This includes
   ///  - tuple_ptrs
@@ -405,6 +408,22 @@ class RowBatch {
   friend class RowBatchSerializeBenchmark;
   friend class RowBatchSerializeTest;
 
+  /// Creates an empty row batch based on the serialized row batch header. Called from
+  /// FromProtobuf() above before deserialization of a protobuf row batch.
+  RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header,
+      MemTracker* mem_tracker);
+
+  /// Allocate from buffer pool a buffer of 'len' using the client handle 'client'.
+  /// The actual buffer size is 'len' rounded up to power of 2 or minimum buffer size,
+  /// whichever is larger. The reservation of 'client' may be increased. On success,
+  /// the newly allocated buffer is returned in 'buffer_handle'. Returns an error status
+  /// if allocation fails, in which case 'buffer_handle' is not opened.
+  Status AllocateBuffer(BufferPool::ClientHandle* client, int64_t len,
+      BufferPool::BufferHandle* buffer_handle);
+
+  /// Free all BufferInfo and the associated buffers in 'buffers_'.
+  void FreeBuffers();
+
   /// Decide whether to do full tuple deduplication based on row composition. Full
   /// deduplication is enabled only when there is risk of the serialized size being
   /// much larger than in-memory size due to non-adjacent duplicate tuples.
@@ -441,9 +460,12 @@ class RowBatch {
   ///
   /// 'is_compressed': True if 'input_tuple_data' is compressed.
   ///
+  /// 'tuple_data': buffer of 'uncompressed_size' bytes for holding tuple data.
+  ///
   /// TODO: clean this up once the thrift RPC implementation is removed.
   void Deserialize(const kudu::Slice& input_tuple_offsets,
-      const kudu::Slice& input_tuple_data, int64_t uncompressed_size, bool is_compressed);
+      const kudu::Slice& input_tuple_data, int64_t uncompressed_size, bool is_compressed,
+      uint8_t* tuple_data);
 
   typedef FixedSizeHashTable<Tuple*, int> DedupMap;
 
@@ -484,8 +506,8 @@ class RowBatch {
   /// more performant that allocating the pointers from 'tuple_data_pool_' especially
   /// with SubplanNodes in the ExecNode tree because the tuple pointers are not
   /// transferred and do not have to be re-created in every Reset().
-  int tuple_ptrs_size_;
-  Tuple** tuple_ptrs_;
+  const int tuple_ptrs_size_;
+  Tuple** tuple_ptrs_ = nullptr;
 
   /// Total bytes of BufferPool buffers attached to this batch.
   int64_t attached_buffer_bytes_;
@@ -503,13 +525,16 @@ class RowBatch {
   MemTracker* mem_tracker_;  // not owned
 
   struct BufferInfo {
-    BufferPool::ClientHandle* client;
+    BufferPool::ClientHandle* client = nullptr;
     BufferPool::BufferHandle buffer;
   };
 
   /// Pages attached to this row batch. See AddBuffer() for ownership semantics.
   std::vector<BufferInfo> buffers_;
 
+  /// The BufferInfo for the 'tuple_ptrs_' which are allocated from the buffer pool.
+  std::unique_ptr<BufferInfo> tuple_ptrs_info_;
+
   /// String to write compressed tuple data to in Serialize().
   /// This is a string so we can swap() with the string in the serialized row batch
   /// (i.e. TRowBatch or OutboundRowBatch) we're serializing to (we don't compress

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index c198919..06be431 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -148,9 +148,9 @@ TEST(QueryOptions, SetByteOptions) {
           {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
               RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
       // Lower limit for runtime_filter_max_size is FLAGS_min_buffer_size which has a
-      // default value of is 64KB.
+      // default value of 8KB.
       {MAKE_OPTIONDEF(runtime_filter_max_size),
-          {64 * 1024, RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
+          {8 * 1024, RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
       {MAKE_OPTIONDEF(runtime_bloom_filter_size),
           {RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
               RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}}
@@ -316,7 +316,7 @@ TEST(QueryOptions, SetSpecialOptions) {
     auto TestOk = MakeTestOkFn(options, key_def);
     auto TestError = MakeTestErrFn(options, key_def);
     TestOk("128KB", 128 * 1024);
-    TestError("65535"); // default value of FLAGS_min_buffer_size is 64KB
+    TestError("8191"); // default value of FLAGS_min_buffer_size is 8KB
     TestOk("64KB", 64 * 1024);
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
index b7a6123..593e66b 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/bloom_filters.test
@@ -128,12 +128,12 @@ SET RUNTIME_FILTER_MODE=GLOBAL;
 SET RUNTIME_FILTER_WAIT_TIME_MS=30000;
 SET RUNTIME_FILTER_MIN_SIZE=4KB;
 SET RUNTIME_BLOOM_FILTER_SIZE=4KB;
-# The min buffer size is set to 64KB for end to end tests. This query would
+# The min buffer size is set to 8KB for end to end tests. This query would
 # produce a 4KB filter if the min buffer size limit bound is not enforced.
 select STRAIGHT_JOIN count(*) from alltypes a join [SHUFFLE] alltypes b on a.id = b.id;
 ---- RESULTS
 7300
 ---- RUNTIME_PROFILE
 row_regex: .*1 of 1 Runtime Filter Published.*
-row_regex: .*Filter 0 \(64.00 KB\).*
+row_regex: .*Filter 0 \(8.00 KB\).*
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/62d8462e/tests/query_test/test_mem_usage_scaling.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_mem_usage_scaling.py b/tests/query_test/test_mem_usage_scaling.py
index 3a9f5ee..ddd9e2d 100644
--- a/tests/query_test/test_mem_usage_scaling.py
+++ b/tests/query_test/test_mem_usage_scaling.py
@@ -19,10 +19,12 @@ import pytest
 from copy import copy
 
 from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_cluster import ImpalaCluster
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfLocal
 from tests.common.test_dimensions import create_single_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension
+from tests.verifiers.metric_verifier import MetricVerifier
 
 # Substrings of the expected error messages when the mem limit is too low
 MEM_LIMIT_EXCEEDED_MSG = "Memory limit exceeded"
@@ -90,7 +92,6 @@ class TestExprMemUsage(ImpalaTestSuite):
       "select count(*) from lineitem where lower(l_comment) = 'hello'", exec_options,
       table_format=vector.get_value('table_format'))
 
-
 class TestLowMemoryLimits(ImpalaTestSuite):
   '''Super class for the memory limit tests with the TPC-H and TPC-DS queries'''
 
@@ -218,6 +219,14 @@ class TestTpchMemLimitError(TestLowMemoryLimits):
   def test_low_mem_limit_q22(self, vector):
     self.low_memory_limit_test(vector, 'tpch-q22', self.MIN_MEM_FOR_TPCH['Q22'])
 
+  @pytest.mark.execute_serially
+  def test_low_mem_limit_no_fragments(self, vector):
+    self.low_memory_limit_test(vector, 'tpch-q14', self.MIN_MEM_FOR_TPCH['Q14'])
+    self.low_memory_limit_test(vector, 'tpch-q18', self.MIN_MEM_FOR_TPCH['Q18'])
+    self.low_memory_limit_test(vector, 'tpch-q20', self.MIN_MEM_FOR_TPCH['Q20'])
+    for impalad in ImpalaCluster().impalads:
+      verifier = MetricVerifier(impalad.service)
+      verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0)
 
 class TestTpchPrimitivesMemLimitError(TestLowMemoryLimits):
   """