You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/05/01 00:51:53 UTC

[impala] branch master updated: IMPALA-8469: admit_mem_limit for dedicated coordinator

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new d820952  IMPALA-8469: admit_mem_limit for dedicated coordinator
d820952 is described below

commit d820952d86d34ba887c55a09e58b735cbef866c2
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Apr 29 13:49:00 2019 -0700

    IMPALA-8469: admit_mem_limit for dedicated coordinator
    
    Refactored to avoid the code duplication that resulted in this bug:
    * admit_mem_limit is calculated once in ExecEnv
    * The local backend descriptor is always constructed with
      a static helper: Scheduler::BuildLocalBackendDescriptor()
    
    I chose to factor it in this way, in part, to avoid invasive
    changes to scheduler-test, which currently doesn't depend on
    ExecEnv or ImpalaServer.
    
    Testing:
    Added basic test that reproduces the bug.
    
    Change-Id: Iaceb21b753b9b021bedc4187c0d44aaa6a626521
    Reviewed-on: http://gerrit.cloudera.org:8080/13180
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/exec-env.cc                        |  8 +--
 be/src/runtime/exec-env.h                         |  7 +++
 be/src/scheduling/scheduler-test-util.cc          |  2 +-
 be/src/scheduling/scheduler.cc                    | 62 ++++++++++++++---------
 be/src/scheduling/scheduler.h                     | 13 +++--
 be/src/service/impala-server.cc                   | 23 ++-------
 tests/common/custom_cluster_test_suite.py         | 17 +++++--
 tests/custom_cluster/test_admission_controller.py | 24 +++++++++
 8 files changed, 101 insertions(+), 55 deletions(-)

diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index db214f3..37d6b37 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -243,17 +243,17 @@ Status ExecEnv::Init() {
   }
   // The bytes limit we want to size everything else as a fraction of, excluding the
   // JVM.
-  int64_t post_jvm_bytes_limit = bytes_limit;
+  admit_mem_limit_ = bytes_limit;
   if (FLAGS_mem_limit_includes_jvm) {
     // The JVM max heap size is static and therefore known at this point. Other categories
     // of JVM memory consumption are much smaller and dynamic so it is simpler not to
     // include them here.
-    post_jvm_bytes_limit -= JvmMemoryMetric::HEAP_MAX_USAGE->GetValue();
+    admit_mem_limit_ -= JvmMemoryMetric::HEAP_MAX_USAGE->GetValue();
   }
 
   bool is_percent;
   int64_t buffer_pool_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_limit,
-      &is_percent, post_jvm_bytes_limit);
+      &is_percent, admit_mem_limit_);
   if (buffer_pool_limit <= 0) {
     return Status(Substitute("Invalid --buffer_pool_limit value, must be a percentage or "
           "positive bytes value or percentage: $0", FLAGS_buffer_pool_limit));
@@ -338,7 +338,7 @@ Status ExecEnv::Init() {
 
   if (scheduler_ != nullptr) {
     RETURN_IF_ERROR(scheduler_->Init(
-          configured_backend_address_, krpc_address_, ip_address_));
+          configured_backend_address_, krpc_address_, ip_address_, admit_mem_limit_));
   }
   RETURN_IF_ERROR(admission_controller_->Init());
 
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index fc6f41d..c9cbe74 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -169,6 +169,8 @@ class ExecEnv {
   Status GetKuduClient(const std::vector<std::string>& master_addrs,
       kudu::client::KuduClient** client) WARN_UNUSED_RESULT;
 
+  int64_t admit_mem_limit() const { return admit_mem_limit_; }
+
  private:
   boost::scoped_ptr<ObjectPool> obj_pool_;
   boost::scoped_ptr<MetricGroup> metrics_;
@@ -253,6 +255,11 @@ class ExecEnv {
   /// address lists be identical in order to share a KuduClient.
   KuduClientMap kudu_client_map_;
 
+  /// The bytes of memory available for queries to execute with - i.e.
+  /// mem_tracker()->limit() with any overhead that can't be used subtracted out,
+  /// such as the JVM if --mem_limit_includes_jvm=true. Set in Init().
+  int64_t admit_mem_limit_;
+
   /// Choose a memory limit (returned in *bytes_limit) based on the --mem_limit flag and
   /// the memory available to the daemon process. Returns an error if the memory limit is
   /// invalid or another error is encountered that should prevent starting up the daemon.
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 9916cbc..dd98720 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -580,7 +580,7 @@ void SchedulerWrapper::InitializeScheduler() {
   scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id,
       &metrics_, nullptr, nullptr));
   const Status status = scheduler_->Init(scheduler_backend_address,
-      scheduler_krpc_address, scheduler_host.ip);
+      scheduler_krpc_address, scheduler_host.ip, /* admit_mem_limit */ 0L);
   DCHECK(status.ok()) << "Scheduler init failed in test";
   // Initialize the scheduler backend maps.
   SendFullMembershipMap();
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 18a5eb6..78a4e98 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -49,6 +49,10 @@ using namespace apache::thrift;
 using namespace org::apache::impala::fb;
 using namespace strings;
 
