You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2018/02/11 01:32:13 UTC

[1/2] impala git commit: IMPALA-3271: organise and warn on removed startup flags

Repository: impala
Updated Branches:
  refs/heads/master 315142190 -> 2e6034786


IMPALA-3271: organise and warn on removed startup flags

Gather all the removed flags in a single place and establish consistent
behaviour.

If the flag is set, a warning is logged. GFlags are initialised before
logging, so the warning goes to stderr instead of the WARNING log.

The affected flags are:
be_service_threads, cgroup_hierarchy_path, enable_accept_queue_server,
enable_partitioned_aggregation, enable_partitioned_hash_join,
enable_phj_probe_side_filtering, enable_rm, llama_addresses,
llama_callback_port, llama_host, llama_max_request_attempts,
llama_port, llama_registration_timeout_secs,
llama_registration_wait_secs, local_nodemanager_url,
resource_broker_cnxn_attempts, resource_broker_cnxn_retry_interval_ms,
resource_broker_recv_timeout, resource_broker_send_timeout,
rm_always_use_defaults, rm_default_cpu_vcores, rm_default_memory,
rpc_cnxn_attempts, rpc_cnxn_retry_interval_ms, staging_cgroup,
suppress_unknown_disk_id_warnings, use_statestore.

Testing:
Ran "start-impala-cluster.py --impalad_args=--enable_rm=true"
and verified that the warning appeared in
logs/cluster/impalad-error.log

Cherry-picks: not for 2.x

Change-Id: Ic9bb4792e1b18840aa85aa5d755a09f2b5461f34
Reviewed-on: http://gerrit.cloudera.org:8080/9173
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/0047f813
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/0047f813
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/0047f813

Branch: refs/heads/master
Commit: 0047f813abc4dc8fb08a15ac8b9d0f582ab8a5f5
Parents: 3151421
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jan 31 15:13:04 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 9 20:01:33 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc             | 56 +++++++++++++++++++++++++-
 be/src/exec/exec-node.cc                  |  2 -
 be/src/exec/hdfs-scan-node-base.cc        |  3 --
 be/src/exec/partitioned-hash-join-node.cc |  2 -
 be/src/rpc/thrift-server.cc               |  3 --
 be/src/runtime/exec-env.cc                | 17 --------
 be/src/scheduling/query-schedule.cc       |  5 ---
 be/src/service/impala-server.cc           |  5 ---
 be/src/service/impalad-main.cc            |  8 ----
 9 files changed, 54 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/0047f813/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 2d832da..8e81d23 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -160,8 +160,6 @@ DEFINE_int32(kudu_operation_timeout_ms, 3 * 60 * 1000, "Timeout (milliseconds) s
     "all Kudu operations. This must be a positive value, and there is no way to disable "
     "timeouts.");
 
