You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/20 00:52:10 UTC

[1/4] impala git commit: [DOCS] Fix in REPLICA_PREFERENCE numeric options

Repository: impala
Updated Branches:
  refs/heads/2.x 3a386d975 -> b961de23d


[DOCS] Fix in REPLICA_PREFERENCE numeric options

Change-Id: Ia10e69ac38229e0969db11b7edbcf08c2444602b
Reviewed-on: http://gerrit.cloudera.org:8080/9341
Reviewed-by: John Russell <jr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/bb6cd1d4
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/bb6cd1d4
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/bb6cd1d4

Branch: refs/heads/2.x
Commit: bb6cd1d41b6480dc1ed6eba426a6696aa55ad951
Parents: 473f93f
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Thu Feb 15 14:18:35 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 17 00:54:57 2018 +0000

----------------------------------------------------------------------
 docs/topics/impala_replica_preference.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/bb6cd1d4/docs/topics/impala_replica_preference.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_replica_preference.xml b/docs/topics/impala_replica_preference.xml
index 610bd35..45a5dbd 100644
--- a/docs/topics/impala_replica_preference.xml
+++ b/docs/topics/impala_replica_preference.xml
@@ -45,7 +45,7 @@ under the License.
     </p>
 
     <p>
-      <b>Type:</b> numeric (0, 3, 5)
+      <b>Type:</b> numeric (0, 2, 4)
       or corresponding mnemonic strings (<codeph>CACHE_LOCAL</codeph>, <codeph>DISK_LOCAL</codeph>, <codeph>REMOTE</codeph>).
       The gaps in the numeric sequence are to accomodate other intermediate
       values that might be added in the future.


[2/4] impala git commit: [DOCS] Typos fixed in Impala Analytic Functions doc

Posted by ta...@apache.org.
[DOCS] Typos fixed in Impala Analytic Functions doc

Change-Id: Iec4a2822f5e066574e64bf025d300e4cde7a7d29
Reviewed-on: http://gerrit.cloudera.org:8080/9347
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-by: John Russell <jr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/473f93f0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/473f93f0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/473f93f0

Branch: refs/heads/2.x
Commit: 473f93f0c48f76c54130fa7175845efd32044cc9
Parents: 3a386d9
Author: Alex Rodoni <ar...@cloudera.com>
Authored: Thu Feb 15 16:16:37 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 17 00:54:57 2018 +0000

----------------------------------------------------------------------
 docs/shared/impala_common.xml             | 2 +-
 docs/topics/impala_analytic_functions.xml | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/473f93f0/docs/shared/impala_common.xml
----------------------------------------------------------------------
diff --git a/docs/shared/impala_common.xml b/docs/shared/impala_common.xml
index 410fa9c..a052879 100644
--- a/docs/shared/impala_common.xml
+++ b/docs/shared/impala_common.xml
@@ -2602,7 +2602,7 @@ flight_num:           INT32 SNAPPY DO:83456393 FPO:83488603 SZ:10216514/11474301
       <p rev="" id="analytic_partition_pruning_caveat">
         In queries involving both analytic functions and partitioned tables, partition pruning only occurs for columns named in the <codeph>PARTITION BY</codeph>
         clause of the analytic function call. For example, if an analytic function query has a clause such as <codeph>WHERE year=2016</codeph>,
-        the way to make the query prune all other <codeph>YEAR</codeph> partitions is to include <codeph>PARTITION BY year</codeph>in the analytic function call;
+        the way to make the query prune all other <codeph>YEAR</codeph> partitions is to include <codeph>PARTITION BY year</codeph> in the analytic function call;
         for example, <codeph>OVER (PARTITION BY year,<varname>other_columns</varname> <varname>other_analytic_clauses</varname>)</codeph>.
 <!--
         These examples illustrate the technique:

http://git-wip-us.apache.org/repos/asf/impala/blob/473f93f0/docs/topics/impala_analytic_functions.xml
----------------------------------------------------------------------
diff --git a/docs/topics/impala_analytic_functions.xml b/docs/topics/impala_analytic_functions.xml
index bdaacca..39d4fc5 100644
--- a/docs/topics/impala_analytic_functions.xml
+++ b/docs/topics/impala_analytic_functions.xml
@@ -781,7 +781,7 @@ order by kind, ordering desc, name;
 
       <p>
         Partitioning by the <codeph>X</codeph> column groups all the duplicate numbers together and returns the
-        place each each value within the group; because each value occurs only 1 or 2 times,
+        place each value within the group; because each value occurs only 1 or 2 times,
         <codeph>DENSE_RANK()</codeph> designates each <codeph>X</codeph> value as either first or second within its
         group.
       </p>
@@ -1569,7 +1569,7 @@ insert into animals values ('Fire-breathing dragon', 'Mythical', NULL);
 
       <p>
         Partitioning by the <codeph>X</codeph> column groups all the duplicate numbers together and returns the
-        place each each value within the group; because each value occurs only 1 or 2 times,
+        place each value within the group; because each value occurs only 1 or 2 times,
         <codeph>RANK()</codeph> designates each <codeph>X</codeph> value as either first or second within its
         group.
       </p>


[3/4] impala git commit: IMPALA-6526: Fix spilling test for running on local FS

Posted by ta...@apache.org.
IMPALA-6526: Fix spilling test for running on local FS

One of the spilling tests was failing because its minimum buffer pool
memory requirement is higher when run on local FS than when
it is run on HDFS.
The fix is to increase the bufferpool limit to a value just above
the min limit so that it still forces spill to disk on both filesystems.

Testing:
Ran core tests with local FS as target file system. Made sure the
failing test passed.

Change-Id: I50648d7936007a26891cf64d6343c47d9d646596
Reviewed-on: http://gerrit.cloudera.org:8080/9354
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9a389027
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9a389027
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9a389027

Branch: refs/heads/2.x
Commit: 9a38902729e22e0df6ee42c7d7a861d378b71906
Parents: bb6cd1d
Author: Bikramjeet Vig <bi...@cloudera.com>
Authored: Thu Feb 15 12:03:49 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 17 04:32:07 2018 +0000

----------------------------------------------------------------------
 .../workloads/functional-query/queries/QueryTest/spilling.test     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9a389027/testdata/workloads/functional-query/queries/QueryTest/spilling.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index 3bfe0af..6639d82 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -60,7 +60,7 @@ row_regex: .*SpilledPartitions: 0 .*
 # Adding TPCH-Q21 in the spilling test to check for IMPALA-1471 (spilling left anti
 # and left outer joins were returning wrong results).
 # Q21 - Suppliers Who Kept Orders Waiting Query
-set buffer_pool_limit=23m;
+set buffer_pool_limit=26m;
 select
   s_name,
   count(*) as numwait


[4/4] impala git commit: IMPALA-6116: Bound memory usage of DataStreamService's service queue

Posted by ta...@apache.org.
IMPALA-6116: Bound memory usage of DataStreamService's service queue