+DECLARE_bool(is_coordinator);
+DECLARE_bool(is_executor);
+DECLARE_bool(mem_limit_includes_jvm);
+
 namespace impala {
 
 static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
@@ -68,19 +72,13 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
 }
 
 Status Scheduler::Init(const TNetworkAddress& backend_address,
-    const TNetworkAddress& krpc_address, const IpAddr& ip) {
+    const TNetworkAddress& krpc_address, const IpAddr& ip,
+    int64_t admit_mem_limit) {
   LOG(INFO) << "Starting scheduler";
-  local_backend_descriptor_.address = backend_address;
-  // Store our IP address so that each subscriber doesn't have to resolve
-  // it on every heartbeat. May as well do it up front to avoid frequent DNS
-  // requests.
-  local_backend_descriptor_.ip_address = ip;
-  LOG(INFO) << "Scheduler using " << ip << " as IP address";
-  // KRPC relies on resolved IP address.
-  DCHECK(IsResolvedAddress(krpc_address));
-  DCHECK_EQ(krpc_address.hostname, ip);
-  local_backend_descriptor_.__set_krpc_address(krpc_address);
-
+  local_backend_descriptor_ = BuildLocalBackendDescriptor(webserver_, backend_address,
+      krpc_address, ip, admit_mem_limit);
+  LOG(INFO) << "Scheduler using " << local_backend_descriptor_.ip_address
+            << " as IP address";
   coord_only_backend_config_.AddBackend(local_backend_descriptor_);
 
   if (statestore_subscriber_ != nullptr) {
@@ -105,20 +103,35 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
     initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
     num_fragment_instances_metric_ = metrics_->AddGauge(NUM_BACKENDS_KEY, num_backends);
   }
+  return Status::OK();
+}
 
