You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2018/02/01 16:46:10 UTC

[4/4] impala git commit: IMPALA-6193: Track memory of incoming data streams

IMPALA-6193: Track memory of incoming data streams

This change adds memory tracking to incoming transmit data RPCs when
using KRPC. We track memory against a global tracker called "Data Stream
Service" until it is handed over to the stream manager. There we track
it in a global tracker called "Data Stream Queued RPC Calls" until a
receiver registers and takes over the early sender RPCs. Inside the
receiver, memory for deferred RPCs is tracked against the fragment
instance's memtracker until we unpack the batches and add them to the
row batch queue.

The DCHECK in MemTracker::Close() covers that all memory consumed by a
tracker gets released eventually. In addition to that, this change adds a
custom cluster test that makes sure that queued memory gets tracked by
inspecting the peak consumption of the new memtrackers.

Change-Id: I2df1204d2483313a8a18e5e3be6cec9e402614c4
Reviewed-on: http://gerrit.cloudera.org:8080/8914
Reviewed-by: Lars Volker <lv...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3bfda334
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3bfda334
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3bfda334

Branch: refs/heads/master
Commit: 3bfda3348740e0951cbf8f60cde70cc4d1391c5e
Parents: acfd169
Author: Lars Volker <lv...@cloudera.com>
Authored: Tue Jan 16 16:03:42 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 1 08:53:36 2018 +0000

----------------------------------------------------------------------
 be/src/rpc/impala-service-pool.cc           | 48 +++++++-----
 be/src/rpc/impala-service-pool.h            | 16 +++-
 be/src/rpc/rpc-mgr-test.cc                  | 49 ++++++++----
 be/src/rpc/rpc-mgr.cc                       |  4 +-
 be/src/rpc/rpc-mgr.h                        |  3 +-
 be/src/runtime/exec-env.cc                  | 46 +++++++----
 be/src/runtime/krpc-data-stream-mgr.cc      | 44 ++++++++---
 be/src/runtime/krpc-data-stream-mgr.h       | 23 +++++-
 be/src/runtime/krpc-data-stream-recvr.cc    | 25 ++++--
 be/src/runtime/mem-tracker.h                |  4 +-
 be/src/util/memory-metrics.h                |  2 +-
 common/protobuf/data_stream_service.proto   |  2 +-
 tests/custom_cluster/test_krpc_mem_usage.py | 98 ++++++++++++++++++++++++
 tests/verifiers/mem_usage_verifier.py       | 70 +++++++++++++++++
 14 files changed, 350 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index 3b1c02d..34a3960 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -34,6 +34,7 @@
 #include "kudu/util/metrics.h"
 #include "kudu/util/status.h"
 #include "kudu/util/trace.h"
+#include "runtime/mem-tracker.h"
 
 #include "common/names.h"
 #include "common/status.h"
