You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/20 20:44:14 UTC

[16/21] impala git commit: IMPALA-6652: Rename label of MemTracker for early RPCs

IMPALA-6652: Rename label of MemTracker for early RPCs

This change renames the label of the MemTracker that
KrpcDataStreamMgr uses to track payloads of early RPCs
to "Data Stream Manager Early RPCs". This distinguishes
these RPCs from the RPCs deferred inside a receiver. Early
RPCs are those RPCs which arrive before a receiver is
ready; responses to them are deferred until the receiver
is created. A receiver may also defer responses to RPCs
when the deserialized payloads of the RPCs in its inbound
queue exceed FLAGS_exchg_node_buffer_size_bytes. In that
case, those RPCs are not responded to until the inbound
queue is drained. To match the new label, the member
mem_tracker_ is renamed to early_rpcs_tracker_ and the
related custom-cluster tests are renamed accordingly.

Change-Id: I5bb72c28e8d660a6b78543dbc8b5b156e0e7c843
Reviewed-on: http://gerrit.cloudera.org:8080/9633
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/5eba80be
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/5eba80be
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/5eba80be

Branch: refs/heads/2.x
Commit: 5eba80beef4498e60deb25f8c791e0e73c14081f
Parents: 45568e6
Author: Lars Volker <lv...@cloudera.com>
Authored: Wed Mar 14 10:33:48 2018 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sun Mar 18 21:03:22 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/krpc-data-stream-mgr.cc      | 14 +++++++-------
 be/src/runtime/krpc-data-stream-mgr.h       |  8 ++++----
 tests/custom_cluster/test_krpc_mem_usage.py |  8 ++++----
 3 files changed, 15 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/5eba80be/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index b291fee..5a9305f 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -78,9 +78,9 @@ KrpcDataStreamMgr::KrpcDataStreamMgr(MetricGroup* metrics)
 }
 
 Status KrpcDataStreamMgr::Init(MemTracker* service_mem_tracker) {
-  // MemTracker for tracking memory used for buffering deferred RPC calls which
+  // MemTracker for tracking memory used for buffering early RPC calls which
   // arrive before the receiver is ready.
-  mem_tracker_.reset(new MemTracker(-1, "Data Stream Manager Deferred RPCs",
+  early_rpcs_tracker_.reset(new MemTracker(-1, "Data Stream Manager Early RPCs",
       ExecEnv::GetInstance()->process_mem_tracker()));
   service_mem_tracker_ = service_mem_tracker;
   RETURN_IF_ERROR(Thread::Create("krpc-data-stream-mgr", "maintenance",
@@ -134,14 +134,14 @@ shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
     // Release memory. The receiver will track it in its instance tracker.
     int64_t transfer_size = ctx->rpc_context->GetTransferSize();
     recvr->TakeOverEarlySender(move(ctx));
-    mem_tracker_->Release(transfer_size);
+    early_rpcs_tracker_->Release(transfer_size);
     num_senders_waiting_->Increment(-1);
   }
   for (const unique_ptr<EndDataStreamCtx>& ctx :
       early_senders_for_recvr.closed_sender_ctxs) {
     recvr->RemoveSender(ctx->request->sender_id());
     DataStreamService::RespondAndReleaseRpc(Status::OK(), ctx->response, ctx->rpc_context,
-        mem_tracker_.get());
+        early_rpcs_tracker_.get());
     num_senders_waiting_->Increment(-1);
   }
   return recvr;
@@ -174,7 +174,7 @@ void KrpcDataStreamMgr::AddEarlySender(const TUniqueId& finst_id,
     const TransmitDataRequestPB* request, TransmitDataResponsePB* response,
     kudu::rpc::RpcContext* rpc_context) {
   const int64_t transfer_size = rpc_context->GetTransferSize();
-  mem_tracker_->Consume(transfer_size);
+  early_rpcs_tracker_->Consume(transfer_size);
   service_mem_tracker_->Release(transfer_size);
   RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
   auto payload = make_unique<TransmitDataCtx>(request, response, rpc_context);
@@ -187,7 +187,7 @@ void KrpcDataStreamMgr::AddEarlyClosedSender(const TUniqueId& finst_id,
     const EndDataStreamRequestPB* request, EndDataStreamResponsePB* response,
     kudu::rpc::RpcContext* rpc_context) {
   const int64_t transfer_size = rpc_context->GetTransferSize();
-  mem_tracker_->Consume(transfer_size);
+  early_rpcs_tracker_->Consume(transfer_size);
   service_mem_tracker_->Release(transfer_size);
   RecvrId recvr_id = make_pair(finst_id, request->dest_node_id());
   auto payload = make_unique<EndDataStreamCtx>(request, response, rpc_context);
@@ -351,7 +351,7 @@ void KrpcDataStreamMgr::RespondToTimedOutSender(const std::unique_ptr<ContextTyp
       ctx->request->dest_node_id());
   VLOG_QUERY << msg.msg();
   DataStreamService::RespondAndReleaseRpc(Status::Expected(msg), ctx->response,
-      ctx->rpc_context, mem_tracker_.get());
+      ctx->rpc_context, early_rpcs_tracker_.get());
   num_senders_waiting_->Increment(-1);
   num_senders_timedout_->Increment(1);
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/5eba80be/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index 889aee5..3cd2191 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -295,10 +295,10 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   friend class KrpcDataStreamRecvr;
   friend class DataStreamTest;
 
-  /// MemTracker for memory used for transmit data requests before we hand them over to a
-  /// specific receiver. Used only to track payloads of deferred RPCs (e.g. early
-  /// senders).
-  std::unique_ptr<MemTracker> mem_tracker_;
+  /// MemTracker for memory used for early transmit data RPCs which arrive before the
+  /// receiver is created. The memory of the RPC payload is transferred to the receiver
+  /// once it's created.
+  std::unique_ptr<MemTracker> early_rpcs_tracker_;
 
   /// MemTracker used by the DataStreamService to track memory for incoming requests.
   /// Memory for new incoming requests is initially tracked against this tracker before

http://git-wip-us.apache.org/repos/asf/impala/blob/5eba80be/tests/custom_cluster/test_krpc_mem_usage.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_krpc_mem_usage.py b/tests/custom_cluster/test_krpc_mem_usage.py
index a145a7a..c26b776 100644
--- a/tests/custom_cluster/test_krpc_mem_usage.py
+++ b/tests/custom_cluster/test_krpc_mem_usage.py
@@ -22,7 +22,7 @@ from tests.common.impala_cluster import ImpalaCluster
 from tests.common.skip import SkipIf, SkipIfBuildType
 from tests.verifiers.mem_usage_verifier import MemUsageVerifier
 
-DATA_STREAM_MGR_METRIC = "Data Stream Manager Deferred RPCs"
+DATA_STREAM_MGR_METRIC = "Data Stream Manager Early RPCs"
 DATA_STREAM_SVC_METRIC = "Data Stream Service Queue"
 ALL_METRICS = [ DATA_STREAM_MGR_METRIC, DATA_STREAM_SVC_METRIC ]
 
@@ -74,7 +74,7 @@ class TestKrpcMemUsage(CustomClusterTestSuite):
   @SkipIfBuildType.not_dev_build
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--stress_datastream_recvr_delay_ms=1000")
-  def test_krpc_deferred_memory_usage(self, vector):
+  def test_krpc_early_sender_memory_usage(self, vector):
     """Executes a simple query. The cluster is started with delayed receiver creation to
     trigger RPC queueing.
     """
@@ -83,9 +83,9 @@ class TestKrpcMemUsage(CustomClusterTestSuite):
   @SkipIfBuildType.not_dev_build
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args("--stress_datastream_recvr_delay_ms=1000")
-  def test_krpc_deferred_memory_cancellation(self, vector):
+  def test_krpc_early_sender_memory_cancellation(self, vector):
     """Executes a query and cancels it while RPCs are still queued up. This exercises the
-    code to flush the deferred RPC queue in the receiver.
+    code to flush the early sender RPC queue in the receiver.
     """
     query = "select count(*) from tpch_parquet.lineitem l1 join tpch_parquet.lineitem l2 \
             where l1.l_orderkey = l2.l_orderkey"