-  if (statestore_subscriber_ != nullptr) {
-    if (webserver_ != nullptr) {
-      const TNetworkAddress& webserver_address = webserver_->http_address();
-      if (IsWildcardAddress(webserver_address.hostname)) {
-        local_backend_descriptor_.__set_debug_http_address(
-            MakeNetworkAddress(ip, webserver_address.port));
-      } else {
-        local_backend_descriptor_.__set_debug_http_address(webserver_address);
-      }
-      local_backend_descriptor_.__set_secure_webserver(webserver_->IsSecure());
+TBackendDescriptor Scheduler::BuildLocalBackendDescriptor(
+    Webserver* webserver, const TNetworkAddress& backend_address,
+    const TNetworkAddress& krpc_address, const IpAddr& ip, int64_t admit_mem_limit) {
+  TBackendDescriptor local_backend_descriptor;
+  local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
+  local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
+  local_backend_descriptor.__set_address(backend_address);
+  // Store our IP address so that each subscriber doesn't have to resolve
+  // it on every heartbeat. May as well do it up front to avoid frequent DNS
+  // requests.
+  local_backend_descriptor.__set_ip_address(ip);
+  local_backend_descriptor.__set_admit_mem_limit(admit_mem_limit);
+  DCHECK(IsResolvedAddress(krpc_address)) << "KRPC relies on resolved IP address.";
+  DCHECK_EQ(krpc_address.hostname, local_backend_descriptor.ip_address);
+  local_backend_descriptor.__set_krpc_address(krpc_address);
+  if (webserver != nullptr) {
+    const TNetworkAddress& webserver_address = webserver->http_address();
+    if (IsWildcardAddress(webserver_address.hostname)) {
+      local_backend_descriptor.__set_debug_http_address(
+          MakeNetworkAddress(ip, webserver_address.port));
+    } else {
+      local_backend_descriptor.__set_debug_http_address(webserver_address);
     }
+    local_backend_descriptor.__set_secure_webserver(webserver->IsSecure());
   }
-  return Status::OK();
+  return local_backend_descriptor;
 }
 
 void Scheduler::UpdateLocalBackendAddrForBeTest() {
@@ -229,9 +242,8 @@ const TBackendDescriptor& Scheduler::LookUpBackendDesc(
     const BackendConfig& executor_config, const TNetworkAddress& host) {
   const TBackendDescriptor* desc = executor_config.LookUpBackendDesc(host);
   if (desc == nullptr) {
-    // Local host may not be in executor_config if it's a dedicated coordinator.
+    // Local host may not be in executor_config if it's a dedicated coordinator
     DCHECK(host == local_backend_descriptor_.address);
-    DCHECK(!local_backend_descriptor_.is_executor);
     desc = &local_backend_descriptor_;
   }
   return *desc;
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 241a8cb..940b1c2 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -85,9 +85,10 @@ class Scheduler {
   /// on failure. 'backend_address' is the address of thrift based ImpalaInternalService
   /// of this backend. If FLAGS_use_krpc is true, 'krpc_address' contains IP-address:port
   /// on which KRPC based ImpalaInternalService is exported. 'ip' is the resolved
-  /// IP address of this backend.
-  Status Init(const TNetworkAddress& backend_address,
-      const TNetworkAddress& krpc_address, const IpAddr& ip);
+  /// IP address of this backend. 'admit_mem_limit' is the ExecEnv::admit_mem_limit()
+  /// value or a dummy value provided by scheduler tests.
+  Status Init(const TNetworkAddress& backend_address, const TNetworkAddress& krpc_address,
+      const IpAddr& ip, int64_t admit_mem_limit);
 
   /// Test helper that updates the local backend address to reflect whatever
   /// ephemeral port was assigned during server startup. Should only be called
@@ -99,6 +100,12 @@ class Scheduler {
   /// ranges in the query exec request.
   Status Schedule(QuerySchedule* schedule);
 
+  /// Build a backend descriptor for this Impala daemon from the provided arguments and
+  /// the --is_coordinator/--is_executor flags. Does not set 'is_quiescing'.
+  static TBackendDescriptor BuildLocalBackendDescriptor(Webserver* webserver,
+      const TNetworkAddress& backend_address, const TNetworkAddress& krpc_address,
+      const IpAddr& ip, int64_t admit_mem_limit);
+
  private:
   /// Map from a host's IP address to the next executor to be round-robin scheduled for
   /// that host (needed for setups with multiple executors on a single host)
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4b50455..863ff16 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -57,7 +57,6 @@
 #include "runtime/coordinator.h"
 #include "runtime/exec-env.h"
 #include "runtime/lib-cache.h"
-#include "runtime/mem-tracker.h"
 #include "runtime/timestamp-value.h"
 #include "runtime/timestamp-value.inline.h"
 #include "runtime/tmp-file-mgr.h"
@@ -75,7 +74,6 @@
 #include "util/histogram-metric.h"
 #include "util/impalad-metrics.h"
 #include "util/lineage-util.h"
-#include "util/memory-metrics.h"
 #include "util/network-util.h"
 #include "util/openssl-util.h"
 #include "util/parse-util.h"
@@ -125,7 +123,6 @@ DECLARE_string(authorized_proxy_group_config);
 DECLARE_string(authorized_proxy_group_config_delimiter);
 DECLARE_bool(abort_on_config_error);
 DECLARE_bool(disk_spill_encryption);
-DECLARE_bool(mem_limit_includes_jvm);
 DECLARE_bool(use_local_catalog);
 
 DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served."
@@ -1890,23 +1887,11 @@ void ImpalaServer::AddLocalBackendToStatestore(
     return;
   }
 
-  TBackendDescriptor local_backend_descriptor;
-  local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
-  local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
-  local_backend_descriptor.__set_address(exec_env_->GetThriftBackendAddress());
-  local_backend_descriptor.ip_address = exec_env_->ip_address();
-  int64_t admit_mem_limit = exec_env_->process_mem_tracker()->limit();
-  if (FLAGS_mem_limit_includes_jvm) {
-    // Memory used by JVM heap cannot be admitted to. Other categories of JVM memory
-    // consumption are much smaller and dynamic so it is simpler not to
-    // include them here.
-    admit_mem_limit -= JvmMemoryMetric::HEAP_MAX_USAGE->GetValue();
-  }
-  local_backend_descriptor.__set_admit_mem_limit(admit_mem_limit);
+  TBackendDescriptor local_backend_descriptor =
+      Scheduler::BuildLocalBackendDescriptor(exec_env_->webserver(),
+          exec_env_->GetThriftBackendAddress(), exec_env_->krpc_address(),
+          exec_env_->ip_address(), exec_env_->admit_mem_limit());
   local_backend_descriptor.__set_is_quiescing(is_quiescing);
-  const TNetworkAddress& krpc_address = exec_env_->krpc_address();
-  DCHECK(IsResolvedAddress(krpc_address));
-  local_backend_descriptor.__set_krpc_address(krpc_address);
 
   subscriber_topic_updates->emplace_back(TTopicDelta());
   TTopicDelta& update = subscriber_topic_updates->back();
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index acba2c4..d221b43 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -48,6 +48,7 @@ CLUSTER_SIZE = "cluster_size"
 # other impala daemon arguments to allow merging multiple defaults into a single list.
 DEFAULT_QUERY_OPTIONS = 'default_query_options'
 IMPALA_LOG_DIR = 'impala_log_dir'
+NUM_EXCLUSIVE_COORDINATORS = 'num_exclusive_coordinators'
 
 # Run with fast topic updates by default to reduce time to first query running.
 DEFAULT_STATESTORE_ARGS = '--statestore_update_frequency_ms=50 \
@@ -99,7 +100,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
   @staticmethod
   def with_args(impalad_args=None, statestored_args=None, catalogd_args=None,
       start_args=None, sentry_config=None, default_query_options=None,
-      impala_log_dir=None, sentry_log_dir=None, cluster_size=None):
+      impala_log_dir=None, sentry_log_dir=None, cluster_size=None,
+      num_exclusive_coordinators=None):
     """Records arguments to be passed to a cluster by adding them to the decorated
     method's func_dict"""
     def decorate(func):
@@ -124,6 +126,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
         func.func_dict[IMPALA_LOG_DIR] = impala_log_dir
       if cluster_size is not None:
         func.func_dict[CLUSTER_SIZE] = cluster_size
+      if num_exclusive_coordinators is not None:
+        func.func_dict[NUM_EXCLUSIVE_COORDINATORS] = num_exclusive_coordinators
       return func
     return decorate
 
@@ -143,12 +147,19 @@ class CustomClusterTestSuite(ImpalaTestSuite):
     if CLUSTER_SIZE in method.func_dict:
       cluster_size = method.func_dict[CLUSTER_SIZE]
 
+    use_exclusive_coordinators = False
+    num_coordinators = cluster_size
+    if NUM_EXCLUSIVE_COORDINATORS in method.func_dict:
+      num_coordinators = method.func_dict[NUM_EXCLUSIVE_COORDINATORS]
+      use_exclusive_coordinators = True
+
     # Start a clean new cluster before each test
     kwargs = {
       "cluster_size": cluster_size,
-      "num_coordinators": cluster_size,
+      "num_coordinators": num_coordinators,
       "expected_num_executors": cluster_size,
-      "default_query_options": method.func_dict.get(DEFAULT_QUERY_OPTIONS)
+      "default_query_options": method.func_dict.get(DEFAULT_QUERY_OPTIONS),
+      "use_exclusive_coordinators": use_exclusive_coordinators
     }
     if IMPALA_LOG_DIR in method.func_dict:
       kwargs["impala_log_dir"] = method.func_dict[IMPALA_LOG_DIR]
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 86805ee..2101088 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -486,6 +486,30 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
+      impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
+          pool_max_mem=10 * PROC_MEM_TEST_LIMIT, proc_mem_limit=PROC_MEM_TEST_LIMIT),
+      num_exclusive_coordinators=1)
+  def test_mem_limit_dedicated_coordinator(self, vector):
+    """Regression test for IMPALA-8469: coordinator fragment should be admitted on
+    dedicated coordinator"""
+    query = "select * from functional.alltypesagg limit 1"
+    exec_options = vector.get_value('exec_option')
+    # Test both single-node and distributed plans
+    for num_nodes in [0, 1]:
+      # Memory just fits in memory limits
+      exec_options['mem_limit'] = self.PROC_MEM_TEST_LIMIT
+      exec_options['num_nodes'] = num_nodes
+      self.execute_query_expect_success(self.client, query, exec_options)
+
+      # A bit too much memory to run on coordinator.
+      exec_options['mem_limit'] = long(self.PROC_MEM_TEST_LIMIT * 1.1)
+      ex = self.execute_query_expect_failure(self.client, query, exec_options)
+      assert ("Rejected query from pool default-pool: request memory needed "
+              "1.10 GB per node is greater than memory available for admission 1.00 GB" in
+              str(ex)), str(ex)
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
       impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,
       pool_max_mem=10 * PROC_MEM_TEST_LIMIT,
       queue_wait_timeout_ms=2 * STATESTORE_RPC_FREQUENCY_MS),