@@ -46,13 +47,15 @@ METRIC_DEFINE_histogram(server, impala_unused,
 
 namespace impala {
 
-ImpalaServicePool::ImpalaServicePool(std::unique_ptr<kudu::rpc::ServiceIf> service,
+ImpalaServicePool::ImpalaServicePool(MemTracker* mem_tracker,
+                         std::unique_ptr<kudu::rpc::ServiceIf> service,
                          const scoped_refptr<kudu::MetricEntity>& entity,
                          size_t service_queue_length)
-  : service_(std::move(service)),
+  : mem_tracker_(mem_tracker),
+    service_(std::move(service)),
     service_queue_(service_queue_length),
     unused_histogram_(METRIC_impala_unused.Instantiate(entity)) {
-
+  DCHECK(mem_tracker_ != nullptr);
 }
 
 ImpalaServicePool::~ImpalaServicePool() {
@@ -84,8 +87,8 @@ void ImpalaServicePool::Shutdown() {
   kudu::Status status = kudu::Status::ServiceUnavailable("Service is shutting down");
   std::unique_ptr<kudu::rpc::InboundCall> incoming;
   while (service_queue_.BlockingGet(&incoming)) {
-    incoming.release()->RespondFailure(
-        kudu::rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
+    FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status,
+        incoming.release());
   }
 
   service_->Shutdown();
@@ -100,12 +103,20 @@ void ImpalaServicePool::RejectTooBusy(kudu::rpc::InboundCall* c) {
                  c->remote_address().ToString(),
                  service_queue_.max_size());
   rpcs_queue_overflow_.Add(1);
-  c->RespondFailure(kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
-                    kudu::Status::ServiceUnavailable(err_msg));
+  FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+                    kudu::Status::ServiceUnavailable(err_msg), c);
   VLOG(1) << err_msg << " Contents of service queue:\n"
           << service_queue_.ToString();
 }
 
+void ImpalaServicePool::FailAndReleaseRpc(
+    const kudu::rpc::ErrorStatusPB::RpcErrorCodePB& error_code,
+    const kudu::Status& status, kudu::rpc::InboundCall* call) {
+  int64_t transfer_size = call->GetTransferSize();
+  call->RespondFailure(error_code, status);
+  mem_tracker_->Release(transfer_size);
+}
+
 kudu::rpc::RpcMethodInfo* ImpalaServicePool::LookupMethod(
     const kudu::rpc::RemoteMethod& method) {
   return service_->LookupMethod(method);
@@ -124,15 +135,16 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
 
   if (!unsupported_features.empty()) {
     c->RespondUnsupportedFeature(unsupported_features);
-    return kudu::Status::NotSupported("call requires unsupported application feature flags",
-                                JoinMapped(unsupported_features,
-                                           [] (uint32_t flag) { return std::to_string(flag); },
-                                           ", "));
+    return kudu::Status::NotSupported(
+        "call requires unsupported application feature flags",
+        JoinMapped(unsupported_features,
+        [] (uint32_t flag) { return std::to_string(flag); }, ", "));
   }
 
   TRACE_TO(c->trace(), "Inserting onto call queue"); // NOLINT(*)
 
   // Queue message on service queue
+  mem_tracker_->Consume(c->GetTransferSize());
   boost::optional<kudu::rpc::InboundCall*> evicted;
   auto queue_status = service_queue_.Put(c, &evicted);
   if (queue_status == kudu::rpc::QueueStatus::QUEUE_FULL) {
@@ -154,11 +166,11 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
   kudu::Status status = kudu::Status::OK();
   if (queue_status == kudu::rpc::QueueStatus::QUEUE_SHUTDOWN) {
     status = kudu::Status::ServiceUnavailable("Service is shutting down");
-    c->RespondFailure(kudu::rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status);
+    FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::FATAL_SERVER_SHUTTING_DOWN, status, c);
   } else {
     status = kudu::Status::RuntimeError(
         Substitute("Unknown error from BlockingQueue: $0", queue_status));
-    c->RespondFailure(kudu::rpc::ErrorStatusPB::FATAL_UNKNOWN, status);
+    FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::FATAL_UNKNOWN, status, c);
   }
   return status;
 }
@@ -181,13 +193,9 @@ void ImpalaServicePool::RunThread() {
 
       // Respond as a failure, even though the client will probably ignore
       // the response anyway.
-      incoming->RespondFailure(
-        kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
-        kudu::Status::TimedOut("Call waited in the queue past client deadline"));
-
-      // Must release since RespondFailure above ends up taking ownership
-      // of the object.
-      ignore_result(incoming.release());
+      FailAndReleaseRpc(kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY,
+          kudu::Status::TimedOut("Call waited in the queue past client deadline"),
+          incoming.release());
       continue;
     }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/rpc/impala-service-pool.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.h b/be/src/rpc/impala-service-pool.h
index 93f2972..fe70686 100644
--- a/be/src/rpc/impala-service-pool.h
+++ b/be/src/rpc/impala-service-pool.h
@@ -31,14 +31,16 @@
 #include "util/thread.h"
 
 namespace impala {
+class MemTracker;
 
 // A pool of threads that handle new incoming RPC calls.
 // Also includes a queue that calls get pushed onto for handling by the pool.
 class ImpalaServicePool : public kudu::rpc::RpcService {
  public:
-  ImpalaServicePool(std::unique_ptr<kudu::rpc::ServiceIf> service,
-              const scoped_refptr<kudu::MetricEntity>& metric_entity,
-              size_t service_queue_length);
+  ImpalaServicePool(MemTracker* mem_tracker,
+      std::unique_ptr<kudu::rpc::ServiceIf> service,
+      const scoped_refptr<kudu::MetricEntity>& metric_entity,
+      size_t service_queue_length);
   virtual ~ImpalaServicePool();
 
   // Start up the thread pool.
@@ -58,6 +60,14 @@ class ImpalaServicePool : public kudu::rpc::RpcService {
   void RunThread();
   void RejectTooBusy(kudu::rpc::InboundCall* c);
 
+  // Respond with failure to the incoming call in 'call' with 'error_code' and 'status'
+  // and release the payload memory from 'mem_tracker_'. Takes ownership of 'call'.
+  void FailAndReleaseRpc(const kudu::rpc::ErrorStatusPB::RpcErrorCodePB& error_code,
+      const kudu::Status& status, kudu::rpc::InboundCall* call);
+
+  // Tracks memory of inbound calls in 'service_queue_'.
+  MemTracker* const mem_tracker_;
+
   std::unique_ptr<kudu::rpc::ServiceIf> service_;
   std::vector<std::unique_ptr<Thread> > threads_;
   kudu::rpc::LifoServiceQueue service_queue_;

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/rpc/rpc-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc
index 7effda9..c525148 100644
--- a/be/src/rpc/rpc-mgr-test.cc
+++ b/be/src/rpc/rpc-mgr-test.cc
@@ -26,6 +26,7 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "rpc/auth-provider.h"
+#include "runtime/mem-tracker.h"
 #include "testutil/gtest-util.h"
 #include "testutil/mini-kdc-wrapper.h"
 #include "testutil/scoped-flag-setter.h"
@@ -110,6 +111,8 @@ template <class T> class RpcMgrTestBase : public T {
     request->set_sidecar_idx(idx);
   }
 
+  MemTracker* service_tracker() { return &service_tracker_; }
+
  protected:
   TNetworkAddress krpc_address_;
   RpcMgr rpc_mgr_;
@@ -127,6 +130,7 @@ template <class T> class RpcMgrTestBase : public T {
 
  private:
   int32_t payload_[PAYLOAD_SIZE];
+  MemTracker service_tracker_;
 };
 
 // For tests that do not require kerberized testing, we use RpcTest.
@@ -172,25 +176,28 @@ class PingServiceImpl : public PingServiceIf {
  public:
   // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
   PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker,
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker,
       ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
-    : PingServiceIf(entity, tracker), cb_(cb) {}
+    : PingServiceIf(entity, tracker), mem_tracker_(mem_tracker), cb_(cb) {}
 
   virtual void Ping(
       const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
     response->set_int_response(42);
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_->Release(context->GetTransferSize());
     cb_(context);
   }
 
  private:
+  MemTracker* mem_tracker_;
   ServiceCB cb_;
 };
 
 class ScanMemServiceImpl : public ScanMemServiceIf {
  public:
   ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
-      const scoped_refptr<kudu::rpc::ResultTracker> tracker)
-    : ScanMemServiceIf(entity, tracker) {
+      const scoped_refptr<kudu::rpc::ResultTracker> tracker, MemTracker* mem_tracker)
+    : ScanMemServiceIf(entity, tracker), mem_tracker_(mem_tracker) {
   }
 
   // The request comes with an int 'pattern' and a payload of int array sent with
@@ -207,13 +214,20 @@ class ScanMemServiceImpl : public ScanMemServiceIf {
     for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
       int32_t val = v[i];
       if (val != pattern) {
+        // Incoming requests will already be tracked and we need to release the memory.
+        mem_tracker_->Release(context->GetTransferSize());
         context->RespondFailure(kudu::Status::Corruption(
             Substitute("Expecting $1; Found $2", pattern, val)));
         return;
       }
     }
+    // Incoming requests will already be tracked and we need to release the memory.
+    mem_tracker_->Release(context->GetTransferSize());
     context->RespondSuccess();
   }
+
+ private:
+  MemTracker* mem_tracker_;
 };
 
 // TODO: USE_KUDU_KERBEROS and USE_IMPALA_KERBEROS are disabled due to IMPALA-6448.
@@ -225,16 +239,17 @@ INSTANTIATE_TEST_CASE_P(KerberosOnAndOff,
 template <class T>
 Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
     RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
+  MemTracker* mem_tracker = test_base->service_tracker();
   // Test that a service can be started, and will respond to requests.
-  unique_ptr<ServiceIf> ping_impl(
-      new PingServiceImpl(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl)));
+  unique_ptr<ServiceIf> ping_impl(new PingServiceImpl(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker(), mem_tracker));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(ping_impl), mem_tracker));
 
   // Test that a second service, that verifies the RPC payload is not corrupted,
   // can be started.
-  unique_ptr<ServiceIf> scan_mem_impl(
-      new ScanMemServiceImpl(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
-  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl)));
+  unique_ptr<ServiceIf> scan_mem_impl(new ScanMemServiceImpl(rpc_mgr->metric_entity(),
+      rpc_mgr->result_tracker(), mem_tracker));
+  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, move(scan_mem_impl), mem_tracker));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
@@ -452,7 +467,6 @@ TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
 }
 
 TEST_F(RpcMgrTest, SlowCallback) {
-
   // Use a callback which is slow to respond.
   auto slow_cb = [](RpcContext* ctx) {
     SleepForMs(300);
@@ -462,11 +476,12 @@ TEST_F(RpcMgrTest, SlowCallback) {
   // Test a service which is slow to respond and has a short queue.
   // Set a timeout on the client side. Expect either a client timeout
   // or the service queue filling up.
-  unique_ptr<ServiceIf> impl(
-      new PingServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb));
+  unique_ptr<ServiceIf> impl(new PingServiceImpl(rpc_mgr_.metric_entity(),
+      rpc_mgr_.result_tracker(), service_tracker(), slow_cb));
   const int num_service_threads = 1;
   const int queue_size = 3;
-  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, move(impl)));
+  ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, move(impl),
+      service_tracker()));
 
   FLAGS_num_acceptor_threads = 2;
   FLAGS_num_reactor_threads = 10;