-DEFINE_bool_hidden(enable_accept_queue_server, true, "Deprecated");
-
 DEFINE_int64(inc_stats_size_limit_bytes, 200 * (1LL<<20), "Maximum size of "
     "incremental stats the catalog is allowed to serialize per table. "
     "This limit is set as a safety check, to prevent the JVM from "
@@ -194,3 +192,57 @@ DEFINE_string(reserved_words_version, "3.0.0", "Reserved words compatibility ver
     "Reserved words cannot be used as identifiers in SQL. This flag determines the impala"
     " version from which the reserved word list is taken. The value must be one of "
     "[\"2.11.0\", \"3.0.0\"].");
+
+// ++========================++
+// || Startup flag graveyard ||
+// ++========================++
+//
+//                       -----------
+//           -----------/   R I P   ╲
+//          /   R I P   ╲ -----------|-----------
+//          |-----------|           |/   R I P   ╲
+//          |           |   LLAMA   ||-----------|
+//          | Old Aggs  |           ||           |
+//          |           |    --     || Old Joins |
+//          |    --     |           ||           |
+//          |           |           ||    --     |
+//          |           |~.~~.~~.~~~~|           |
+//          ~~.~~.~~.~~~~            |           |
+//                                   ~~.~~.~~.~~~~
+// The flags have no effect but we don't want to prevent Impala from starting when they
+// are provided on the command line after an upgrade. We issue a warning if the flag is
+// set from the command line.
+#define REMOVED_FLAG(flagname) \
+  DEFINE_string_hidden(flagname, "__UNSET__", "Removed"); \
+  DEFINE_validator(flagname, [](const char* name, const string& val) { \
+      if (val != "__UNSET__") LOG(WARNING) << "Ignoring removed flag " << name; \
+      return true; \
+    });
+
+REMOVED_FLAG(be_service_threads);
+REMOVED_FLAG(cgroup_hierarchy_path);
+REMOVED_FLAG(enable_accept_queue_server);
+REMOVED_FLAG(enable_partitioned_aggregation);
+REMOVED_FLAG(enable_partitioned_hash_join);
+REMOVED_FLAG(enable_phj_probe_side_filtering);
+REMOVED_FLAG(enable_rm);
+REMOVED_FLAG(llama_addresses);
+REMOVED_FLAG(llama_callback_port);
+REMOVED_FLAG(llama_host);
+REMOVED_FLAG(llama_max_request_attempts);
+REMOVED_FLAG(llama_port);
+REMOVED_FLAG(llama_registration_timeout_secs);
+REMOVED_FLAG(llama_registration_wait_secs);
+REMOVED_FLAG(local_nodemanager_url);
+REMOVED_FLAG(resource_broker_cnxn_attempts);
+REMOVED_FLAG(resource_broker_cnxn_retry_interval_ms);
+REMOVED_FLAG(resource_broker_recv_timeout);
+REMOVED_FLAG(resource_broker_send_timeout);
+REMOVED_FLAG(rm_always_use_defaults);
+REMOVED_FLAG(rm_default_cpu_vcores);
+REMOVED_FLAG(rm_default_memory);
+REMOVED_FLAG(rpc_cnxn_attempts);
+REMOVED_FLAG(rpc_cnxn_retry_interval_ms);
+REMOVED_FLAG(staging_cgroup);
+REMOVED_FLAG(suppress_unknown_disk_id_warnings);
+REMOVED_FLAG(use_statestore);

http://git-wip-us.apache.org/repos/asf/impala/blob/0047f813/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index eaf8dd1..08856f1 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -70,8 +70,6 @@ using strings::Substitute;
 
 DECLARE_int32(be_port);
 DECLARE_string(hostname);
-DEFINE_bool_hidden(enable_partitioned_hash_join, true, "Deprecated - has no effect");
-DEFINE_bool_hidden(enable_partitioned_aggregation, true, "Deprecated - has no effect");
 
 namespace impala {
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0047f813/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 8b065fa..b40fb7e 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -46,9 +46,6 @@
 
 #include "common/names.h"
 
-// TODO: Remove this flag in a compatibility-breaking release.
-DEFINE_bool(suppress_unknown_disk_id_warnings, false, "Deprecated.");
-
 #ifndef NDEBUG
 DECLARE_bool(skip_file_runtime_filtering);
 #endif

http://git-wip-us.apache.org/repos/asf/impala/blob/0047f813/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index e76a9ba..dd3efd1 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -39,8 +39,6 @@
 
 #include "common/names.h"
 
-DEFINE_bool_hidden(enable_phj_probe_side_filtering, true, "Deprecated.");
-
 static const string PREPARE_FOR_READ_FAILED_ERROR_MSG =
     "Failed to acquire initial read buffer for stream in hash join node $0. Reducing "
     "query concurrency or increasing the memory limit may help this query to complete "

http://git-wip-us.apache.org/repos/asf/impala/blob/0047f813/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index 48fb1b9..8f948cd 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -58,9 +58,6 @@ using namespace apache::thrift::server;
 using namespace apache::thrift::transport;
 using namespace apache::thrift;
 
-DEFINE_int32_hidden(rpc_cnxn_attempts, 10, "Deprecated");
-DEFINE_int32_hidden(rpc_cnxn_retry_interval_ms, 2000, "Deprecated");
-
 DECLARE_string(principal);
 DECLARE_string(keytab_file);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0047f813/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 1c3ab7a..ff6a374 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -70,7 +70,6 @@ using boost::algorithm::join;
 using kudu::rpc::ServiceIf;
 using namespace strings;
 
-DEFINE_bool_hidden(use_statestore, true, "Deprecated, do not use");
 DEFINE_string(catalog_service_host, "localhost",
     "hostname where CatalogService is running");
 DEFINE_bool(enable_webserver, true, "If true, debug webserver is enabled");
@@ -103,22 +102,6 @@ DECLARE_bool(is_coordinator);
 DECLARE_int32(webserver_port);
 DECLARE_int64(tcmalloc_max_total_thread_cache_bytes);
 
-// TODO: Remove the following RM-related flags in Impala 3.0.
-DEFINE_bool_hidden(enable_rm, false, "Deprecated");
-DEFINE_int32_hidden(llama_callback_port, 28000, "Deprecated");
-DEFINE_string_hidden(llama_host, "", "Deprecated");
-DEFINE_int32_hidden(llama_port, 15000, "Deprecated");
-DEFINE_string_hidden(llama_addresses, "", "Deprecated");
-DEFINE_int64_hidden(llama_registration_timeout_secs, 30, "Deprecated");
-DEFINE_int64_hidden(llama_registration_wait_secs, 3, "Deprecated");
-DEFINE_int64_hidden(llama_max_request_attempts, 5, "Deprecated");
-DEFINE_string_hidden(cgroup_hierarchy_path, "", "Deprecated");
-DEFINE_string_hidden(staging_cgroup, "impala_staging", "Deprecated");
-DEFINE_int32_hidden(resource_broker_cnxn_attempts, 1, "Deprecated");
-DEFINE_int32_hidden(resource_broker_cnxn_retry_interval_ms, 3000, "Deprecated");
-DEFINE_int32_hidden(resource_broker_send_timeout, 0, "Deprecated");
-DEFINE_int32_hidden(resource_broker_recv_timeout, 0, "Deprecated");
-
 // TODO-MT: rename or retire
 DEFINE_int32(coordinator_rpc_threads, 12, "(Advanced) Number of threads available to "
     "start fragments on remote Impala daemons.");

http://git-wip-us.apache.org/repos/asf/impala/blob/0047f813/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index f833273..a424f78 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -35,11 +35,6 @@ using boost::uuids::random_generator;
 using boost::uuids::uuid;
 using namespace impala;
 
-// TODO: Remove for Impala 3.0.
-DEFINE_bool_hidden(rm_always_use_defaults, false, "Deprecated");
-DEFINE_string_hidden(rm_default_memory, "4G", "Deprecated");
-DEFINE_int32_hidden(rm_default_cpu_vcores, 2, "Deprecated");
-
 namespace impala {
 
 QuerySchedule::QuerySchedule(const TUniqueId& query_id,

http://git-wip-us.apache.org/repos/asf/impala/blob/0047f813/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ee8405b..9898029 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -118,8 +118,6 @@ DEFINE_int32(hs2_port, 21050, "port on which HiveServer2 client requests are ser
 
 DEFINE_int32(fe_service_threads, 64,
     "number of threads available to serve client requests");
-DEFINE_int32_hidden(be_service_threads, 64,
-    "Deprecated, no longer has any effect. Will be removed in Impala 3.0.");
 DEFINE_string(default_query_options, "", "key=value pair of default query options for"
     " impalad, separated by ','");
 DEFINE_int32(query_log_size, 25, "Number of queries to retain in the query log. If -1, "
@@ -208,9 +206,6 @@ DEFINE_bool(is_executor, true, "If true, this Impala daemon will execute query "
       "milliseconds. Only used for testing.");
 #endif
 
-// TODO: Remove for Impala 3.0.
-DEFINE_string_hidden(local_nodemanager_url, "", "Deprecated");
-
 DECLARE_bool(compact_catalog_topic);
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/impala/blob/0047f813/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index e0049d0..8463467 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -54,7 +54,6 @@ using namespace impala;
 DECLARE_int32(beeswax_port);
 DECLARE_int32(hs2_port);
 DECLARE_int32(be_port);
-DECLARE_bool(enable_rm);
 DECLARE_bool(is_coordinator);
 
 int ImpaladMain(int argc, char** argv) {
@@ -70,13 +69,6 @@ int ImpaladMain(int argc, char** argv) {
   ABORT_IF_ERROR(JniCatalogCacheUpdateIterator::InitJNI());
   InitFeSupport();
 
-  if (FLAGS_enable_rm) {
-    // TODO: Remove in Impala 3.0.
-    LOG(WARNING) << "*****************************************************************";
-    LOG(WARNING) << "Llama support has been deprecated. FLAGS_enable_rm has no effect.";
-    LOG(WARNING) << "*****************************************************************";
-  }
-
   ExecEnv exec_env;
   ABORT_IF_ERROR(exec_env.Init());
   CommonMetrics::InitCommonMetrics(exec_env.metrics());


[2/2] impala git commit: IMPALA-6396: Exchange node's memory usage should include its receiver's

Posted by kw...@apache.org.
IMPALA-6396: Exchange node's memory usage should include its receiver's

A DataStreamRecvr is co-owned by the DataStreamMgr and
an Exchange node. However, the lifetime of the memory
allocations (e.g. row batches) of a DataStreamRecvr never
exceeds that of its owning Exchange node. Previously, we
used the fragment instance's MemTracker as the parent of
the DataStreamRecvr's MemTracker. This change switches to
using the MemTracker of the owning Exchange node as the
parent tracker of the DataStreamRecvr. This makes it
easier to identify the peak memory usage of the receivers
of different exchange nodes in the runtime profile and
query summary. Most of the exchange node's memory usage
is from its receiver so we don't track the peak memory
usage of the receiver separately.

Sample output from TPCH-Q21:

EXCHANGE_NODE (id=18):(Total: 1s448ms, non-child: 265.818ms, % non-child: 18.35%)
   - ConvertRowBatchTime: 223.895ms
   - PeakMemoryUsage: 10.04 MB (10524943)
   - RowsReturned: 1.27M (1267464)
   - RowsReturnedRate: 875.19 K/sec
  RecvrSide:
    BytesReceived(500.000ms): 0, 1.64 MB, 9.98 MB, 9.98 MB, 10.01 MB, 10.01 MB, 10.01 MB, 31.79 MB, 60.19 MB, 87.84 MB
     - FirstBatchArrivalWaitTime: 0.000ns
     - TotalBytesReceived: 93.07 MB (97594728)
     - TotalGetBatchTime: 1s194ms
       - DataArrivalTimer: 1s183ms
   SenderSide:
      - DeserializeRowBatchTime: 344.343ms
      - NumBatchesAccepted: 3.80K (3796)
      - NumBatchesDeferred: 5 (5)
      - NumEarlySenders: 0 (0)

Testing done: Updated test_observability.py to verify that the
peak memory usage of the exchange node is not 0.

Change-Id: I8ca3c47d87bfcd221d34565eda1878f3c15d5c45
Reviewed-on: http://gerrit.cloudera.org:8080/9202
Reviewed-by: Michael Ho <kw...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/2e603478
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/2e603478
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/2e603478

Branch: refs/heads/master
Commit: 2e60347868d7e719d80401f9abcbe971e659502b
Parents: 0047f81
Author: Michael Ho <kw...@cloudera.com>
Authored: Thu Feb 1 13:56:31 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Sat Feb 10 03:10:26 2018 +0000

----------------------------------------------------------------------
 be/src/exec/exchange-node.cc           |  6 +++---
 be/src/runtime/data-stream-mgr-base.h  |  9 +++++----
 be/src/runtime/data-stream-mgr.cc      | 17 ++++++++---------
 be/src/runtime/data-stream-mgr.h       | 11 ++++++-----
 be/src/runtime/data-stream-test.cc     | 13 ++++++-------
 be/src/runtime/krpc-data-stream-mgr.cc | 13 ++++++-------
 be/src/runtime/krpc-data-stream-mgr.h  | 11 ++++++-----
 tests/query_test/test_observability.py |  2 ++
 8 files changed, 42 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 353a59b..cc39382 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -81,9 +81,9 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
 
   // TODO: figure out appropriate buffer size
   DCHECK_GT(num_senders_, 0);
-  stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(state,
-      &input_row_desc_, state->fragment_instance_id(), id_, num_senders_,
-      FLAGS_exchg_node_buffer_size_bytes, runtime_profile(), is_merging_);
+  stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(&input_row_desc_,
+      state->fragment_instance_id(), id_, num_senders_,
+      FLAGS_exchg_node_buffer_size_bytes, is_merging_, runtime_profile(), mem_tracker());
   if (is_merging_) {
     less_than_.reset(
         new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h b/be/src/runtime/data-stream-mgr-base.h
index 0e392e3..f9761cb 100644
--- a/be/src/runtime/data-stream-mgr-base.h
+++ b/be/src/runtime/data-stream-mgr-base.h
@@ -26,6 +26,7 @@
 namespace impala {
 
 class DataStreamRecvrBase;
+class MemTracker;
 class RuntimeProfile;
 class RuntimeState;
 class TRowBatch;
@@ -43,10 +44,10 @@ class DataStreamMgrBase : public CacheLineAligned {
   virtual ~DataStreamMgrBase() { }
 
   /// Create a receiver for a specific fragment_instance_id/node_id destination;
-  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) = 0;
+  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) = 0;
 
   /// Closes all receivers registered for fragment_instance_id immediately.
   virtual void Cancel(const TUniqueId& fragment_instance_id) = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index 45eee7f..48a819c 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -75,17 +75,16 @@ inline uint32_t DataStreamMgr::GetHashValue(
   return value;
 }
 
-shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(RuntimeState* state,
-    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-    PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-    RuntimeProfile* profile, bool is_merging) {
-  DCHECK(profile != NULL);
+shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(const RowDescriptor* row_desc,
+    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+    int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+    MemTracker* parent_tracker) {
+  DCHECK(profile != nullptr);
+  DCHECK(parent_tracker != nullptr);
   VLOG_FILE << "creating receiver for fragment="
             << fragment_instance_id << ", node=" << dest_node_id;
-  shared_ptr<DataStreamRecvr> recvr(
-      new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
-          fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size,
-          profile));
+  shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(this, parent_tracker, row_desc,
+      fragment_instance_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
   size_t hash_value = GetHashValue(fragment_instance_id, dest_node_id);
   lock_guard<mutex> l(lock_);
   fragment_recvr_set_.insert(make_pair(fragment_instance_id, dest_node_id));

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 07f7c56..2be6478 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -71,13 +71,14 @@ class DataStreamMgr : public DataStreamMgrBase {
   /// Create a receiver for a specific fragment_instance_id/node_id destination;
   /// If is_merging is true, the receiver maintains a separate queue of incoming row
   /// batches for each sender and merges the sorted streams from each sender into a
-  /// single stream.
+  /// single stream. 'parent_tracker' is the MemTracker of the exchange node which owns
+  /// this receiver. It's the parent of the MemTracker of the newly created receiver.
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
   /// caller.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) override;
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) override;
 
   /// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id
   /// if the recvr has not been cancelled. sender_id identifies the sender instance

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 07eefd4..75d5ac9 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -415,8 +415,8 @@ class DataStreamTest : public DataStreamTestBase<testing::TestWithParam<KrpcSwit
     GetNextInstanceId(&instance_id);
     receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
     ReceiverInfo& info = receiver_info_.back();
-    info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
-        instance_id, DEST_NODE_ID, num_senders, buffer_size, profile, is_merging);
+    info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
+        num_senders, buffer_size, is_merging, profile, &tracker_);
     if (!is_merging) {
       info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
     } else {
@@ -767,9 +767,8 @@ TEST_P(DataStreamTestThriftOnly, CloseRecvrWhileReferencesRemain) {
   // Start just one receiver.
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
-  shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(
-      runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile,
-      false);
+  shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(row_desc_,
+      instance_id, DEST_NODE_ID, 1, 1, false, profile, &tracker_);
 
   // Perform tear down, but keep a reference to the receiver so that it is deleted last
   // (to confirm that the destructor does not access invalid state after tear-down).
@@ -832,8 +831,8 @@ TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
   RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
   receiver_info_.push_back(ReceiverInfo(TPartitionType::UNPARTITIONED, 4, 1));
   ReceiverInfo& info = receiver_info_.back();
-  info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
-      instance_id, DEST_NODE_ID, 4, 1024 * 1024, profile, false);
+  info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, DEST_NODE_ID,
+      4, 1024 * 1024, false, profile, &tracker_);
   info.thread_handle = new thread(
       &DataStreamTestForImpala6346_TestNoDeadlock_Test::ReadStream, this, &info);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc b/be/src/runtime/krpc-data-stream-mgr.cc
index fabea13..91111dc 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -94,16 +94,15 @@ inline uint32_t KrpcDataStreamMgr::GetHashValue(
 }
 
 shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
-    RuntimeState* state, const RowDescriptor* row_desc,
-    const TUniqueId& finst_id, PlanNodeId dest_node_id, int num_senders,
-    int64_t buffer_size, RuntimeProfile* profile, bool is_merging) {
-
+    const RowDescriptor* row_desc, const TUniqueId& finst_id, PlanNodeId dest_node_id,
+    int num_senders, int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+    MemTracker* parent_tracker) {
   DCHECK(profile != nullptr);
+  DCHECK(parent_tracker != nullptr);
   VLOG_FILE << "creating receiver for fragment="<< finst_id
             << ", node=" << dest_node_id;
-  shared_ptr<KrpcDataStreamRecvr> recvr(
-      new KrpcDataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
-          finst_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
+  shared_ptr<KrpcDataStreamRecvr> recvr(new KrpcDataStreamRecvr(this, parent_tracker,
+      row_desc, finst_id, dest_node_id, num_senders, is_merging, buffer_size, profile));
   uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
   EarlySendersList early_senders_for_recvr;
   {

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h b/be/src/runtime/krpc-data-stream-mgr.h
index 458ebe7..16c0b30 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -236,13 +236,14 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   /// Create a receiver for a specific fragment_instance_id/dest_node_id.
   /// If is_merging is true, the receiver maintains a separate queue of incoming row
   /// batches for each sender and merges the sorted streams from each sender into a
-  /// single stream.
+  /// single stream. 'parent_tracker' is the MemTracker of the exchange node which owns
+  /// this receiver. It's the parent of the MemTracker of the newly created receiver.
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
   /// caller.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) override;
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) override;
 
   /// Handler for TransmitData() RPC.
   ///

http://git-wip-us.apache.org/repos/asf/impala/blob/2e603478/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index e838081..75e5194 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -38,6 +38,7 @@ class TestObservability(ImpalaTestSuite):
     assert result.exec_summary[0]['operator'] == '05:MERGING-EXCHANGE'
     assert result.exec_summary[0]['num_rows'] == 5
     assert result.exec_summary[0]['est_num_rows'] == 5
+    assert result.exec_summary[0]['peak_mem'] > 0
 
     for line in result.runtime_profile.split('\n'):
       # The first 'RowsProduced' we find is for the coordinator fragment.
@@ -55,6 +56,7 @@ class TestObservability(ImpalaTestSuite):
     assert result.exec_summary[5]['operator'] == '04:EXCHANGE'
     assert result.exec_summary[5]['num_rows'] == 25
     assert result.exec_summary[5]['est_num_rows'] == 25
+    assert result.exec_summary[5]['peak_mem'] > 0
 
   @SkipIfS3.hbase
   @SkipIfLocal.hbase