You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/02/11 01:32:14 UTC

[2/2] impala git commit: IMPALA-6396: Exchange node's memory usage should include its receiver's

IMPALA-6396: Exchange node's memory usage should include its receiver's

A DataStreamRecvr is co-owned by the DataStreamMgr and
an Exchange node. However, the lifetime of the memory
allocations (e.g. row batches) of a DataStreamRecvr never
exceeds that of its owning Exchange node. Previously, we
used the fragment instance's MemTracker as the parent of
the DataStreamRecvr's MemTracker. This change switches to
using the MemTracker of the owning Exchange node as the
parent tracker of the DataStreamRecvr. This makes it
easier to identify the peak memory usage of the receivers
of different exchange nodes in the runtime profile and
query summary. Most of the exchange node's memory usage
is from its receiver, so we don't track the peak memory
usage of the receiver separately.

Sample output from TPCH-Q21:

EXCHANGE_NODE (id=18):(Total: 1s448ms, non-child: 265.818ms, % non-child: 18.35%)
   - ConvertRowBatchTime: 223.895ms
   - PeakMemoryUsage: 10.04 MB (10524943)
   - RowsReturned: 1.27M (1267464)
   - RowsReturnedRate: 875.19 K/sec
  RecvrSide:
    BytesReceived(500.000ms): 0, 1.64 MB, 9.98 MB, 9.98 MB, 10.01 MB, 10.01 MB, 10.01 MB, 31.79 MB, 60.19 MB, 87.84 MB
     - FirstBatchArrivalWaitTime: 0.000ns
     - TotalBytesReceived: 93.07 MB (97594728)
     - TotalGetBatchTime: 1s194ms
       - DataArrivalTimer: 1s183ms
   SenderSide:
      - DeserializeRowBatchTime: 344.343ms
      - NumBatchesAccepted: 3.80K (3796)
      - NumBatchesDeferred: 5 (5)
      - NumEarlySenders: 0 (0)

Testing done: Updated test_observability.py to verify that the
peak memory usage of the exchange nodes is not 0.

Change-Id: I8ca3c47d87bfcd221d34565eda1878f3c15d5c45
Reviewed-on: http://gerrit.cloudera.org:8080/9202
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2e603478
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2e603478
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2e603478

Branch: refs/heads/master
Commit: 2e60347868d7e719d80401f9abcbe971e659502b
Parents: 0047f81
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Feb 1 13:56:31 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 10 03:10:26 2018 +0000

----------------------------------------------------------------------
 be/src/exec/exchange-node.cc           |  6 +++---
 be/src/runtime/data-stream-mgr-base.h  |  9 +++++----
 be/src/runtime/data-stream-mgr.cc      | 17 ++++++++---------
 be/src/runtime/data-stream-mgr.h       | 11 ++++++-----
 be/src/runtime/data-stream-test.cc     | 13 ++++++-------
 be/src/runtime/krpc-data-stream-mgr.cc | 13 ++++++-------
 be/src/runtime/krpc-data-stream-mgr.h  | 11 ++++++-----
 tests/query_test/test_observability.py |  2 ++
 8 files changed, 42 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 353a59b..cc39382 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -81,9 +81,9 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
 
   // TODO: figure out appropriate buffer size
   DCHECK_GT(num_senders_, 0);
