You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/05/01 00:51:53 UTC
[impala] branch master updated: IMPALA-8469: admit_mem_limit for dedicated coordinator
This is an automated email from the ASF dual-hosted git repository.
tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new d820952 IMPALA-8469: admit_mem_limit for dedicated coordinator
d820952 is described below
commit d820952d86d34ba887c55a09e58b735cbef866c2
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Apr 29 13:49:00 2019 -0700
IMPALA-8469: admit_mem_limit for dedicated coordinator
Refactored to avoid the code duplication that resulted in this bug:
* admit_mem_limit is calculated once in ExecEnv
* The local backend descriptor is always constructed with a static helper: Scheduler::BuildLocalBackendDescriptor()
I chose to factor it in this way, in part, to avoid invasive
changes to scheduler-test, which currently doesn't depend on
ExecEnv or ImpalaServer.
Testing:
Added basic test that reproduces the bug.
Change-Id: Iaceb21b753b9b021bedc4187c0d44aaa6a626521
Reviewed-on: http://gerrit.cloudera.org:8080/13180
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
be/src/runtime/exec-env.cc | 8 +--
be/src/runtime/exec-env.h | 7 +++
be/src/scheduling/scheduler-test-util.cc | 2 +-
be/src/scheduling/scheduler.cc | 62 ++++++++++++++---------
be/src/scheduling/scheduler.h | 13 +++--
be/src/service/impala-server.cc | 23 ++-------
tests/common/custom_cluster_test_suite.py | 17 +++++--
tests/custom_cluster/test_admission_controller.py | 24 +++++++++
8 files changed, 101 insertions(+), 55 deletions(-)
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index db214f3..37d6b37 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -243,17 +243,17 @@ Status ExecEnv::Init() {
}
// The bytes limit we want to size everything else as a fraction of, excluding the
// JVM.
- int64_t post_jvm_bytes_limit = bytes_limit;
+ admit_mem_limit_ = bytes_limit;
if (FLAGS_mem_limit_includes_jvm) {
// The JVM max heap size is static and therefore known at this point. Other categories
// of JVM memory consumption are much smaller and dynamic so it is simpler not to
// include them here.
- post_jvm_bytes_limit -= JvmMemoryMetric::HEAP_MAX_USAGE->GetValue();
+ admit_mem_limit_ -= JvmMemoryMetric::HEAP_MAX_USAGE->GetValue();
}
bool is_percent;
int64_t buffer_pool_limit = ParseUtil::ParseMemSpec(FLAGS_buffer_pool_limit,
- &is_percent, post_jvm_bytes_limit);
+ &is_percent, admit_mem_limit_);
if (buffer_pool_limit <= 0) {
return Status(Substitute("Invalid --buffer_pool_limit value, must be a percentage or "
"positive bytes value or percentage: $0", FLAGS_buffer_pool_limit));
@@ -338,7 +338,7 @@ Status ExecEnv::Init() {
if (scheduler_ != nullptr) {
RETURN_IF_ERROR(scheduler_->Init(
- configured_backend_address_, krpc_address_, ip_address_));
+ configured_backend_address_, krpc_address_, ip_address_, admit_mem_limit_));
}
RETURN_IF_ERROR(admission_controller_->Init());
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index fc6f41d..c9cbe74 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -169,6 +169,8 @@ class ExecEnv {
Status GetKuduClient(const std::vector<std::string>& master_addrs,
kudu::client::KuduClient** client) WARN_UNUSED_RESULT;
+ int64_t admit_mem_limit() const { return admit_mem_limit_; }
+
private:
boost::scoped_ptr<ObjectPool> obj_pool_;
boost::scoped_ptr<MetricGroup> metrics_;
@@ -253,6 +255,11 @@ class ExecEnv {
/// address lists be identical in order to share a KuduClient.
KuduClientMap kudu_client_map_;
+ /// Return the bytes of memory available for queries to execute with - i.e.
+ /// mem_tracker()->limit() with any overhead that can't be used subtracted out,
+ /// such as the JVM if --mem_limit_includes_jvm=true. Set in Init().
+ int64_t admit_mem_limit_;
+
/// Choose a memory limit (returned in *bytes_limit) based on the --mem_limit flag and
/// the memory available to the daemon process. Returns an error if the memory limit is
/// invalid or another error is encountered that should prevent starting up the daemon.
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 9916cbc..dd98720 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -580,7 +580,7 @@ void SchedulerWrapper::InitializeScheduler() {
scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id,
&metrics_, nullptr, nullptr));
const Status status = scheduler_->Init(scheduler_backend_address,
- scheduler_krpc_address, scheduler_host.ip);
+ scheduler_krpc_address, scheduler_host.ip, /* admit_mem_limit */ 0L);
DCHECK(status.ok()) << "Scheduler init failed in test";
// Initialize the scheduler backend maps.
SendFullMembershipMap();
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 18a5eb6..78a4e98 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -49,6 +49,10 @@ using namespace apache::thrift;
using namespace org::apache::impala::fb;
using namespace strings;
+DECLARE_bool(is_coordinator);
+DECLARE_bool(is_executor);
+DECLARE_bool(mem_limit_includes_jvm);
+
namespace impala {
static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
@@ -68,19 +72,13 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
}
Status Scheduler::Init(const TNetworkAddress& backend_address,
- const TNetworkAddress& krpc_address, const IpAddr& ip) {
+ const TNetworkAddress& krpc_address, const IpAddr& ip,
+ int64_t admit_mem_limit) {
LOG(INFO) << "Starting scheduler";
- local_backend_descriptor_.address = backend_address;
- // Store our IP address so that each subscriber doesn't have to resolve
- // it on every heartbeat. May as well do it up front to avoid frequent DNS
- // requests.
- local_backend_descriptor_.ip_address = ip;
- LOG(INFO) << "Scheduler using " << ip << " as IP address";
- // KRPC relies on resolved IP address.
- DCHECK(IsResolvedAddress(krpc_address));
- DCHECK_EQ(krpc_address.hostname, ip);
- local_backend_descriptor_.__set_krpc_address(krpc_address);
-
+ local_backend_descriptor_ = BuildLocalBackendDescriptor(webserver_, backend_address,
+ krpc_address, ip, admit_mem_limit);
+ LOG(INFO) << "Scheduler using " << local_backend_descriptor_.ip_address
+ << " as IP address";
coord_only_backend_config_.AddBackend(local_backend_descriptor_);
if (statestore_subscriber_ != nullptr) {
@@ -105,20 +103,35 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
initialized_ = metrics_->AddProperty(SCHEDULER_INIT_KEY, true);
num_fragment_instances_metric_ = metrics_->AddGauge(NUM_BACKENDS_KEY, num_backends);
}
+ return Status::OK();
+}
- if (statestore_subscriber_ != nullptr) {
- if (webserver_ != nullptr) {
- const TNetworkAddress& webserver_address = webserver_->http_address();
- if (IsWildcardAddress(webserver_address.hostname)) {
- local_backend_descriptor_.__set_debug_http_address(
- MakeNetworkAddress(ip, webserver_address.port));
- } else {
- local_backend_descriptor_.__set_debug_http_address(webserver_address);
- }
- local_backend_descriptor_.__set_secure_webserver(webserver_->IsSecure());
+TBackendDescriptor Scheduler::BuildLocalBackendDescriptor(
+ Webserver* webserver, const TNetworkAddress& backend_address,
+ const TNetworkAddress& krpc_address, const IpAddr& ip, int64_t admit_mem_limit) {
+ TBackendDescriptor local_backend_descriptor;
+ local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
+ local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
+ local_backend_descriptor.__set_address(backend_address);
+ // Store our IP address so that each subscriber doesn't have to resolve
+ // it on every heartbeat. May as well do it up front to avoid frequent DNS
+ // requests.
+ local_backend_descriptor.__set_ip_address(ip);
+ local_backend_descriptor.__set_admit_mem_limit(admit_mem_limit);
+ DCHECK(IsResolvedAddress(krpc_address)) << "KRPC relies on resolved IP address.";
+ DCHECK_EQ(krpc_address.hostname, local_backend_descriptor.ip_address);
+ local_backend_descriptor.__set_krpc_address(krpc_address);
+ if (webserver != nullptr) {
+ const TNetworkAddress& webserver_address = webserver->http_address();
+ if (IsWildcardAddress(webserver_address.hostname)) {
+ local_backend_descriptor.__set_debug_http_address(
+ MakeNetworkAddress(ip, webserver_address.port));
+ } else {
+ local_backend_descriptor.__set_debug_http_address(webserver_address);
}
+ local_backend_descriptor.__set_secure_webserver(webserver->IsSecure());
}
- return Status::OK();
+ return local_backend_descriptor;
}
void Scheduler::UpdateLocalBackendAddrForBeTest() {
@@ -229,9 +242,8 @@ const TBackendDescriptor& Scheduler::LookUpBackendDesc(
const BackendConfig& executor_config, const TNetworkAddress& host) {
const TBackendDescriptor* desc = executor_config.LookUpBackendDesc(host);
if (desc == nullptr) {
- // Local host may not be in executor_config if it's a dedicated coordinator.
+ // Local host may not be in executor_config if it's a dedicated coordinator
DCHECK(host == local_backend_descriptor_.address);
- DCHECK(!local_backend_descriptor_.is_executor);
desc = &local_backend_descriptor_;
}
return *desc;
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 241a8cb..940b1c2 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -85,9 +85,10 @@ class Scheduler {
/// on failure. 'backend_address' is the address of thrift based ImpalaInternalService
/// of this backend. If FLAGS_use_krpc is true, 'krpc_address' contains IP-address:port
/// on which KRPC based ImpalaInternalService is exported. 'ip' is the resolved
- /// IP address of this backend.
- Status Init(const TNetworkAddress& backend_address,
- const TNetworkAddress& krpc_address, const IpAddr& ip);
+ /// IP address of this backend. 'admit_mem_limit' is the ExecEnv::admit_mem_limit()
+ /// value or a dummy value provided by scheduler tests.
+ Status Init(const TNetworkAddress& backend_address, const TNetworkAddress& krpc_address,
+ const IpAddr& ip, int64_t admit_mem_limit);
/// Test helper that updates the local backend address to reflect whatever
/// ephemeral port was assigned during server startup. Should only be called
@@ -99,6 +100,12 @@ class Scheduler {
/// ranges in the query exec request.
Status Schedule(QuerySchedule* schedule);
+ /// Build a backend descriptor for this Impala daemon. Fills out all metadata using
+ /// the provided arguments, except does not set 'is_quiescing'.
+ static TBackendDescriptor BuildLocalBackendDescriptor(Webserver* webserver,
+ const TNetworkAddress& backend_address, const TNetworkAddress& krpc_address,
+ const IpAddr& ip, int64_t admit_mem_limit);
+
private:
/// Map from a host's IP address to the next executor to be round-robin scheduled for
/// that host (needed for setups with multiple executors on a single host)
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4b50455..863ff16 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -57,7 +57,6 @@
#include "runtime/coordinator.h"
#include "runtime/exec-env.h"
#include "runtime/lib-cache.h"
-#include "runtime/mem-tracker.h"
#include "runtime/timestamp-value.h"
#include "runtime/timestamp-value.inline.h"
#include "runtime/tmp-file-mgr.h"
@@ -75,7 +74,6 @@
#include "util/histogram-metric.h"
#include "util/impalad-metrics.h"
#include "util/lineage-util.h"
-#include "util/memory-metrics.h"
#include "util/network-util.h"
#include "util/openssl-util.h"
#include "util/parse-util.h"
@@ -125,7 +123,6 @@ DECLARE_string(authorized_proxy_group_config);
DECLARE_string(authorized_proxy_group_config_delimiter);
DECLARE_bool(abort_on_config_error);
DECLARE_bool(disk_spill_encryption);
-DECLARE_bool(mem_limit_includes_jvm);
DECLARE_bool(use_local_catalog);
DEFINE_int32(beeswax_port, 21000, "port on which Beeswax client requests are served."
@@ -1890,23 +1887,11 @@ void ImpalaServer::AddLocalBackendToStatestore(
return;
}
- TBackendDescriptor local_backend_descriptor;
- local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator);
- local_backend_descriptor.__set_is_executor(FLAGS_is_executor);
- local_backend_descriptor.__set_address(exec_env_->GetThriftBackendAddress());
- local_backend_descriptor.ip_address = exec_env_->ip_address();
- int64_t admit_mem_limit = exec_env_->process_mem_tracker()->limit();
- if (FLAGS_mem_limit_includes_jvm) {
- // Memory used by JVM heap cannot be admitted to. Other categories of JVM memory
- // consumption are much smaller and dynamic so it is simpler not to
- // include them here.
- admit_mem_limit -= JvmMemoryMetric::HEAP_MAX_USAGE->GetValue();
- }
- local_backend_descriptor.__set_admit_mem_limit(admit_mem_limit);
+ TBackendDescriptor local_backend_descriptor =
+ Scheduler::BuildLocalBackendDescriptor(exec_env_->webserver(),
+ exec_env_->GetThriftBackendAddress(), exec_env_->krpc_address(),
+ exec_env_->ip_address(), exec_env_->admit_mem_limit());
local_backend_descriptor.__set_is_quiescing(is_quiescing);
- const TNetworkAddress& krpc_address = exec_env_->krpc_address();
- DCHECK(IsResolvedAddress(krpc_address));
- local_backend_descriptor.__set_krpc_address(krpc_address);
subscriber_topic_updates->emplace_back(TTopicDelta());
TTopicDelta& update = subscriber_topic_updates->back();
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index acba2c4..d221b43 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -48,6 +48,7 @@ CLUSTER_SIZE = "cluster_size"
# other impala daemon arguments to allow merging multiple defaults into a single list.
DEFAULT_QUERY_OPTIONS = 'default_query_options'
IMPALA_LOG_DIR = 'impala_log_dir'
+NUM_EXCLUSIVE_COORDINATORS = 'num_exclusive_coordinators'
# Run with fast topic updates by default to reduce time to first query running.
DEFAULT_STATESTORE_ARGS = '--statestore_update_frequency_ms=50 \
@@ -99,7 +100,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
@staticmethod
def with_args(impalad_args=None, statestored_args=None, catalogd_args=None,
start_args=None, sentry_config=None, default_query_options=None,
- impala_log_dir=None, sentry_log_dir=None, cluster_size=None):
+ impala_log_dir=None, sentry_log_dir=None, cluster_size=None,
+ num_exclusive_coordinators=None):
"""Records arguments to be passed to a cluster by adding them to the decorated
method's func_dict"""
def decorate(func):
@@ -124,6 +126,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
func.func_dict[IMPALA_LOG_DIR] = impala_log_dir
if cluster_size is not None:
func.func_dict[CLUSTER_SIZE] = cluster_size
+ if num_exclusive_coordinators is not None:
+ func.func_dict[NUM_EXCLUSIVE_COORDINATORS] = num_exclusive_coordinators
return func
return decorate
@@ -143,12 +147,19 @@ class CustomClusterTestSuite(ImpalaTestSuite):
if CLUSTER_SIZE in method.func_dict:
cluster_size = method.func_dict[CLUSTER_SIZE]
+ use_exclusive_coordinators = False
+ num_coordinators = cluster_size
+ if NUM_EXCLUSIVE_COORDINATORS in method.func_dict:
+ num_coordinators = method.func_dict[NUM_EXCLUSIVE_COORDINATORS]
+ use_exclusive_coordinators = True
+
# Start a clean new cluster before each test
kwargs = {
"cluster_size": cluster_size,
- "num_coordinators": cluster_size,
+ "num_coordinators": num_coordinators,
"expected_num_executors": cluster_size,
- "default_query_options": method.func_dict.get(DEFAULT_QUERY_OPTIONS)
+ "default_query_options": method.func_dict.get(DEFAULT_QUERY_OPTIONS),
+ "use_exclusive_coordinators": use_exclusive_coordinators
}
if IMPALA_LOG_DIR in method.func_dict:
kwargs["impala_log_dir"] = method.func_dict[IMPALA_LOG_DIR]
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 86805ee..2101088 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -486,6 +486,30 @@ class TestAdmissionController(TestAdmissionControllerBase, HS2TestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
+ impalad_args=impalad_admission_ctrl_flags(max_requests=1, max_queued=1,
+ pool_max_mem=10 * PROC_MEM_TEST_LIMIT, proc_mem_limit=PROC_MEM_TEST_LIMIT),
+ num_exclusive_coordinators=1)
+ def test_mem_limit_dedicated_coordinator(self, vector):
+ """Regression test for IMPALA-8469: coordinator fragment should be admitted on
+ dedicated coordinator"""
+ query = "select * from functional.alltypesagg limit 1"
+ exec_options = vector.get_value('exec_option')
+ # Test both single-node and distributed plans
+ for num_nodes in [0, 1]:
+ # Memory just fits in memory limits
+ exec_options['mem_limit'] = self.PROC_MEM_TEST_LIMIT
+ exec_options['num_nodes'] = num_nodes
+ self.execute_query_expect_success(self.client, query, exec_options)
+
+ # A bit too much memory to run on coordinator.
+ exec_options['mem_limit'] = long(self.PROC_MEM_TEST_LIMIT * 1.1)
+ ex = self.execute_query_expect_failure(self.client, query, exec_options)
+ assert ("Rejected query from pool default-pool: request memory needed "
+ "1.10 GB per node is greater than memory available for admission 1.00 GB" in
+ str(ex)), str(ex)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
impalad_args=impalad_admission_ctrl_flags(max_requests=2, max_queued=1,
pool_max_mem=10 * PROC_MEM_TEST_LIMIT,
queue_wait_timeout_ms=2 * STATESTORE_RPC_FREQUENCY_MS),