@@ -487,9 +502,9 @@ TEST_F(RpcMgrTest, SlowCallback) {
 }
 
 TEST_F(RpcMgrTest, AsyncCall) {
-  unique_ptr<ServiceIf> scan_mem_impl(
-      new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
-  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl)));
+  unique_ptr<ServiceIf> scan_mem_impl(new ScanMemServiceImpl(rpc_mgr_.metric_entity(),
+      rpc_mgr_.result_tracker(), service_tracker()));
+  ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl), service_tracker()));
 
   unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
   ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/rpc/rpc-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc
index c70c117..7adde36 100644
--- a/be/src/rpc/rpc-mgr.cc
+++ b/be/src/rpc/rpc-mgr.cc
@@ -114,11 +114,11 @@ Status RpcMgr::Init() {
 }
 
 Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
-    unique_ptr<ServiceIf> service_ptr) {
+    unique_ptr<ServiceIf> service_ptr, MemTracker* mem_tracker) {
   DCHECK(is_inited()) << "Must call Init() before RegisterService()";
   DCHECK(!services_started_) << "Cannot call RegisterService() after StartServices()";
   scoped_refptr<ImpalaServicePool> service_pool =
-      new ImpalaServicePool(std::move(service_ptr),
+      new ImpalaServicePool(mem_tracker, std::move(service_ptr),
           messenger_->metric_entity(), service_queue_depth);
   // Start the thread pool first before registering the service in case the startup fails.
   RETURN_IF_ERROR(service_pool->Init(num_service_threads));

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/rpc/rpc-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h
index b2099f2..fc74c2e 100644
--- a/be/src/rpc/rpc-mgr.h
+++ b/be/src/rpc/rpc-mgr.h
@@ -124,7 +124,8 @@ class RpcMgr {
   ///
   /// It is an error to call this after StartServices() has been called.
   Status RegisterService(int32_t num_service_threads, int32_t service_queue_depth,
-      std::unique_ptr<kudu::rpc::ServiceIf> service_ptr) WARN_UNUSED_RESULT;
+      std::unique_ptr<kudu::rpc::ServiceIf> service_ptr, MemTracker* mem_tracker)
+      WARN_UNUSED_RESULT;
 
   /// Creates a new proxy for a remote service of type P at location 'address', and places
   /// it in 'proxy'. 'P' must descend from kudu::rpc::ServiceIf. Note that 'address' must

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 0551848..1c3ab7a 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -301,23 +301,6 @@ Status ExecEnv::Init() {
   // Resolve hostname to IP address.
   RETURN_IF_ERROR(HostnameToIpAddr(backend_address_.hostname, &ip_address_));
 
-  // Initialize the RPCMgr before allowing services registration.
-  if (FLAGS_use_krpc) {
-    krpc_address_.__set_hostname(ip_address_);
-    RETURN_IF_ERROR(KrpcStreamMgr()->Init());
-    RETURN_IF_ERROR(rpc_mgr_->Init());
-    unique_ptr<ServiceIf> data_svc(new DataStreamService(rpc_mgr_.get()));
-    int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
-        FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
-    RETURN_IF_ERROR(rpc_mgr_->RegisterService(num_svc_threads,
-        FLAGS_datastream_service_queue_depth, move(data_svc)));
-    // Bump thread cache to 1GB to reduce contention for TCMalloc central
-    // list's spinlock.
-    if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
-      FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
-    }
-  }
-
   mem_tracker_.reset(
       new MemTracker(AggregateMemoryMetrics::TOTAL_USED, bytes_limit, "Process"));
   // Add BufferPool MemTrackers for cached memory that is not tracked against queries
@@ -334,6 +317,35 @@ Status ExecEnv::Init() {
       BufferPoolMetric::UNUSED_RESERVATION_BYTES));
   obj_pool_->Add(new MemTracker(negated_unused_reservation, -1,
       "Buffer Pool: Unused Reservation", mem_tracker_.get()));
+
+  // Initialize the RPCMgr before allowing services registration.
+  if (FLAGS_use_krpc) {
+    krpc_address_.__set_hostname(ip_address_);
+    RETURN_IF_ERROR(rpc_mgr_->Init());
+
+    // Add a MemTracker for memory used to store incoming calls before they handed over to
+    // the data stream manager.
+    MemTracker* data_svc_tracker = obj_pool_->Add(
+        new MemTracker(-1, "Data Stream Service", mem_tracker_.get()));
+
+    // Add a MemTracker for the data stream manager, which uses it to track memory used by
+    // deferred RPC calls while they are buffered in the data stream manager.
+    MemTracker* stream_mgr_tracker = obj_pool_->Add(
+        new MemTracker(-1, "Data Stream Queued RPC Calls", mem_tracker_.get()));
+    RETURN_IF_ERROR(KrpcStreamMgr()->Init(stream_mgr_tracker, data_svc_tracker));
+
+    unique_ptr<ServiceIf> data_svc(new DataStreamService(rpc_mgr_.get()));
+    int num_svc_threads = FLAGS_datastream_service_num_svc_threads > 0 ?
+        FLAGS_datastream_service_num_svc_threads : CpuInfo::num_cores();
+    RETURN_IF_ERROR(rpc_mgr_->RegisterService(num_svc_threads,
+        FLAGS_datastream_service_queue_depth, move(data_svc), data_svc_tracker));
+    // Bump thread cache to 1GB to reduce contention for TCMalloc central
+    // list's spinlock.
+    if (FLAGS_tcmalloc_max_total_thread_cache_bytes == 0) {
+      FLAGS_tcmalloc_max_total_thread_cache_bytes = 1 << 30;
+    }
+  }
+
 #if !defined(ADDRESS_SANITIZER) && !defined(THREAD_SANITIZER)
   // Change the total TCMalloc thread cache size if necessary.
   if (FLAGS_tcmalloc_max_total_thread_cache_bytes > 0 &&

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index 86955c8..3f777ea 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -24,6 +24,8 @@
 
 #include "kudu/rpc/rpc_context.h"
 
+#include "exec/kudu-util.h"
+#include "runtime/mem-tracker.h"
 #include "runtime/krpc-data-stream-recvr.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/row-batch.h"
@@ -70,7 +72,10 @@ KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
       "total-senders-timedout-waiting-for-recvr-creation", 0L);
 }
 