-  stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(state,
-      &input_row_desc_, state->fragment_instance_id(), id_, num_senders_,
-      FLAGS_exchg_node_buffer_size_bytes, runtime_profile(), is_merging_);
+  stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(&input_row_desc_,
+      state->fragment_instance_id(), id_, num_senders_,
+      FLAGS_exchg_node_buffer_size_bytes, is_merging_, runtime_profile(), mem_tracker());
   if (is_merging_) {
     less_than_.reset(
         new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h b/be/src/runtime/data-stream-mgr-base.h
index 0e392e3..f9761cb 100644
--- a/be/src/runtime/data-stream-mgr-base.h
+++ b/be/src/runtime/data-stream-mgr-base.h
@@ -26,6 +26,7 @@
 namespace impala {
 
 class DataStreamRecvrBase;
+class MemTracker;
 class RuntimeProfile;
 class RuntimeState;
 class TRowBatch;
@@ -43,10 +44,10 @@ class DataStreamMgrBase : public CacheLineAligned {
   virtual ~DataStreamMgrBase() { }
 
   /// Create a receiver for a specific fragment_instance_id/node_id destination;
-  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) = 0;
+  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) = 0;
 
   /// Closes all receivers registered for fragment_instance_id immediately.
   virtual void Cancel(const TUniqueId& fragment_instance_id) = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index 45eee7f..48a819c 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -75,17 +75,16 @@ inline uint32_t DataStreamMgr::GetHashValue(
   return value;
 }
 
-shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(RuntimeState* state,
-    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-    PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-    RuntimeProfile* profile, bool is_merging) {
-  DCHECK(profile != NULL);
+shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(const RowDescriptor* row_desc,
+    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+    int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+    MemTracker* parent_tracker) {
+  DCHECK(profile != nullptr);
+  DCHECK(parent_tracker != nullptr);
   VLOG_FILE << "creating receiver for fragment="
             << fragment_instance_id << ", node=" << dest_node_id;
-  shared_ptr<DataStreamRecvr> recvr(
-      new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
-          fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size,
-          profile));
+  shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(this, parent_tracker, row_desc,
+      fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
   size_t hash_value = GetHashValue(fragment_instance_id, dest_node_id);
   lock_guard<mutex> l(lock_);
   fragment_recvr_set_.insert(make_pair(fragment_instance_id, dest_node_id));

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 07f7c56..2be6478 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -71,13 +71,14 @@ class DataStreamMgr : public DataStreamMgrBase {
   /// Create a receiver for a specific fragment_instance_id/node_id destination;
   /// If is_merging is true, the receiver maintains a separate queue of incoming row
   /// batches for each sender and merges the sorted streams from each sender into a
-  /// single stream.
+  /// single stream. 'parent_tracker' is the MemTracker of the exchange node which owns
+  /// this receiver. It's the parent of the MemTracker of the newly created receiver.
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
   /// caller.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) override;
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) override;
 
   /// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id
   /// if the recvr has not been cancelled. sender_id identifies the sender instance

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 07eefd4..75d5ac9 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -415,8 +415,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     GetNextInstanceId(&instance_id);
     receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
     ReceiverInfo& info = receiver_info_.back();
-    info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
-        instance_id, DEST_NODE_ID, num_senders, buffer_size, profile, is_merging);
+    info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
+        num_senders, buffer_size, is_merging, profile, &tracker_);
     if (!is_merging) {
       info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
     } else {
@@ -767,9 +767,8 @@ TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
   // Start just one receiver.
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
-  shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(
-      runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile,
-      false);
+  shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(row_desc_,
+      instance_id, DEST_NODE_ID, 1, 1, false, profile, &tracker_);
 
   // Perform tear down, but keep a reference to the receiver so that it is deleted last
   // (to confirm that the destructor does not access invalid state after tear-down).
@@ -832,8 +831,8 @@ TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
   RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
   receiver_info_.push_back(ReceiverInfo(TPartitionType::UNPARTITIONED, 4, 1));
   ReceiverInfo& info = receiver_info_.back();
-  info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
-      instance_id, DEST_NODE_ID, 4, 1024 * 1024, profile, false);
+  info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
+      4, 1024 * 1024, false, profile, &tracker_);
   info.thread_handle = new thread(
       &DataStreamTestForImpala6346_TestNoDeadlock_Test::ReadStream, this, &info);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index fabea13..91111dc 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -94,16 +94,15 @@ inline uint32_t KrpcDataStreamMgr::GetHashValue(
 }
 
 shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
-    RuntimeState* state, const RowDescriptor* row_desc,
-    const TUniqueId& finst_id, PlanNodeId dest_node_id, int num_senders,
-    int64_t buffer_size, RuntimeProfile* profile, bool is_merging) {
-
+    const RowDescriptor* row_desc, const TUniqueId& finst_id, PlanNodeId dest_node_id,
+    int num_senders, int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+    MemTracker* parent_tracker) {
   DCHECK(profile != nullptr);
+  DCHECK(parent_tracker != nullptr);
   VLOG_FILE << "creating receiver for fragment="<< finst_id
             << ", node=" << dest_node_id;
-  shared_ptr<KrpcDataStreamRecvr> recvr(
-      new KrpcDataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
-          finst_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
+  shared_ptr<KrpcDataStreamRecvr> recvr(new KrpcDataStreamRecvr(this, parent_tracker,
+      row_desc, finst_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
   uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
   EarlySendersList early_senders_for_recvr;
   {

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index 458ebe7..16c0b30 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -236,13 +236,14 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   /// Create a receiver for a specific fragment_instance_id/dest_node_id.
   /// If is_merging is true, the receiver maintains a separate queue of incoming row
   /// batches for each sender and merges the sorted streams from each sender into a
-  /// single stream.
+  /// single stream. 'parent_tracker' is the MemTracker of the exchange node which owns
+  /// this receiver. It's the parent of the MemTracker of the newly created receiver.
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
   /// caller.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) override;
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) override;
 
   /// Handler for TransmitData() RPC.
   ///

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index e838081..75e5194 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -38,6 +38,7 @@ class TestObservability(ImpalaTestSuite):
     assert result.exec_summary[0]['operator'] == '05:MERGING-EXCHANGE'
     assert result.exec_summary[0]['num_rows'] == 5
     assert result.exec_summary[0]['est_num_rows'] == 5
+    assert result.exec_summary[0]['peak_mem'] > 0
 
     for line in result.runtime_profile.split('\n'):
       # The first 'RowsProduced' we find is for the coordinator fragment.
@@ -55,6 +56,7 @@ class TestObservability(ImpalaTestSuite):
     assert result.exec_summary[5]['operator'] == '04:EXCHANGE'
     assert result.exec_summary[5]['num_rows'] == 25
     assert result.exec_summary[5]['est_num_rows'] == 25
+    assert result.exec_summary[5]['peak_mem'] > 0
 
   @SkipIfS3.hbase
   @SkipIfLocal.hbase