The fix for IMPALA-6193 added a memory tracker for the memory consumed
by the payloads in the service queue of DataStreamService. This change
extends it by introducing a bound on the memory usage for that service
queue. In addition, it deprecates FLAGS_datastream_service_queue_depth
and replaces it with FLAGS_datastream_service_queue_mem_limit. These flags
only take effect when KRPC is in use, and KRPC was never enabled in any
previous release, so it seems safe to do this flag replacement. The new
flag FLAGS_datastream_service_queue_mem_limit directly dictates the amount
of memory which can be consumed by the service queue of DataStreamService.
This allows a more direct control over the memory usage of the queue instead
of inferring via the number of entries in the queue. The default value of
this flag is left at 0, in which case it will be set to 5% of process
memory limit.

Testing done: exhaustive debug builds. Updated data-stream-test to
exercise the case in which the payload is larger than the limit.

Change-Id: Idea4262dfb0e0aa8d58ff6ea6a8aaaa248e880b9
Reviewed-on: http://gerrit.cloudera.org:8080/9282
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b961de23
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b961de23
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b961de23

Branch: refs/heads/2.x
Commit: b961de23da039b599fb94dddcad1f304ed29eb00
Parents: 9a38902
Author: Michael Ho <kw...@cloudera.com>
Authored: Sat Feb 3 00:05:09 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 17 23:10:15 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/impala-service-pool.cc           |  48 ++++++----
 be/src/rpc/impala-service-pool.h            |  62 ++++++++-----
 be/src/rpc/rpc-mgr-test-base.h              |  48 ++++++----
 be/src/rpc/rpc-mgr-test.cc                  |  15 ++--
 be/src/rpc/rpc-mgr.cc                       |   7 +-
 be/src/rpc/rpc-mgr.h                        |   5 +-
 be/src/runtime/data-stream-test.cc          | 110 +++++++++++++++--------
 be/src/runtime/exec-env.cc                  |  31 ++-----
 be/src/runtime/exec-env.h                   |   3 +
 be/src/runtime/krpc-data-stream-mgr.cc      |  37 ++++----
 be/src/runtime/krpc-data-stream-mgr.h       |  25 +++---
 be/src/runtime/mem-tracker.h                |   1 +
 be/src/service/data-stream-service.cc       |  33 ++++++-
 be/src/service/data-stream-service.h        |  15 +++-
 tests/custom_cluster/test_krpc_mem_usage.py |   4 +-
 15 files changed, 282 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 34a3960..35a5d6d 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -47,15 +47,14 @@ METRIC_DEFINE_histogram(server, impala_unused,
 
 namespace impala {
 
-ImpalaServicePool::ImpalaServicePool(MemTracker* mem_tracker,
-                         std::unique_ptr<kudu::rpc::ServiceIf> service,
-                         const scoped_refptr<kudu::MetricEntity>& entity,
-                         size_t service_queue_length)
-  : mem_tracker_(mem_tracker),
-    service_(std::move(service)),
+ImpalaServicePool::ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
+    size_t service_queue_length, kudu::rpc::ServiceIf* service,
+    MemTracker* service_mem_tracker)
+  : service_mem_tracker_(service_mem_tracker),
+    service_(service),
     service_queue_(service_queue_length),
     unused_histogram_(METRIC_impala_unused.Instantiate(entity)) {
-  DCHECK(mem_tracker_ != nullptr);
+  DCHECK(service_mem_tracker_ != nullptr);
 }
 
 ImpalaServicePool::~ImpalaServicePool() {
@@ -114,7 +113,7 @@ void ImpalaServicePool::FailAndReleaseRpc(
     const kudu::Status& status, kudu::rpc::InboundCall* call) {
   int64_t transfer_size = call->GetTransferSize();
   call->RespondFailure(error_code, status);
-  mem_tracker_->Release(transfer_size);
+  service_mem_tracker_->Release(transfer_size);
 }
 
 kudu::rpc::RpcMethodInfo* ImpalaServicePool::LookupMethod(
@@ -143,20 +142,39 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
 
   TRACE_TO(c->trace(), "Inserting onto call queue"); // NOLINT(*)
 
-  // Queue message on service queue
-  mem_tracker_->Consume(c->GetTransferSize());
+  // Queue message on service queue.
+  const int64_t transfer_size = c->GetTransferSize();
+  {
+    // Drops an incoming request if consumption already exceeded the limit. Note that
+    // the current inbound call isn't counted towards the limit yet so adding this call
+    // may cause the MemTracker's limit to be exceeded. This is done to ensure fairness
+    // among all inbound calls, otherwise calls with larger payloads are more likely to
+    // fail. The check and the consumption need to be atomic so as to bound the memory
+    // usage.
+    unique_lock<SpinLock> mem_tracker_lock(mem_tracker_lock_);
+    if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded())) {
+      // Discards the transfer early so the transfer size drops to 0. This is to ensure
+      // the MemTracker::Release() call in FailAndReleaseRpc() is correct as we haven't
+      // called MemTracker::Consume() at this point.
+      mem_tracker_lock.unlock();
+      c->DiscardTransfer();
+      RejectTooBusy(c);
+      return kudu::Status::OK();
+    }
+    service_mem_tracker_->Consume(transfer_size);
+  }
+
   boost::optional<kudu::rpc::InboundCall*> evicted;
   auto queue_status = service_queue_.Put(c, &evicted);
-  if (queue_status == kudu::rpc::QueueStatus::QUEUE_FULL) {
+  if (UNLIKELY(queue_status == kudu::rpc::QueueStatus::QUEUE_FULL)) {
     RejectTooBusy(c);
     return kudu::Status::OK();
   }
-
-  if (PREDICT_FALSE(evicted != boost::none)) {
+  if (UNLIKELY(evicted != boost::none)) {
     RejectTooBusy(*evicted);
   }
 
-  if (PREDICT_TRUE(queue_status == kudu::rpc::QueueStatus::QUEUE_SUCCESS)) {
+  if (LIKELY(queue_status == kudu::rpc::QueueStatus::QUEUE_SUCCESS)) {
     // NB: do not do anything with 'c' after it is successfully queued --
     // a service thread may have already dequeued it, processed it, and
     // responded by this point, in which case the pointer would be invalid.
@@ -187,7 +205,7 @@ void ImpalaServicePool::RunThread() {
     incoming->RecordHandlingStarted(unused_histogram_);
     ADOPT_TRACE(incoming->trace());
 
-    if (PREDICT_FALSE(incoming->ClientTimedOut())) {
+    if (UNLIKELY(incoming->ClientTimedOut())) {
       TRACE_TO(incoming->trace(), "Skipping call since client already timed out"); // NOLINT(*)
       rpcs_timed_out_in_queue_.Add(1);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/impala-service-pool.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index fe70686..624e937 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -28,25 +28,33 @@
 #include "kudu/rpc/service_queue.h"
 #include "kudu/util/status.h"
 #include "util/histogram-metric.h"
+#include "util/spinlock.h"
 #include "util/thread.h"
 
 namespace impala {
 class MemTracker;
 
-// A pool of threads that handle new incoming RPC calls.
-// Also includes a queue that calls get pushed onto for handling by the pool.
+/// A pool of threads that handle new incoming RPC calls.
+/// Also includes a queue that calls get pushed onto for handling by the pool.
 class ImpalaServicePool : public kudu::rpc::RpcService {
  public:
-  ImpalaServicePool(MemTracker* mem_tracker,
-      std::unique_ptr<kudu::rpc::ServiceIf> service,
-      const scoped_refptr<kudu::MetricEntity>& metric_entity,
-      size_t service_queue_length);
+  /// 'service_queue_length' is the maximum number of requests that may be queued for
+  /// this service before clients begin to see rejection errors.
+  ///
+  /// 'service' contains an interface implementation that will handle RPCs.
+  ///
+  /// 'service_mem_tracker' is the MemTracker for tracking the memory usage of RPC
+  /// payloads in the service queue.
+  ImpalaServicePool(const scoped_refptr<kudu::MetricEntity>& entity,
+      size_t service_queue_length, kudu::rpc::ServiceIf* service,
+      MemTracker* service_mem_tracker);
+
   virtual ~ImpalaServicePool();
 
-  // Start up the thread pool.
+  /// Start up the thread pool.
   virtual Status Init(int num_threads);
 
-  // Shut down the queue and the thread pool.
+  /// Shut down the queue and the thread pool.
   virtual void Shutdown();
 
   kudu::rpc::RpcMethodInfo* LookupMethod(const kudu::rpc::RemoteMethod& method) override;
@@ -60,31 +68,41 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   void RunThread();
   void RejectTooBusy(kudu::rpc::InboundCall* c);
 
-  // Respond with failure to the incoming call in 'call' with 'error_code' and 'status'
-  // and release the payload memory from 'mem_tracker_'. Takes ownership of 'call'.
+  /// Respond with failure to the incoming call in 'call' with 'error_code' and 'status'
+  /// and release the payload memory from 'mem_tracker_'. Takes ownership of 'call'.
   void FailAndReleaseRpc(const kudu::rpc::ErrorStatusPB::RpcErrorCodePB& error_code,
       const kudu::Status& status, kudu::rpc::InboundCall* call);
 
-  // Tracks memory of inbound calls in 'service_queue_'.
-  MemTracker* const mem_tracker_;
+  /// Synchronizes accesses to 'service_mem_tracker_' to avoid over consumption.
+  SpinLock mem_tracker_lock_;
+
+  /// Tracks memory of inbound calls in 'service_queue_'.
+  MemTracker* const service_mem_tracker_;
+
+  /// Reference to the implementation of the RPC handlers. Not owned.
+  kudu::rpc::ServiceIf* const service_;
 
-  std::unique_ptr<kudu::rpc::ServiceIf> service_;
-  std::vector<std::unique_ptr<Thread> > threads_;
+  /// The set of service threads started to process incoming RPC calls.
+  std::vector<std::unique_ptr<Thread>> threads_;
+
+  /// The pending RPCs to be dequeued by the service threads.
   kudu::rpc::LifoServiceQueue service_queue_;
 
-  // TODO: Display these metrics in the debug webpage. IMPALA-6269
-  // Number of RPCs that timed out while waiting in the service queue.
+  /// TODO: Display these metrics in the debug webpage. IMPALA-6269
+  /// Number of RPCs that timed out while waiting in the service queue.
   AtomicInt32 rpcs_timed_out_in_queue_;
-  // Number of RPCs that were rejected due to the queue being full.
+
+  /// Number of RPCs that were rejected due to the queue being full.
   AtomicInt32 rpcs_queue_overflow_;
 
-  // Dummy histogram needed to call InboundCall::RecordHandlingStarted() to set
-  // appropriate internal KRPC state. Unused otherwise.
-  // TODO: Consider displaying this histogram in the debug webpage. IMPALA-6269
+  /// Dummy histogram needed to call InboundCall::RecordHandlingStarted() to set
+  /// appropriate internal KRPC state. Unused otherwise.
+  /// TODO: Consider displaying this histogram in the debug webpage. IMPALA-6269
   scoped_refptr<kudu::Histogram> unused_histogram_;
 
-  // Protects against concurrent Shutdown() operations.
-  // TODO: This seems implausible given our current usage pattern. Consider removing lock.
+  /// Protects against concurrent Shutdown() operations.
+  /// TODO: This seems implausible given our current usage pattern.
+  /// Consider removing lock.
   boost::mutex shutdown_lock_;
   bool closing_ = false;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/rpc-mgr-test-base.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test-base.h b/be/src/rpc/rpc-mgr-test-base.h
index 43b6d83..4a79040 100644
--- a/be/src/rpc/rpc-mgr-test-base.h
+++ b/be/src/rpc/rpc-mgr-test-base.h
@@ -130,7 +130,12 @@ template <class T> class RpcMgrTestBase : public T {
     request->set_sidecar_idx(idx);
   }
 
-  MemTracker* service_tracker() { return &service_tracker_; }
+  // Takes over ownership of the newly created 'service' which needs to have a lifetime
+  // as long as 'rpc_mgr_' as RpcMgr::Shutdown() will call Shutdown() of 'service'.
+  ServiceIf* TakeOverService(std::unique_ptr<ServiceIf> service) {
+    services_.emplace_back(move(service));
+    return services_.back().get();
+  }
 
  protected:
   TNetworkAddress krpc_address_;
@@ -149,7 +154,9 @@ template <class T> class RpcMgrTestBase : public T {
 
  private:
   int32_t payload_[PAYLOAD_SIZE];
-  MemTracker service_tracker_;
+
+  // Own all the services used by the test.
+  std::vector<std::unique_ptr<ServiceIf>> services_;
 };
 
 typedef std::function<void(RpcContext*)> ServiceCB;
@@ -158,28 +165,30 @@ class PingServiceImpl : public PingServiceIf {
  public:
   // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
   PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker,
       ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
-    : PingServiceIf(entity, tracker), mem_tracker_(mem_tracker), cb_(cb) {}
+    : PingServiceIf(entity, tracker), mem_tracker_(-1, "Ping Service"), cb_(cb) {}
 
   virtual void Ping(
       const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
     response->set_int_response(42);
     // Incoming requests will already be tracked and we need to release the memory.
-    mem_tracker_->Release(context->GetTransferSize());
+    mem_tracker_.Release(context->GetTransferSize());
     cb_(context);
   }
 
+  MemTracker* mem_tracker() { return &mem_tracker_; }
+
  private:
-  MemTracker* mem_tracker_;
+  MemTracker mem_tracker_;
   ServiceCB cb_;
 };
 
 class ScanMemServiceImpl : public ScanMemServiceIf {
  public:
   ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker)
-    : ScanMemServiceIf(entity, tracker), mem_tracker_(mem_tracker) {
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker)
+    : ScanMemServiceIf(entity, tracker), mem_tracker_(-1, "ScanMem Service") {
   }
 
   // The request comes with an int 'pattern' and a payload of int array sent with
@@ -197,36 +206,39 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
       int32_t val = v[i];
       if (val != pattern) {
         // Incoming requests will already be tracked and we need to release the memory.
-        mem_tracker_->Release(context->GetTransferSize());
+        mem_tracker_.Release(context->GetTransferSize());
         context->RespondFailure(kudu::Status::Corruption(
             Substitute("Expecting $1; Found $2", pattern, val)));
         return;
       }
     }
     // Incoming requests will already be tracked and we need to release the memory.
-    mem_tracker_->Release(context->GetTransferSize());
+    mem_tracker_.Release(context->GetTransferSize());
     context->RespondSuccess();
   }
 
+  MemTracker* mem_tracker() { return &mem_tracker_; }
+
  private:
-  MemTracker* mem_tracker_;
+  MemTracker mem_tracker_;
 
 };
 
 template <class T>
 Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
     RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
-  MemTracker* mem_tracker = test_base->service_tracker();
   // Test that a service can be started, and will respond to requests.
-  unique_ptr<ServiceIf> ping_impl(new PingServiceImpl(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker(), mem_tracker));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl), mem_tracker));
+  ServiceIf* ping_impl = test_base->TakeOverService(make_unique<PingServiceImpl>(
+      rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, ping_impl,
+      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
 
   // Test that a second service, that verifies the RPC payload is not corrupted,
   // can be started.
-  unique_ptr<ServiceIf> scan_mem_impl(new ScanMemServiceImpl(rpc_mgr->metric_entity(),
-      rpc_mgr->result_tracker(), mem_tracker));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl), mem_tracker));
+  ServiceIf* scan_mem_impl = test_base->TakeOverService(make_unique<ScanMemServiceImpl>(
+      rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, scan_mem_impl,
+      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index cd24672..8d5312f 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -178,12 +178,12 @@ TEST_F(RpcMgrTest, SlowCallback) {
   // Test a service which is slow to respond and has a short queue.
   // Set a timeout on the client side. Expect either a client timeout
   // or the service queue filling up.
-  unique_ptr<ServiceIf> impl(new PingServiceImpl(rpc_mgr_.metric_entity(),
-      rpc_mgr_.result_tracker(), service_tracker(), slow_cb));
+  ServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(
+      rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb));
   const int num_service_threads = 1;
   const int queue_size = 3;
-  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, move(impl),
-      service_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, ping_impl,
+      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
@@ -204,9 +204,10 @@ TEST_F(RpcMgrTest, SlowCallback) {
 }
 
 TEST_F(RpcMgrTest, AsyncCall) {
-  unique_ptr<ServiceIf> scan_mem_impl(new ScanMemServiceImpl(rpc_mgr_.metric_entity(),
-      rpc_mgr_.result_tracker(), service_tracker()));
-  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl), service_tracker()));
+  ServiceIf* scan_mem_impl = TakeOverService(make_unique<ScanMemServiceImpl>(
+      rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, scan_mem_impl,
+      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
 
   unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
   ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index 44ecc02..d723280 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -120,12 +120,11 @@ Status RpcMgr::Init() {
 }
 
 Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
-    unique_ptr<ServiceIf> service_ptr, MemTracker* mem_tracker) {
+    ServiceIf* service_ptr, MemTracker* service_mem_tracker) {
   DCHECK(is_inited()) << "Must call Init() before RegisterService()";
   DCHECK(!services_started_) << "Cannot call RegisterService() after StartServices()";
-  scoped_refptr<ImpalaServicePool> service_pool =
-      new ImpalaServicePool(mem_tracker, std::move(service_ptr),
-          messenger_->metric_entity(), service_queue_depth);
+  scoped_refptr<ImpalaServicePool> service_pool = new ImpalaServicePool(
+      messenger_->metric_entity(), service_queue_depth, service_ptr, service_mem_tracker);
   // Start the thread pool first before registering the service in case the startup fails.
   RETURN_IF_ERROR(service_pool->Init(num_service_threads));
   KUDU_RETURN_IF_ERROR(

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index fc74c2e..e87b559 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -122,9 +122,12 @@ class RpcMgr {
   /// the service name has to be unique within an Impala instance or the registration will
   /// fail.
   ///
+  /// 'service_mem_tracker' is the MemTracker for tracking the memory usage of RPC
+  /// payloads in the service queue.
+  ///
   /// It is an error to call this after StartServices() has been called.
   Status RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
-      std::unique_ptr<kudu::rpc::ServiceIf> service_ptr, MemTracker* mem_tracker)
+      kudu::rpc::ServiceIf* service_ptr, MemTracker* service_mem_tracker)
       WARN_UNUSED_RESULT;
 
   /// Creates a new proxy for a remote service of type P at location 'address', and places

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 75d5ac9..c540d1d 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -51,6 +51,7 @@
 #include "util/thread.h"
 #include "util/time.h"
 #include "util/mem-info.h"
+#include "util/parse-util.h"
 #include "util/test-info.h"
 #include "util/tuple-row-compare.h"
 #include "gen-cpp/data_stream_service.pb.h"
@@ -61,6 +62,7 @@
 #include "service/fe-support.h"
 
 #include <iostream>
+#include <string>
 #include <unistd.h>
 
 #include "common/names.h"
@@ -78,6 +80,7 @@ DEFINE_int32(port, 20001, "port on which to run Impala Thrift based test backend
 DECLARE_int32(datastream_sender_timeout_ms);
 DECLARE_int32(datastream_service_num_deserialization_threads);
 DECLARE_int32(datastream_service_deserialization_queue_size);
+DECLARE_string(datastream_service_queue_mem_limit);
 
 DECLARE_bool(use_krpc);
 
@@ -90,7 +93,7 @@ static const int BATCH_CAPACITY = 100;  // rows
 static const int PER_ROW_DATA = 8;
 static const int TOTAL_DATA_SIZE = 8 * 1024;
 static const int NUM_BATCHES = TOTAL_DATA_SIZE / BATCH_CAPACITY / PER_ROW_DATA;
-
+static const int SHORT_SERVICE_QUEUE_MEM_LIMIT = 16;
 
 namespace impala {
 
@@ -133,9 +136,22 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
  public:
   ImpalaKRPCTestBackend(RpcMgr* rpc_mgr, KrpcDataStreamMgr* stream_mgr)
     : DataStreamServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
-      stream_mgr_(stream_mgr) {}
+      rpc_mgr_(rpc_mgr),
+      stream_mgr_(stream_mgr) {
+    MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
+    bool is_percent;
+    int64_t bytes_limit = ParseUtil::ParseMemSpec(FLAGS_datastream_service_queue_mem_limit,
+        &is_percent, process_mem_tracker->limit());
+    mem_tracker_.reset(
+        new MemTracker(bytes_limit, "DataStream Test", process_mem_tracker));
+  }
+
   virtual ~ImpalaKRPCTestBackend() {}
 
+  Status Init() {
+    return rpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, this, mem_tracker());
+  }
+
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, RpcContext* rpc_context) {
     stream_mgr_->AddData(request, response, rpc_context);
@@ -146,8 +162,12 @@ class ImpalaKRPCTestBackend : public DataStreamServiceIf {
     stream_mgr_->CloseSender(request, response, rpc_context);
   }
 
+  MemTracker* mem_tracker() { return mem_tracker_.get(); }
+
  private:
+  RpcMgr* rpc_mgr_;
   KrpcDataStreamMgr* stream_mgr_;
+  unique_ptr<MemTracker> mem_tracker_;
 };
 
 template <class T> class DataStreamTestBase : public T {
@@ -161,7 +181,7 @@ enum KrpcSwitch {
   USE_KRPC
 };
 
-class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwitch> > {
+class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwitch>> {
  protected:
   DataStreamTest() : next_val_(0) {
 
@@ -188,7 +208,6 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     next_instance_id_.lo = 0;
     next_instance_id_.hi = 0;
     stream_mgr_ = ExecEnv::GetInstance()->stream_mgr();
-    if (GetParam() == USE_KRPC) krpc_mgr_ = ExecEnv::GetInstance()->rpc_mgr();
 
     broadcast_sink_.dest_node_id = DEST_NODE_ID;
     broadcast_sink_.output_partition.type = TPartitionType::UNPARTITIONED;
@@ -219,6 +238,9 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     if (GetParam() == USE_THRIFT) {
       StartThriftBackend();
     } else {
+      IpAddr ip;
+      ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
+      krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
       StartKrpcBackend();
     }
   }
@@ -281,12 +303,14 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   int64_t* tuple_mem_;
 
   // Only used for KRPC. Not owned.
-  RpcMgr* krpc_mgr_ = nullptr;
   TNetworkAddress krpc_address_;
 
+  // The test service implementation. Owned by this class.
+  unique_ptr<ImpalaKRPCTestBackend> test_service_;
+
   // receiving node
   DataStreamMgrBase* stream_mgr_ = nullptr;
-  ThriftServer* server_;
+  ThriftServer* server_ = nullptr;
 
   // sending node(s)
   TDataStreamSink broadcast_sink_;
@@ -387,8 +411,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   RowBatch* CreateRowBatch() {
     RowBatch* batch = new RowBatch(row_desc_, BATCH_CAPACITY, &tracker_);
     int64_t* tuple_mem = reinterpret_cast<int64_t*>(
-        batch->tuple_data_pool()->Allocate(BATCH_CAPACITY * 8));
-    bzero(tuple_mem, BATCH_CAPACITY * 8);
+        batch->tuple_data_pool()->Allocate(BATCH_CAPACITY * PER_ROW_DATA));
+    bzero(tuple_mem, BATCH_CAPACITY * PER_ROW_DATA);
     for (int i = 0; i < BATCH_CAPACITY; ++i) {
       int idx = batch->AddRow();
       TupleRow* row = batch->GetRow(idx);
@@ -529,7 +553,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     // Dynamic cast stream_mgr_ which is of type DataStreamMgrBase to derived type
     // DataStreamMgr, since ImpalaThriftTestBackend() accepts only DataStreamMgr*.
     boost::shared_ptr<ImpalaThriftTestBackend> handler(
-        new ImpalaThriftTestBackend(dynamic_cast<DataStreamMgr*>(stream_mgr_)));
+        new ImpalaThriftTestBackend(ExecEnv::GetInstance()->ThriftStreamMgr()));
     boost::shared_ptr<TProcessor> processor(new ImpalaInternalServiceProcessor(handler));
     ThriftServerBuilder builder("DataStreamTest backend", processor, FLAGS_port);
     ASSERT_OK(builder.Build(&server_));
@@ -537,26 +561,13 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   }
 
   void StartKrpcBackend() {
-    IpAddr ip;
-    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
-    krpc_address_ = MakeNetworkAddress(ip, FLAGS_port);
-
-    MemTracker* data_svc_tracker = obj_pool_.Add(
-        new MemTracker(-1, "Data Stream Service",
-            ExecEnv::GetInstance()->process_mem_tracker()));
-    MemTracker* stream_mgr_tracker = obj_pool_.Add(
-        new MemTracker(-1, "Data Stream Queued RPC Calls",
-            ExecEnv::GetInstance()->process_mem_tracker()));
-
-    KrpcDataStreamMgr* stream_mgr_ref = dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_);
-    ASSERT_OK(stream_mgr_ref->Init(stream_mgr_tracker, data_svc_tracker));
-    ASSERT_OK(krpc_mgr_->Init());
-
-    unique_ptr<ServiceIf> handler(
-        new ImpalaKRPCTestBackend(krpc_mgr_, stream_mgr_ref));
-    ASSERT_OK(krpc_mgr_->RegisterService(CpuInfo::num_cores(), 1024, move(handler),
-        data_svc_tracker));
-    ASSERT_OK(krpc_mgr_->StartServices(krpc_address_));
+    RpcMgr* rpc_mgr = ExecEnv::GetInstance()->rpc_mgr();
+    KrpcDataStreamMgr* krpc_stream_mgr = ExecEnv::GetInstance()->KrpcStreamMgr();
+    ASSERT_OK(rpc_mgr->Init());
+    test_service_.reset(new ImpalaKRPCTestBackend(rpc_mgr, krpc_stream_mgr));
+    ASSERT_OK(test_service_->Init());
+    ASSERT_OK(krpc_stream_mgr->Init(test_service_->mem_tracker()));
+    ASSERT_OK(rpc_mgr->StartServices(krpc_address_));
   }
 
   void StopThriftBackend() {
@@ -566,7 +577,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   }
 
   void StopKrpcBackend() {
-    krpc_mgr_->Shutdown();
+    ExecEnv::GetInstance()->rpc_mgr()->Shutdown();
   }
 
   void StartSender(TPartitionType::type partition_type = TPartitionType::UNPARTITIONED,
@@ -660,7 +671,7 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
   }
 };
 
-// We use a seperate class for tests that are required to be run against Thrift only.
+// A separate class for tests that are required to be run against Thrift only.
 class DataStreamTestThriftOnly : public DataStreamTest {
  protected:
   virtual void SetUp() {
@@ -672,10 +683,9 @@ class DataStreamTestThriftOnly : public DataStreamTest {
   }
 };
 
-// We need a seperate test class for IMPALA-6346, since we need to do some pre-SetUp()
-// work. Specifically we need to set 2 flags that will be picked up during the SetUp()
-// phase of the DataStreamTest class.
-class DataStreamTestForImpala6346 : public DataStreamTest {
+// A separate test class which simulates the behavior in which the deserialization queue
+// fills up and all deserialization threads are busy.
+class DataStreamTestShortDeserQueue : public DataStreamTest {
  protected:
   virtual void SetUp() {
     FLAGS_datastream_service_num_deserialization_threads = 1;
@@ -688,13 +698,31 @@ class DataStreamTestForImpala6346 : public DataStreamTest {
   }
 };
 
+// A separate test class which simulates that the service queue fills up.
+class DataStreamTestShortServiceQueue : public DataStreamTest {
+ protected:
+  virtual void SetUp() {
+    // Set the memory limit very low so that the soft limit is easy to surpass.
+    FLAGS_datastream_service_queue_mem_limit =
+        std::to_string(SHORT_SERVICE_QUEUE_MEM_LIMIT);
+    DataStreamTest::SetUp();
+  }
+
+  virtual void TearDown() {
+    DataStreamTest::TearDown();
+  }
+};
+
 INSTANTIATE_TEST_CASE_P(ThriftOrKrpc, DataStreamTest,
     ::testing::Values(USE_THRIFT, USE_KRPC));
 
 INSTANTIATE_TEST_CASE_P(ThriftOnly, DataStreamTestThriftOnly,
     ::testing::Values(USE_THRIFT));
 
-INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestForImpala6346,
+INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestShortDeserQueue,
+    ::testing::Values(USE_KRPC));
+
+INSTANTIATE_TEST_CASE_P(KrpcOnly, DataStreamTestShortServiceQueue,
     ::testing::Values(USE_KRPC));
 
 TEST_P(DataStreamTest, UnknownSenderSmallResult) {
@@ -813,7 +841,7 @@ TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
 // already being deserialized will be waiting on the KrpcDataStreamMgr::lock_ as well.
 // But the first thread will never release the lock since it's stuck on Offer(), causing
 // a deadlock. This is fixed with IMPALA-6346.
-TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
+TEST_P(DataStreamTestShortDeserQueue, TestNoDeadlock) {
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
 
@@ -834,7 +862,7 @@ TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
   info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
       4, 1024 * 1024, false, profile, &tracker_);
   info.thread_handle = new thread(
-      &DataStreamTestForImpala6346_TestNoDeadlock_Test::ReadStream, this, &info);
+      &DataStreamTestShortDeserQueue_TestNoDeadlock_Test::ReadStream, this, &info);
 
   JoinSenders();
   CheckSenders();
@@ -844,6 +872,12 @@ TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
   CheckReceivers(TPartitionType::UNPARTITIONED, 4);
 }
 
+// Test that payloads larger than the service queue's soft mem limit can be transmitted.
+TEST_P(DataStreamTestShortServiceQueue, TestLargePayload) {
+  TestStream(
+      TPartitionType::UNPARTITIONED, 4, 1, SHORT_SERVICE_QUEUE_MEM_LIMIT * 2, false);
+}
+
 // TODO: more tests:
 // - test case for transmission error in last batch
 // - receivers getting created concurrently

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 1c3ab7a..17b7bec 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -85,11 +85,6 @@ DEFINE_bool_hidden(use_krpc, false, "Used to indicate whether to use KRPC for th
     "DataStream subsystem, or the Thrift RPC layer instead. Defaults to false. "
     "KRPC not yet supported");
 
-DEFINE_int32(datastream_service_queue_depth, 1024, "Size of datastream service queue");
-DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of datastream service "
-    "processing threads. If left at default value 0, it will be set to number of CPU "
-    "cores.");
-
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);
 DECLARE_int32(num_cores);
@@ -179,7 +174,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port,
 
   if (FLAGS_use_krpc) {
     VLOG_QUERY << "Using KRPC.";
-    // KRPC relies on resolved IP address. It's set in StartServices().
+    // KRPC relies on resolved IP address. It's set in Init().
     krpc_address_.__set_port(krpc_port);
     rpc_mgr_.reset(new RpcMgr(IsInternalTlsConfigured()));
     stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get()));
@@ -318,27 +313,15 @@ Status ExecEnv::Init() {
   obj_pool_->Add(new MemTracker(negated_unused_reservation, -1,
       "Buffer Pool: Unused Reservation", mem_tracker_.get()));
 
-  // Initialize the RPCMgr before allowing services registration.
+  // Initializes the RpcMgr and the DataStreamService.
   if (FLAGS_use_krpc) {
     krpc_address_.__set_hostname(ip_address_);
+    // Initialization needs to happen in the following order due to dependencies:
+    // - RPC manager, DataStreamService and DataStreamManager.
     RETURN_IF_ERROR(rpc_mgr_->Init());
-
-    // Add a MemTracker for memory used to store incoming calls before they handed over to
-    // the data stream manager.
-    MemTracker* data_svc_tracker = obj_pool_->Add(
-        new MemTracker(-1, "Data Stream Service", mem_tracker_.get()));
-
-    // Add a MemTracker for the data stream manager, which uses it to track memory used by
-    // deferred RPC calls while they are buffered in the data stream manager.
-    MemTracker* stream_mgr_tracker = obj_pool_->Add(
-        new MemTracker(-1, "Data Stream Queued RPC Calls", mem_tracker_.get()));
-    RETURN_IF_ERROR(KrpcStreamMgr()->Init(stream_mgr_tracker, data_svc_tracker));
-
-    unique_ptr<ServiceIf> data_svc(new DataStreamService(rpc_mgr_.get()));
-    int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
-        FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
-    RETURN_IF_ERROR(rpc_mgr_->RegisterService(num_svc_threads,
-        FLAGS_datastream_service_queue_depth, move(data_svc), data_svc_tracker));
+    data_svc_.reset(new DataStreamService());
+    RETURN_IF_ERROR(data_svc_->Init());
+    RETURN_IF_ERROR(KrpcStreamMgr()->Init(data_svc_->mem_tracker()));
     // Bump thread cache to 1GB to reduce contention for TCMalloc central
     // list's spinlock.
     if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 193fdde..cd07f9b 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -43,6 +43,7 @@ class BufferPool;
 class CallableThreadPool;
 class DataStreamMgrBase;
 class DataStreamMgr;
+class DataStreamService;
 class QueryExecMgr;
 class Frontend;
 class HBaseTableFactory;
@@ -133,6 +134,7 @@ class ExecEnv {
   CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); }
   QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); }
   RpcMgr* rpc_mgr() const { return rpc_mgr_.get(); }
+  DataStreamService* data_svc() const { return data_svc_.get(); }
   PoolMemTrackerRegistry* pool_mem_trackers() { return pool_mem_trackers_.get(); }
   ReservationTracker* buffer_reservation() { return buffer_reservation_.get(); }
   BufferPool* buffer_pool() { return buffer_pool_.get(); }
@@ -198,6 +200,7 @@ class ExecEnv {
   boost::scoped_ptr<CallableThreadPool> async_rpc_pool_;
   boost::scoped_ptr<QueryExecMgr> query_exec_mgr_;
   boost::scoped_ptr<RpcMgr> rpc_mgr_;
+  boost::scoped_ptr<DataStreamService> data_svc_;
 
   /// Query-wide buffer pool and the root reservation tracker for the pool. The
   /// reservation limit is equal to the maximum capacity of the pool. Created in

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 91111dc..4a9a91e 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -25,11 +25,13 @@
 #include "kudu/rpc/rpc_context.h"
 
 #include "exec/kudu-util.h"
+#include "runtime/exec-env.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
+#include "service/data-stream-service.h"
 #include "util/debug-util.h"
 #include "util/periodic-counter-updater.h"
 #include "util/runtime-profile-counters.h"
@@ -75,10 +77,12 @@ KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
       "total-senders-timedout-waiting-for-recvr-creation", 0L);
 }
 
-Status KrpcDataStreamMgr::Init(MemTracker* mem_tracker,
-    MemTracker* incoming_request_tracker) {
-  mem_tracker_ = mem_tracker;
-  incoming_request_tracker_ = incoming_request_tracker;
+Status KrpcDataStreamMgr::Init(MemTracker* service_mem_tracker) {
+  // MemTracker for tracking memory used for buffering deferred RPC calls which
+  // arrive before the receiver is ready.
+  mem_tracker_.reset(new MemTracker(-1, "Data Stream Manager Deferred RPCs",
+      ExecEnv::GetInstance()->process_mem_tracker()));
+  service_mem_tracker_ = service_mem_tracker;
   RETURN_IF_ERROR(Thread::Create("krpc-data-stream-mgr", "maintenance",
       [this](){ this->Maintenance(); }, &maintenance_thread_));
   RETURN_IF_ERROR(deserialize_pool_.Init());
@@ -134,7 +138,8 @@ shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
   for (const unique_ptr<EndDataStreamCtx>& ctx :
       early_senders_for_recvr.closed_sender_ctxs) {
     recvr->RemoveSender(ctx->request->sender_id());
-    RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context, mem_tracker_);
+    RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context,
+        mem_tracker_.get());
     num_senders_waiting_->Increment(-1);
   }
   return recvr;
@@ -166,10 +171,9 @@ shared_ptr<KrpcDataStreamRecvr> KrpcDataStreamMgr::FindRecvr(
 void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& finst_id,
     const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
     kudu::rpc::RpcContext* rpc_context) {
-  DCHECK_EQ(incoming_request_tracker_->parent(), mem_tracker_->parent());
-  incoming_request_tracker_->ReleaseLocal(
-      rpc_context->GetTransferSize(), mem_tracker_->parent());
-  mem_tracker_->ConsumeLocal(rpc_context->GetTransferSize(), mem_tracker_->parent());
+  const int64_t transfer_size = rpc_context->GetTransferSize();
+  mem_tracker_->Consume(transfer_size);
+  service_mem_tracker_->Release(transfer_size);
   RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
   auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
   early_senders_map_[recvr_id].waiting_sender_ctxs.emplace_back(move(payload));
@@ -180,10 +184,9 @@ void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& finst_id,
 void KrpcDataStreamMgr::AddEarlyClosedSender(const TUniqueId& finst_id,
     const EndDataStreamRequestPB* request, EndDataStreamResponsePB* response,
     kudu::rpc::RpcContext* rpc_context) {
-  DCHECK_EQ(incoming_request_tracker_->parent(), mem_tracker_->parent());
-  incoming_request_tracker_->ReleaseLocal(
-      rpc_context->GetTransferSize(), mem_tracker_->parent());
-  mem_tracker_->ConsumeLocal(rpc_context->GetTransferSize(), mem_tracker_->parent());
+  const int64_t transfer_size = rpc_context->GetTransferSize();
+  mem_tracker_->Consume(transfer_size);
+  service_mem_tracker_->Release(transfer_size);
   RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
   auto payload = make_unique<EndDataStreamCtx>(request, response, rpc_context);
   early_senders_map_[recvr_id].closed_sender_ctxs.emplace_back(move(payload));
@@ -224,14 +227,14 @@ void KrpcDataStreamMgr::AddData(const TransmitDataRequestPB* request,
     // already closed deliberately, and there's no unexpected error here.
     ErrorMsg msg(TErrorCode::DATASTREAM_RECVR_CLOSED, PrintId(finst_id), dest_node_id);
     RespondAndReleaseRpc(Status::Expected(msg), response, rpc_context,
-        incoming_request_tracker_);
+        service_mem_tracker_);
     return;
   }
   DCHECK(recvr != nullptr);
   int64_t transfer_size = rpc_context->GetTransferSize();
   recvr->AddBatch(request, response, rpc_context);
   // Release memory. The receiver already tracks it in its instance tracker.
-  incoming_request_tracker_->Release(transfer_size);
+  service_mem_tracker_->Release(transfer_size);
 }
 
 void KrpcDataStreamMgr::EnqueueDeserializeTask(const TUniqueId& finst_id,
@@ -279,7 +282,7 @@ void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,
   // If we reach this point, either the receiver is found or it has been unregistered
   // already. In either cases, it's safe to just return an OK status.
   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
-  RespondAndReleaseRpc(Status::OK(), response, rpc_context, incoming_request_tracker_);
+  RespondAndReleaseRpc(Status::OK(), response, rpc_context, service_mem_tracker_);
 
   {
     // TODO: Move this to maintenance thread.
@@ -365,7 +368,7 @@ void KrpcDataStreamMgr::RespondToTimedOutSender(const std::unique_ptr<ContextTyp
       ctx->request->dest_node_id());
   VLOG_QUERY << msg.msg();
   RespondAndReleaseRpc(Status::Expected(msg), ctx->response, ctx->rpc_context,
-      mem_tracker_);
+      mem_tracker_.get());
   num_senders_waiting_->Increment(-1);
   num_senders_timedout_->Increment(1);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index 16c0b30..f4358ea 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -229,9 +229,11 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
  public:
   KrpcDataStreamMgr(MetricGroup* metrics);
 
-  /// Initialize the deserialization thread pool and create the maintenance thread.
+  /// Initializes the deserialization thread pool and creates the maintenance thread.
+  /// 'service_mem_tracker' is the DataStreamService's MemTracker for tracking memory
+  /// used for RPC payloads before being handed over to data stream manager / receiver.
   /// Return error status on failure. Return OK otherwise.
-  Status Init(MemTracker* mem_tracker, MemTracker* incoming_request_tracker);
+  Status Init(MemTracker* service_mem_tracker);
 
   /// Create a receiver for a specific fragment_instance_id/dest_node_id.
   /// If is_merging is true, the receiver maintains a separate queue of incoming row
@@ -290,18 +292,19 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
 
  private:
   friend class KrpcDataStreamRecvr;
+  friend class DataStreamTest;
 
   /// MemTracker for memory used for transmit data requests before we hand them over to a
   /// specific receiver. Used only to track payloads of deferred RPCs (e.g. early
-  /// senders). Not owned.
-  MemTracker* mem_tracker_ = nullptr;
-
-  /// MemTracker which is used by the DataStreamService to track memory for incoming
-  /// requests. Memory for new incoming requests is initially tracked against this tracker
-  /// before the requests are handed over to the data stream manager. It is this class's
-  /// responsibility to release memory from this tracker and track it against its own
-  /// tracker (here: mem_tracker_). Not owned.
-  MemTracker* incoming_request_tracker_ = nullptr;
+  /// senders).
+  std::unique_ptr<MemTracker> mem_tracker_;
+
+  /// MemTracker used by the DataStreamService to track memory for incoming requests.
+  /// Memory for new incoming requests is initially tracked against this tracker before
+  /// the requests are handed over to the data stream manager / receiver. It is the
+  /// responsibility of data stream manager or receiver to release memory from the
+  /// service's tracker and track it in their own trackers. Not owned.
+  MemTracker* service_mem_tracker_ = nullptr;
 
   /// A task for the deserialization threads to work on. The fields identify
   /// the target receiver's sender queue.

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 4228288..10a3424 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -367,6 +367,7 @@ class MemTracker {
  private:
   friend class PoolMemTrackerRegistry;
 
+  /// Returns true if the current memory tracker's limit is exceeded.
   bool CheckLimitExceeded() const { return limit_ >= 0 && limit_ < consumption(); }
 
   /// If consumption is higher than max_consumption, attempts to free memory by calling

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/service/data-stream-service.cc
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.cc b/be/src/service/data-stream-service.cc
index dcf0c1f..34682d4 100644
--- a/be/src/service/data-stream-service.cc
+++ b/be/src/service/data-stream-service.cc
@@ -17,6 +17,9 @@
 
 #include "service/data-stream-service.h"
 
+#include <climits>
+
+#include "common/constant-strings.h"
 #include "common/status.h"
 #include "exec/kudu-util.h"
 #include "kudu/rpc/rpc_context.h"
@@ -24,6 +27,7 @@
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/exec-env.h"
 #include "runtime/row-batch.h"
+#include "util/parse-util.h"
 #include "testutil/fault-injection-util.h"
 
 #include "gen-cpp/data_stream_service.pb.h"
@@ -32,10 +36,35 @@
 
 using kudu::rpc::RpcContext;
 
+static const string queue_limit_msg = "(Advanced) Limit on RPC payloads consumption for "
+    "DataStreamService. " + Substitute(MEM_UNITS_HELP_MSG, "the process memory limit");
+DEFINE_string(datastream_service_queue_mem_limit, "5%", queue_limit_msg.c_str());
+DEFINE_int32(datastream_service_num_svc_threads, 0, "Number of threads for processing "
+    "datastream services' RPCs. If left at default value 0, it will be set to number of "
+    "CPU cores");
+
 namespace impala {
 
-DataStreamService::DataStreamService(RpcMgr* mgr)
-  : DataStreamServiceIf(mgr->metric_entity(), mgr->result_tracker()) {}
+DataStreamService::DataStreamService()
+  : DataStreamServiceIf(ExecEnv::GetInstance()->rpc_mgr()->metric_entity(),
+        ExecEnv::GetInstance()->rpc_mgr()->result_tracker()) {
+  MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker();
+  bool is_percent;
+  int64_t bytes_limit = ParseUtil::ParseMemSpec(FLAGS_datastream_service_queue_mem_limit,
+      &is_percent, process_mem_tracker->limit());
+  mem_tracker_.reset(new MemTracker(
+      bytes_limit, "Data Stream Service Queue", process_mem_tracker));
+}
+
+Status DataStreamService::Init() {
+  int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
+      FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
+  // The maximum queue length is set to the maximum 32-bit signed integer value. Its
+  // actual capacity is bounded by memory consumption against 'mem_tracker_'.
+  RETURN_IF_ERROR(ExecEnv::GetInstance()->rpc_mgr()->RegisterService(num_svc_threads,
+      std::numeric_limits<int32_t>::max(), this, mem_tracker()));
+  return Status::OK();
+}
 
 void DataStreamService::EndDataStream(const EndDataStreamRequestPB* request,
     EndDataStreamResponsePB* response, RpcContext* rpc_context) {

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/be/src/service/data-stream-service.h
----------------------------------------------------------------------
diff --git a/be/src/service/data-stream-service.h b/be/src/service/data-stream-service.h
index 7f3c6e4..63a0bf7 100644
--- a/be/src/service/data-stream-service.h
+++ b/be/src/service/data-stream-service.h
@@ -20,6 +20,9 @@
 
 #include "gen-cpp/data_stream_service.service.h"
 
+#include "common/status.h"
+#include "runtime/mem-tracker.h"
+
 namespace kudu {
 namespace rpc {
 class RpcContext;
@@ -37,7 +40,11 @@ class RpcMgr;
 /// appropriate receivers.
 class DataStreamService : public DataStreamServiceIf {
  public:
-  DataStreamService(RpcMgr* rpc_mgr);
+  DataStreamService();
+
+  /// Initializes the service by registering it with the singleton RPC manager.
+  /// This must not be called until the RPC manager has been initialized.
+  Status Init();
 
   /// Notifies the receiver to close the data stream specified in 'request'.
   /// The receiver replies to the client with a status serialized in 'response'.
@@ -48,6 +55,12 @@ class DataStreamService : public DataStreamServiceIf {
   /// The receiver replies to the client with a status serialized in 'response'.
   virtual void TransmitData(const TransmitDataRequestPB* request,
       TransmitDataResponsePB* response, kudu::rpc::RpcContext* context);
+
+  MemTracker* mem_tracker() { return mem_tracker_.get(); }
+
+ private:
+  /// Tracks the memory usage of the payloads in the service queue.
+  std::unique_ptr<MemTracker> mem_tracker_;
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/b961de23/tests/custom_cluster/test_krpc_mem_usage.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_mem_usage.py b/tests/custom_cluster/test_krpc_mem_usage.py
index 07d2757..a145a7a 100644
--- a/tests/custom_cluster/test_krpc_mem_usage.py
+++ b/tests/custom_cluster/test_krpc_mem_usage.py
@@ -22,8 +22,8 @@ from tests.common.impala_cluster import ImpalaCluster
 from tests.common.skip import SkipIf, SkipIfBuildType
 from tests.verifiers.mem_usage_verifier import MemUsageVerifier
 
-DATA_STREAM_MGR_METRIC = "Data Stream Queued RPC Calls"
-DATA_STREAM_SVC_METRIC = "Data Stream Service"
+DATA_STREAM_MGR_METRIC = "Data Stream Manager Deferred RPCs"
+DATA_STREAM_SVC_METRIC = "Data Stream Service Queue"
 ALL_METRICS = [ DATA_STREAM_MGR_METRIC, DATA_STREAM_SVC_METRIC ]
 
 @SkipIf.not_krpc