-Status KrpcDataStreamMgr::Init() {
+Status KrpcDataStreamMgr::Init(MemTracker* mem_tracker,
+    MemTracker* incoming_request_tracker) {
+  mem_tracker_ = mem_tracker;
+  incoming_request_tracker_ = incoming_request_tracker;
   RETURN_IF_ERROR(Thread::Create("krpc-data-stream-mgr", "maintenance",
       [this](){ this->Maintenance(); }, &maintenance_thread_));
   RETURN_IF_ERROR(deserialize_pool_.Init());
@@ -109,13 +114,15 @@ shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
       // Let the receiver take over the RPC payloads of early senders and process them
       // asynchronously.
       for (unique_ptr<TransmitDataCtx>& ctx : early_senders.waiting_sender_ctxs) {
+        // Release memory. The receiver will track it in its instance tracker.
+        int64_t transfer_size = ctx->rpc_context->GetTransferSize();
         recvr->TakeOverEarlySender(move(ctx));
+        mem_tracker_->Release(transfer_size);
         num_senders_waiting_->Increment(-1);
       }
       for (const unique_ptr<EndDataStreamCtx>& ctx : early_senders.closed_sender_ctxs) {
         recvr->RemoveSender(ctx->request->sender_id());
-        Status::OK().ToProto(ctx->response->mutable_status());
-        ctx->rpc_context->RespondSuccess();
+        RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context, mem_tracker_);
         num_senders_waiting_->Increment(-1);
       }
       early_senders_map_.erase(it);
@@ -150,6 +157,10 @@ shared_ptr<KrpcDataStreamRecvr> KrpcDataStreamMgr::FindRecvr(
 void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& finst_id,
     const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
     kudu::rpc::RpcContext* rpc_context) {
+  DCHECK_EQ(incoming_request_tracker_->parent(), mem_tracker_->parent());
+  incoming_request_tracker_->ReleaseLocal(
+      rpc_context->GetTransferSize(), mem_tracker_->parent());
+  mem_tracker_->ConsumeLocal(rpc_context->GetTransferSize(), mem_tracker_->parent());
   RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
   auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
   early_senders_map_[recvr_id].waiting_sender_ctxs.emplace_back(move(payload));
@@ -160,6 +171,10 @@ void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& finst_id,
 void KrpcDataStreamMgr::AddEarlyClosedSender(const TUniqueId& finst_id,
     const EndDataStreamRequestPB* request, EndDataStreamResponsePB* response,
     kudu::rpc::RpcContext* rpc_context) {
+  DCHECK_EQ(incoming_request_tracker_->parent(), mem_tracker_->parent());
+  incoming_request_tracker_->ReleaseLocal(
+      rpc_context->GetTransferSize(), mem_tracker_->parent());
+  mem_tracker_->ConsumeLocal(rpc_context->GetTransferSize(), mem_tracker_->parent());
   RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
   auto payload = make_unique<EndDataStreamCtx>(request, response, rpc_context);
   early_senders_map_[recvr_id].closed_sender_ctxs.emplace_back(move(payload));
@@ -199,12 +214,15 @@ void KrpcDataStreamMgr::AddData(const TransmitDataRequestPB* request,
     // detect this case by checking already_unregistered - if true then the receiver was
     // already closed deliberately, and there's no unexpected error here.
     ErrorMsg msg(TErrorCode::DATASTREAM_RECVR_CLOSED, PrintId(finst_id), dest_node_id);
-    Status::Expected(msg).ToProto(response->mutable_status());
-    rpc_context->RespondSuccess();
+    RespondAndReleaseRpc(Status::Expected(msg), response, rpc_context,
+        incoming_request_tracker_);
     return;
   }
   DCHECK(recvr != nullptr);
+  int64_t transfer_size = rpc_context->GetTransferSize();
   recvr->AddBatch(request, response, rpc_context);
+  // Release memory. The receiver already tracks it in its instance tracker.
+  incoming_request_tracker_->Release(transfer_size);
 }
 
 void KrpcDataStreamMgr::EnqueueDeserializeTask(const TUniqueId& finst_id,
@@ -252,8 +270,7 @@ void KrpcDataStreamMgr::CloseSender(const EndDataStreamRequestPB* request,
   // If we reach this point, either the receiver is found or it has been unregistered
   // already. In either cases, it's safe to just return an OK status.
   if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id());
-  Status::OK().ToProto(response->mutable_status());
-  rpc_context->RespondSuccess();
+  RespondAndReleaseRpc(Status::OK(), response, rpc_context, incoming_request_tracker_);
 
   {
     // TODO: Move this to maintenance thread.
@@ -338,12 +355,21 @@ void KrpcDataStreamMgr::RespondToTimedOutSender(const std::unique_ptr<ContextTyp
   ErrorMsg msg(TErrorCode::DATASTREAM_SENDER_TIMEOUT, remote_addr, PrintId(finst_id),
       ctx->request->dest_node_id());
   VLOG_QUERY << msg.msg();
-  Status::Expected(msg).ToProto(ctx->response->mutable_status());
-  ctx->rpc_context->RespondSuccess();
+  RespondAndReleaseRpc(Status::Expected(msg), ctx->response, ctx->rpc_context,
+      mem_tracker_);
   num_senders_waiting_->Increment(-1);
   num_senders_timedout_->Increment(1);
 }
 
+template<typename ResponsePBType>
+void KrpcDataStreamMgr::RespondAndReleaseRpc(const Status& status,
+    ResponsePBType* response, kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker) {
+  status.ToProto(response->mutable_status());
+  int64_t transfer_size = ctx->GetTransferSize();
+  ctx->RespondSuccess();
+  mem_tracker->Release(transfer_size);
+}
+
 void KrpcDataStreamMgr::Maintenance() {
   while (true) {
     // Notify any senders that have been waiting too long for their receiver to

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index 5171e80..458ebe7 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -225,16 +225,13 @@ struct EndDataStreamCtx {
 ///  time.
 ///  'total-senders-timedout-waiting-for-recvr-creation' - total number of senders that
 ///  timed-out while waiting for a receiver.
-///
-/// TODO: The recv buffers used in KrpcDataStreamRecvr should count against
-/// per-query memory limits.
 class KrpcDataStreamMgr : public DataStreamMgrBase {
  public:
   KrpcDataStreamMgr(MetricGroup* metrics);
 
   /// Initialize the deserialization thread pool and create the maintenance thread.
   /// Return error status on failure. Return OK otherwise.
-  Status Init();
+  Status Init(MemTracker* mem_tracker, MemTracker* incoming_request_tracker);
 
   /// Create a receiver for a specific fragment_instance_id/dest_node_id.
   /// If is_merging is true, the receiver maintains a separate queue of incoming row
@@ -293,6 +290,18 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
  private:
   friend class KrpcDataStreamRecvr;
 
+  /// MemTracker for memory used for transmit data requests before we hand them over to a
+  /// specific receiver. Used only to track payloads of deferred RPCs (e.g. early
+  /// senders). Not owned.
+  MemTracker* mem_tracker_ = nullptr;
+
+  /// MemTracker which is used by the DataStreamService to track memory for incoming
+  /// requests. Memory for new incoming requests is initially tracked against this tracker
+  /// before the requests are handed over to the data stream manager. It is this class's
+  /// responsibility to release memory from this tracker and track it against its own
+  /// tracker (here: mem_tracker_). Not owned.
+  MemTracker* incoming_request_tracker_ = nullptr;
+
   /// A task for the deserialization threads to work on. The fields identify
   /// the target receiver's sender queue.
   struct DeserializeTask {
@@ -466,6 +475,12 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   template<typename ContextType, typename RequestPBType>
   void RespondToTimedOutSender(const std::unique_ptr<ContextType>& ctx);
 
+  /// Respond to the RPC passed in 'response'/'ctx' with 'status' and release the payload
+  /// memory from 'mem_tracker'. Takes ownership of 'ctx'.
+  template<typename ResponsePBType>
+  void RespondAndReleaseRpc(const Status& status, ResponsePBType* response,
+      kudu::rpc::RpcContext* ctx, MemTracker* mem_tracker);
+
   /// Notifies any sender that has been waiting for its receiver for more than
   /// FLAGS_datastream_sender_timeout_ms.
   ///

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/runtime/krpc-data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-recvr.cc b/be/src/runtime/krpc-data-stream-recvr.cc
index 68f00e3..138cc8b 100644
--- a/be/src/runtime/krpc-data-stream-recvr.cc
+++ b/be/src/runtime/krpc-data-stream-recvr.cc
@@ -126,6 +126,10 @@ class KrpcDataStreamRecvr::SenderQueue {
       const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data,
       unique_lock<SpinLock>* lock);
 
+  // Respond to the TransmitData RPC passed in 'ctx' with 'status' and release the payload
+  // memory from the MemTracker associated with 'recvr_'.
+  void RespondAndReleaseRpc(const Status& status, const unique_ptr<TransmitDataCtx>& ctx);
+
   // Receiver of which this queue is a member.
   KrpcDataStreamRecvr* recvr_;
 
@@ -321,6 +325,14 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatchWork(int64_t batch_size,
   data_arrival_cv_.notify_one();
 }
 
+void KrpcDataStreamRecvr::SenderQueue::RespondAndReleaseRpc(const Status& status,
+    const unique_ptr<TransmitDataCtx>& ctx) {
+  int64_t transfer_size = ctx->rpc_context->GetTransferSize();
+  status.ToProto(ctx->response->mutable_status());
+  ctx->rpc_context->RespondSuccess();
+  recvr_->mem_tracker()->Release(transfer_size);
+}
+
 void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* request,
     TransmitDataResponsePB* response, RpcContext* rpc_context) {
   // TODO: Add timers for time spent in this function and queue time in 'batch_queue_'.
@@ -349,13 +361,14 @@ void KrpcDataStreamRecvr::SenderQueue::AddBatch(const TransmitDataRequestPB* req
       return;
     }
 
-    // If there's something in the queue and this batch will push us over the buffer
+    // If there's something in the queue or this batch will push us over the buffer
     // limit we need to wait until the queue gets drained. We store the rpc context
     // so that we can signal it at a later time to resend the batch that we couldn't
     // process here. If there are already deferred RPCs waiting in queue, the new
     // batch needs to line up after the deferred RPCs to avoid starvation of senders
     // in the non-merging case.
     if (UNLIKELY(!deferred_rpcs_.empty() || !CanEnqueue(batch_size))) {
+      recvr_->mem_tracker()->Consume(rpc_context->GetTransferSize());
       auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
       deferred_rpcs_.push(move(payload));
       COUNTER_ADD(recvr_->num_deferred_batches_, 1);
@@ -391,8 +404,7 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
         &tuple_data, &batch_size);
     // Reply with error status if the entry cannot be unpacked.
     if (UNLIKELY(!status.ok())) {
-      status.ToProto(ctx->response->mutable_status());
-      ctx->rpc_context->RespondSuccess();
+      RespondAndReleaseRpc(status, ctx);
       deferred_rpcs_.pop();
       return;
     }
@@ -412,13 +424,13 @@ void KrpcDataStreamRecvr::SenderQueue::DequeueDeferredRpc() {
   }
 
   // Responds to the sender to ack the insertion of the row batches.
-  Status::OK().ToProto(ctx->response->mutable_status());
-  ctx->rpc_context->RespondSuccess();
+  RespondAndReleaseRpc(Status::OK(), ctx);
 }
 
 void KrpcDataStreamRecvr::SenderQueue::TakeOverEarlySender(
     unique_ptr<TransmitDataCtx> ctx) {
   int sender_id = ctx->request->sender_id();
+  recvr_->mem_tracker()->Consume(ctx->rpc_context->GetTransferSize());
   COUNTER_ADD(recvr_->num_deferred_batches_, 1);
   {
     lock_guard<SpinLock> l(lock_);
@@ -449,8 +461,7 @@ void KrpcDataStreamRecvr::SenderQueue::Cancel() {
     // Respond to deferred RPCs.
     while (!deferred_rpcs_.empty()) {
       const unique_ptr<TransmitDataCtx>& payload = deferred_rpcs_.front();
-      Status::OK().ToProto(payload->response->mutable_status());
-      payload->rpc_context->RespondSuccess();
+      RespondAndReleaseRpc(Status::OK(), payload);
       deferred_rpcs_.pop();
     }
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index c582d72..4228288 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -55,8 +55,8 @@ class TQueryOptions;
 ///
 /// By default, memory consumption is tracked via calls to Consume()/Release(), either to
 /// the tracker itself or to one of its descendents. Alternatively, a consumption metric
-/// can specified, and then the metric's value is used as the consumption rather than the
-/// tally maintained by Consume() and Release(). A tcmalloc metric is used to track
+/// can be specified, and then the metric's value is used as the consumption rather than
+/// the tally maintained by Consume() and Release(). A tcmalloc metric is used to track
 /// process memory consumption, since the process memory usage may be higher than the
 /// computed total memory (tcmalloc does not release deallocated memory immediately).
 /// Other consumption metrics are used in trackers below the process level to account

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/be/src/util/memory-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/memory-metrics.h b/be/src/util/memory-metrics.h
index 6c10e09..0ac04bf 100644
--- a/be/src/util/memory-metrics.h
+++ b/be/src/util/memory-metrics.h
@@ -198,7 +198,7 @@ class BufferPoolMetric : public IntGauge {
   static Status InitMetrics(MetricGroup* metrics, ReservationTracker* global_reservations,
       BufferPool* buffer_pool) WARN_UNUSED_RESULT;
 
-  /// Global metrics, initialized by CreateAndRegisterMetrics().
+  /// Global metrics, initialized by InitMetrics().
   static BufferPoolMetric* LIMIT;
   static BufferPoolMetric* SYSTEM_ALLOCATED;
   static BufferPoolMetric* RESERVED;

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/common/protobuf/data_stream_service.proto
----------------------------------------------------------------------
diff --git a/common/protobuf/data_stream_service.proto b/common/protobuf/data_stream_service.proto
index 3aa3f28..c2045d2 100644
--- a/common/protobuf/data_stream_service.proto
+++ b/common/protobuf/data_stream_service.proto
@@ -41,7 +41,7 @@ message TransmitDataRequestPB {
   optional int32 tuple_offsets_sidecar_idx = 5;
 
   // The sidecar index of the tuple's data which is a (compressed) row batch.
-  // The details  of the row batch (e.g. # of rows) is in 'row_batch_header' above.
+  // The details of the row batch (e.g. # of rows) are in 'row_batch_header' above.
   optional int32 tuple_data_sidecar_idx = 6;
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/tests/custom_cluster/test_krpc_mem_usage.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_mem_usage.py b/tests/custom_cluster/test_krpc_mem_usage.py
new file mode 100644
index 0000000..ed7b056
--- /dev/null
+++ b/tests/custom_cluster/test_krpc_mem_usage.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import time
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_cluster import ImpalaCluster
+from tests.common.skip import SkipIfBuildType
+from tests.verifiers.mem_usage_verifier import MemUsageVerifier
+
+DATA_STREAM_MGR_METRIC = "Data Stream Queued RPC Calls"
+DATA_STREAM_SVC_METRIC = "Data Stream Service"
+ALL_METRICS = [ DATA_STREAM_MGR_METRIC, DATA_STREAM_SVC_METRIC ]
+
+class TestKrpcMemUsage(CustomClusterTestSuite):
+  """Test for memory usage tracking when using KRPC."""
+  TEST_QUERY = "select count(c2.string_col) from \
+     functional.alltypestiny join functional.alltypessmall c2"
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def setup_class(cls):
+    if cls.exploration_strategy() != 'exhaustive':
+      pytest.skip('runs only in exhaustive')
+    super(TestKrpcMemUsage, cls).setup_class()
+
+  def execute_query_verify_mem_usage(self, query, non_zero_peak_metrics):
+    """Executes 'query' and makes sure that the memory used by KRPC is returned to the
+    memtrackers. It also verifies that metrics in 'non_zero_peak_metrics' have a peak
+    value > 0.
+    """
+    self.client.execute(query)
+    self.verify_mem_usage(non_zero_peak_metrics)
+
+  def verify_mem_usage(self, non_zero_peak_metrics):
+    """Verifies that the memory used by KRPC is returned to the memtrackers and that
+    metrics in 'non_zero_peak_metrics' have a peak value > 0.
+    """
+    verifiers = [ MemUsageVerifier(i.service) for i in ImpalaCluster().impalads ]
+    for verifier in verifiers:
+      for metric_name in ALL_METRICS:
+        usage = verifier.get_mem_usage_values(metric_name)
+        assert usage["total"] == 0
+        if metric_name in non_zero_peak_metrics:
+          assert usage["peak"] > 0, metric_name
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--use_krpc")
+  def test_krpc_unqueued_memory_usage(self, vector):
+    """Executes a simple query and checks that the data stream service consumed some
+    memory.
+    """
+    # The data stream manager may not need to track memory in any queue if the receivers
+    # show up in time.
+    self.execute_query_verify_mem_usage(self.TEST_QUERY, [DATA_STREAM_SVC_METRIC])
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--use_krpc --stress_datastream_recvr_delay_ms=1000")
+  def test_krpc_deferred_memory_usage(self, vector):
+    """Executes a simple query. The cluster is started with delayed receiver creation to
+    trigger RPC queueing.
+    """
+    self.execute_query_verify_mem_usage(self.TEST_QUERY, ALL_METRICS)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("--use_krpc --stress_datastream_recvr_delay_ms=1000")
+  def test_krpc_deferred_memory_cancellation(self, vector):
+    """Executes a query and cancels it while RPCs are still queued up. This exercises the
+    code to flush the deferred RPC queue in the receiver.
+    """
+    query = "select count(*) from tpch_parquet.lineitem l1 join tpch_parquet.lineitem l2 \
+            where l1.l_orderkey = l2.l_orderkey"
+    # Warm up metadata
+    self.client.execute(query)
+    # Execute and cancel query
+    handle = self.client.execute_async(query)
+    # Sleep to allow RPCs to arrive.
+    time.sleep(0.5)
+    self.client.cancel(handle)
+    self.client.close()
+    self.verify_mem_usage(ALL_METRICS)

http://git-wip-us.apache.org/repos/asf/impala/blob/3bfda334/tests/verifiers/mem_usage_verifier.py
----------------------------------------------------------------------
diff --git a/tests/verifiers/mem_usage_verifier.py b/tests/verifiers/mem_usage_verifier.py
new file mode 100644
index 0000000..644e5fa
--- /dev/null
+++ b/tests/verifiers/mem_usage_verifier.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Verifier for memtracker usage values (Total, Peak, etc).
+
+import re
+
+SIZE_FACTORS = {"b": 1, "kb": 1 << 10, "mb": 1 << 20, "gb": 1 << 30}
+
+def parse_mem_value(value):
+  """Parses a memory value with an optional unit like "123", "10 B", or "1.5 KB" into an
+  equivalent number of bytes.
+  """
+  elements = value.split()
+  result = float(elements[0])
+  if len(elements) > 1:
+    result *= SIZE_FACTORS[elements[1].lower()]
+  return result
+
+class MemUsageVerifier(object):
+  """MemUsageVerifier objects can be used to verify values in the debug output of memory
+  trackers.
+  """
+
+  def __init__(self, impalad_service):
+    """Initialize module given an ImpalaService object"""
+    self.impalad_service = impalad_service
+
+  def get_mem_usage_values(self, name):
+    """Returns a dictionary of all key=value pairs of the memtracker specified by 'name'
+    by reading the '/memz' debug webpage. It also parses and converts memory values
+    including optional units like "10 B" or "1.5 KB". All keys are converted to
+    lowercase. Only the first line starting with 'name' is considered.
+
+    For example, for the line "Data Stream Service: Total=0 Peak=108.00 B" this will
+    return "dict(total=0, peak=108.0)".
+    """
+    memz = self.impalad_service.get_debug_webpage_json("memz")
+    details = memz.get("detailed", "")
+    for line in details.splitlines():
+      line = line.strip()
+      prefix = name + ":"
+      if line.startswith(prefix):
+        line = line[len(prefix):]
+        result = {}
+        # The value regex matches either a bare '0' or a number (optionally containing
+        # a decimal dot) followed by a space and a required unit (B, KB, MB or GB).
+        for k, v in re.findall(r"(\S+)=(0|[\d\.]+ [KMG]?B)", line):
+          result[k.lower()] = parse_mem_value(v)
+        return result
+    return {}
+
+
+
+
+