You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this text rendering.)
Posted to commits@impala.apache.org by he...@apache.org on 2016/09/21 04:00:41 UTC
[4/7] incubator-impala git commit: IMPALA-4160: Remove Llama support.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 089ada1..0f93875 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -635,7 +635,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
DiskIoMgr io_mgr(1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
ASSERT_OK(io_mgr.Init(&root_mem_tracker));
- MemTracker reader_mem_tracker(-1, -1, "Reader", &root_mem_tracker);
+ MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
DiskIoRequestContext* reader;
ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
@@ -950,7 +950,7 @@ TEST_F(DiskIoMgrTest, Buffers) {
ASSERT_OK(io_mgr.Init(&root_mem_tracker));
ASSERT_EQ(root_mem_tracker.consumption(), 0);
- MemTracker reader_mem_tracker(-1, -1, "Reader", &root_mem_tracker);
+ MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
DiskIoRequestContext* reader;
ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 5df69ed..dd45932 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -370,9 +370,9 @@ DiskIoMgr::~DiskIoMgr() {
Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
DCHECK(process_mem_tracker != NULL);
free_buffer_mem_tracker_.reset(
- new MemTracker(-1, -1, "Free Disk IO Buffers", process_mem_tracker, false));
+ new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
unowned_buffer_mem_tracker_.reset(
- new MemTracker(-1, -1, "Untracked Disk IO Buffers", process_mem_tracker, false));
+ new MemTracker(-1, "Untracked Disk IO Buffers", process_mem_tracker, false));
// If we hit the process limit, see if we can reclaim some memory by removing
// previously allocated (but unused) io buffers.
process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::GcIoBuffers, this));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 38e9f96..1b3fa14 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -24,7 +24,6 @@
#include <gutil/strings/substitute.h>
#include "common/logging.h"
-#include "resourcebroker/resource-broker.h"
#include "runtime/backend-client.h"
#include "runtime/client-cache.h"
#include "runtime/coordinator.h"
@@ -51,7 +50,6 @@
#include "util/webserver.h"
#include "util/mem-info.h"
#include "util/debug-util.h"
-#include "util/cgroups-mgr.h"
#include "util/memory-metrics.h"
#include "util/pretty-printer.h"
#include "util/thread-pool.h"
@@ -84,49 +82,21 @@ DECLARE_int32(num_cores);
DECLARE_int32(be_port);
DECLARE_string(mem_limit);
-DEFINE_bool(enable_rm, false, "Whether to enable resource management. If enabled, "
- "-fair_scheduler_allocation_path is required.");
-DEFINE_int32(llama_callback_port, 28000,
- "Port where Llama notification callback should be started");
-// TODO: Deprecate llama_host and llama_port in favor of the new llama_hostports.
-// This needs to be coordinated with CM.
-DEFINE_string(llama_host, "",
- "Host of Llama service that the resource broker should connect to");
-DEFINE_int32(llama_port, 15000,
- "Port of Llama service that the resource broker should connect to");
-DEFINE_string(llama_addresses, "",
- "Llama availability group given as a comma-separated list of hostports.");
-DEFINE_int64(llama_registration_timeout_secs, 30,
- "Maximum number of seconds that Impala will attempt to (re-)register "
- "with Llama before aborting the triggering action with an error "
- "(e.g. Impalad startup or a Llama RPC request). "
- "A setting of -1 means try indefinitely.");
-DEFINE_int64(llama_registration_wait_secs, 3,
- "Number of seconds to wait between attempts during Llama registration.");
-DEFINE_int64(llama_max_request_attempts, 5,
- "Maximum number of times a non-registration Llama RPC request "
- "(reserve/expand/release, etc.) is retried until the request is aborted. "
- "An attempt is counted once Impala is registered with Llama, i.e., a "
- "request survives at most llama_max_request_attempts-1 re-registrations.");
-DEFINE_string(cgroup_hierarchy_path, "", "If Resource Management is enabled, this must "
- "be set to the Impala-writeable root of the cgroups hierarchy into which execution "
- "threads are assigned.");
-DEFINE_string(staging_cgroup, "impala_staging", "Name of the cgroup that a query's "
- "execution threads are moved into once the query completes.");
-
-// Use a low default value because the reconnection logic is performed manually
-// for the purpose of faster Llama failover (otherwise we may try to reconnect to the
-// inactive Llama for a long time).
-DEFINE_int32(resource_broker_cnxn_attempts, 1, "The number of times to retry an "
- "RPC connection to Llama. A setting of 0 means retry indefinitely");
-DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000, "The interval, in ms, "
- "to wait between attempts to make an RPC connection to the Llama.");
-DEFINE_int32(resource_broker_send_timeout, 0, "Time to wait, in ms, "
- "for the underlying socket of an RPC to Llama to successfully send data. "
- "A setting of 0 means the socket will wait indefinitely.");
-DEFINE_int32(resource_broker_recv_timeout, 0, "Time to wait, in ms, "
- "for the underlying socket of an RPC to Llama to successfully receive data. "
- "A setting of 0 means the socket will wait indefinitely.");
+// TODO: Remove the following RM-related flags in Impala 3.0.
+DEFINE_bool(enable_rm, false, "Deprecated");
+DEFINE_int32(llama_callback_port, 28000, "Deprecated");
+DEFINE_string(llama_host, "", "Deprecated");
+DEFINE_int32(llama_port, 15000, "Deprecated");
+DEFINE_string(llama_addresses, "", "Deprecated");
+DEFINE_int64(llama_registration_timeout_secs, 30, "Deprecated");
+DEFINE_int64(llama_registration_wait_secs, 3, "Deprecated");
+DEFINE_int64(llama_max_request_attempts, 5, "Deprecated");
+DEFINE_string(cgroup_hierarchy_path, "", "Deprecated");
+DEFINE_string(staging_cgroup, "impala_staging", "Deprecated");
+DEFINE_int32(resource_broker_cnxn_attempts, 1, "Deprecated");
+DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000, "Deprecated");
+DEFINE_int32(resource_broker_send_timeout, 0, "Deprecated");
+DEFINE_int32(resource_broker_recv_timeout, 0, "Deprecated");
DEFINE_int32(coordinator_rpc_threads, 12, "(Advanced) Number of threads available to "
"start fragments on remote Impala daemons.");
@@ -145,11 +115,6 @@ DEFINE_int32(catalog_client_connection_num_retries, 3, "Retry catalog connection
DEFINE_int32(catalog_client_rpc_timeout_ms, 0, "(Advanced) The underlying TSocket "
"send/recv timeout in milliseconds for a catalog client RPC.");
-// The key for a variable set in Impala's test environment only, to allow the
-// resource-broker to correctly map node addresses into a form that Llama understand.
-const static string PSEUDO_DISTRIBUTED_CONFIG_KEY =
- "yarn.scheduler.include-port-in-node-name";
-
const static string DEFAULT_FS = "fs.defaultFS";
namespace impala {
@@ -160,35 +125,29 @@ ExecEnv::ExecEnv()
: metrics_(new MetricGroup("impala-metrics")),
stream_mgr_(new DataStreamMgr(metrics_.get())),
impalad_client_cache_(
- new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries,
- 0, FLAGS_backend_client_rpc_timeout_ms,
- FLAGS_backend_client_rpc_timeout_ms,
- "", !FLAGS_ssl_client_ca_certificate.empty())),
+ new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0,
+ FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "",
+ !FLAGS_ssl_client_ca_certificate.empty())),
catalogd_client_cache_(
- new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries,
- 0, FLAGS_catalog_client_rpc_timeout_ms,
- FLAGS_catalog_client_rpc_timeout_ms,
- "", !FLAGS_ssl_client_ca_certificate.empty())),
+ new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries, 0,
+ FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "",
+ !FLAGS_ssl_client_ca_certificate.empty())),
htable_factory_(new HBaseTableFactory()),
disk_io_mgr_(new DiskIoMgr()),
webserver_(new Webserver()),
mem_tracker_(NULL),
thread_mgr_(new ThreadResourceMgr),
- cgroups_mgr_(NULL),
hdfs_op_thread_pool_(
CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)),
tmp_file_mgr_(new TmpFileMgr),
request_pool_service_(new RequestPoolService(metrics_.get())),
frontend_(new Frontend()),
- fragment_exec_thread_pool_(
- new CallableThreadPool("coordinator-fragment-rpc", "worker",
- FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
+ fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc",
+ "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
enable_webserver_(FLAGS_enable_webserver),
is_fe_tests_(false),
- backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)),
- is_pseudo_distributed_llama_(false) {
- if (FLAGS_enable_rm) InitRm();
+ backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
// Initialize the scheduler either dynamically (with a statestore) or statically (with
// a standalone single backend)
if (FLAGS_use_statestore) {
@@ -202,33 +161,30 @@ ExecEnv::ExecEnv()
subscriber_address, statestore_address, metrics_.get()));
scheduler_.reset(new SimpleScheduler(statestore_subscriber_.get(),
- statestore_subscriber_->id(), backend_address_, metrics_.get(),
- webserver_.get(), resource_broker_.get(), request_pool_service_.get()));
+ statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(),
+ request_pool_service_.get()));
} else {
vector<TNetworkAddress> addresses;
addresses.push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
- scheduler_.reset(new SimpleScheduler(addresses, metrics_.get(), webserver_.get(),
- resource_broker_.get(), request_pool_service_.get()));
+ scheduler_.reset(new SimpleScheduler(
+ addresses, metrics_.get(), webserver_.get(), request_pool_service_.get()));
}
if (exec_env_ == NULL) exec_env_ = this;
- if (FLAGS_enable_rm) resource_broker_->set_scheduler(scheduler_.get());
}
// TODO: Need refactor to get rid of duplicated code.
ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
- int webserver_port, const string& statestore_host, int statestore_port)
+ int webserver_port, const string& statestore_host, int statestore_port)
: metrics_(new MetricGroup("impala-metrics")),
stream_mgr_(new DataStreamMgr(metrics_.get())),
impalad_client_cache_(
- new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries,
- 0, FLAGS_backend_client_rpc_timeout_ms,
- FLAGS_backend_client_rpc_timeout_ms,
- "", !FLAGS_ssl_client_ca_certificate.empty())),
+ new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0,
+ FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "",
+ !FLAGS_ssl_client_ca_certificate.empty())),
catalogd_client_cache_(
- new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries,
- 0, FLAGS_catalog_client_rpc_timeout_ms,
- FLAGS_catalog_client_rpc_timeout_ms,
- "", !FLAGS_ssl_client_ca_certificate.empty())),
+ new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries, 0,
+ FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "",
+ !FLAGS_ssl_client_ca_certificate.empty())),
htable_factory_(new HBaseTableFactory()),
disk_io_mgr_(new DiskIoMgr()),
webserver_(new Webserver(webserver_port)),
@@ -238,16 +194,13 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)),
tmp_file_mgr_(new TmpFileMgr),
frontend_(new Frontend()),
- fragment_exec_thread_pool_(
- new CallableThreadPool("coordinator-fragment-rpc", "worker",
- FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
+ fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc",
+ "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
is_fe_tests_(false),
- backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)),
- is_pseudo_distributed_llama_(false) {
+ backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
request_pool_service_.reset(new RequestPoolService(metrics_.get()));
- if (FLAGS_enable_rm) InitRm();
if (FLAGS_use_statestore && statestore_port > 0) {
TNetworkAddress subscriber_address =
@@ -260,73 +213,23 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
subscriber_address, statestore_address, metrics_.get()));
scheduler_.reset(new SimpleScheduler(statestore_subscriber_.get(),
- statestore_subscriber_->id(), backend_address_, metrics_.get(),
- webserver_.get(), resource_broker_.get(), request_pool_service_.get()));
+ statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(),
+ request_pool_service_.get()));
} else {
vector<TNetworkAddress> addresses;
addresses.push_back(MakeNetworkAddress(hostname, backend_port));
- scheduler_.reset(new SimpleScheduler(addresses, metrics_.get(), webserver_.get(),
- resource_broker_.get(), request_pool_service_.get()));
+ scheduler_.reset(new SimpleScheduler(
+ addresses, metrics_.get(), webserver_.get(), request_pool_service_.get()));
}
if (exec_env_ == NULL) exec_env_ = this;
- if (FLAGS_enable_rm) resource_broker_->set_scheduler(scheduler_.get());
}
-void ExecEnv::InitRm() {
- // Unique addresses from FLAGS_llama_addresses and FLAGS_llama_host/FLAGS_llama_port.
- vector<TNetworkAddress> llama_addresses;
- if (!FLAGS_llama_addresses.empty()) {
- vector<string> components;
- split(components, FLAGS_llama_addresses, is_any_of(","), token_compress_on);
- for (int i = 0; i < components.size(); ++i) {
- to_lower(components[i]);
- TNetworkAddress llama_address = MakeNetworkAddress(components[i]);
- if (find(llama_addresses.begin(), llama_addresses.end(), llama_address)
- == llama_addresses.end()) {
- llama_addresses.push_back(llama_address);
- }
- }
- }
- // Add Llama hostport from deprecated flags (if it does not already exist).
- if (!FLAGS_llama_host.empty()) {
- to_lower(FLAGS_llama_host);
- TNetworkAddress llama_address =
- MakeNetworkAddress(FLAGS_llama_host, FLAGS_llama_port);
- if (find(llama_addresses.begin(), llama_addresses.end(), llama_address)
- == llama_addresses.end()) {
- llama_addresses.push_back(llama_address);
- }
- }
- for (int i = 0; i < llama_addresses.size(); ++i) {
- LOG(INFO) << "Llama address " << i << ": " << llama_addresses[i];
- }
-
- TNetworkAddress llama_callback_address =
- MakeNetworkAddress(FLAGS_hostname, FLAGS_llama_callback_port);
- resource_broker_.reset(new ResourceBroker(llama_addresses, llama_callback_address,
- metrics_.get()));
- cgroups_mgr_.reset(new CgroupsMgr(metrics_.get()));
-
- TGetHadoopConfigRequest config_request;
- config_request.__set_name(PSEUDO_DISTRIBUTED_CONFIG_KEY);
- TGetHadoopConfigResponse config_response;
- frontend_->GetHadoopConfig(config_request, &config_response);
- if (config_response.__isset.value) {
- to_lower(config_response.value);
- is_pseudo_distributed_llama_ = (config_response.value == "true");
- } else {
- is_pseudo_distributed_llama_ = false;
- }
- if (is_pseudo_distributed_llama_) {
- LOG(INFO) << "Pseudo-distributed Llama cluster detected";
- }
-}
ExecEnv::~ExecEnv() {
}
Status ExecEnv::InitForFeTests() {
- mem_tracker_.reset(new MemTracker(-1, -1, "Process"));
+ mem_tracker_.reset(new MemTracker(-1, "Process"));
is_fe_tests_ = true;
return Status::OK();
}
@@ -334,15 +237,6 @@ Status ExecEnv::InitForFeTests() {
Status ExecEnv::StartServices() {
LOG(INFO) << "Starting global services";
- if (FLAGS_enable_rm) {
- // Initialize the resource broker to make sure the Llama is up and reachable.
- DCHECK(resource_broker_.get() != NULL);
- RETURN_IF_ERROR(resource_broker_->Init());
- DCHECK(cgroups_mgr_.get() != NULL);
- RETURN_IF_ERROR(
- cgroups_mgr_->Init(FLAGS_cgroup_hierarchy_path, FLAGS_staging_cgroup));
- }
-
// Initialize global memory limit.
// Depending on the system configuration, we will have to calculate the process
// memory limit either based on the available physical memory, or if overcommitting
@@ -397,7 +291,7 @@ Status ExecEnv::StartServices() {
#ifndef ADDRESS_SANITIZER
// Limit of -1 means no memory limit.
mem_tracker_.reset(new MemTracker(TcmallocMetric::PHYSICAL_BYTES_RESERVED,
- bytes_limit > 0 ? bytes_limit : -1, -1, "Process"));
+ bytes_limit > 0 ? bytes_limit : -1, "Process"));
// Since tcmalloc does not free unused memory, we may exceed the process mem limit even
// if Impala is not actually using that much memory. Add a callback to free any unused
@@ -407,7 +301,7 @@ Status ExecEnv::StartServices() {
#else
// tcmalloc metrics aren't defined in ASAN builds, just use the default behavior to
// track process memory usage (sum of all children trackers).
- mem_tracker_.reset(new MemTracker(bytes_limit > 0 ? bytes_limit : -1, -1, "Process"));
+ mem_tracker_.reset(new MemTracker(bytes_limit > 0 ? bytes_limit : -1, "Process"));
#endif
mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process");
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 1f37572..303876f 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -29,7 +29,6 @@
namespace impala {
class CallableThreadPool;
-class CgroupsMgr;
class DataStreamMgr;
class DiskIoMgr;
class FragmentMgr;
@@ -42,7 +41,6 @@ class MemTracker;
class MetricGroup;
class QueryResourceMgr;
class RequestPoolService;
-class ResourceBroker;
class Scheduler;
class StatestoreSubscriber;
class TestExecEnv;
@@ -89,7 +87,6 @@ class ExecEnv {
MetricGroup* metrics() { return metrics_.get(); }
MemTracker* process_mem_tracker() { return mem_tracker_.get(); }
ThreadResourceMgr* thread_mgr() { return thread_mgr_.get(); }
- CgroupsMgr* cgroups_mgr() { return cgroups_mgr_.get(); }
HdfsOpThreadPool* hdfs_op_thread_pool() { return hdfs_op_thread_pool_.get(); }
TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); }
CallableThreadPool* fragment_exec_thread_pool() {
@@ -102,7 +99,6 @@ class ExecEnv {
void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
- ResourceBroker* resource_broker() { return resource_broker_.get(); }
Scheduler* scheduler() { return scheduler_.get(); }
StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
@@ -119,11 +115,6 @@ class ExecEnv {
/// differently.
bool is_fe_tests() { return is_fe_tests_; }
- /// Returns true if the Llama in use is pseudo-distributed, used for development
- /// purposes. The pseudo-distributed version has special requirements for specifying
- /// resource locations.
- bool is_pseudo_distributed_llama() { return is_pseudo_distributed_llama_; }
-
/// Returns the configured defaultFs set in core-site.xml
string default_fs() { return default_fs_; }
@@ -131,7 +122,6 @@ class ExecEnv {
/// Leave protected so that subclasses can override
boost::scoped_ptr<MetricGroup> metrics_;
boost::scoped_ptr<DataStreamMgr> stream_mgr_;
- boost::scoped_ptr<ResourceBroker> resource_broker_;
boost::scoped_ptr<Scheduler> scheduler_;
boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_;
@@ -141,7 +131,6 @@ class ExecEnv {
boost::scoped_ptr<Webserver> webserver_;
boost::scoped_ptr<MemTracker> mem_tracker_;
boost::scoped_ptr<ThreadResourceMgr> thread_mgr_;
- boost::scoped_ptr<CgroupsMgr> cgroups_mgr_;
boost::scoped_ptr<HdfsOpThreadPool> hdfs_op_thread_pool_;
boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;
boost::scoped_ptr<RequestPoolService> request_pool_service_;
@@ -161,16 +150,8 @@ class ExecEnv {
/// Address of the Impala backend server instance
TNetworkAddress backend_address_;
- /// True if the cluster has set 'yarn.scheduler.include-port-in-node-name' to true,
- /// indicating that this cluster is pseudo-distributed. Should not be true in real
- /// deployments.
- bool is_pseudo_distributed_llama_;
-
/// fs.defaultFs value set in core-site.xml
std::string default_fs_;
-
- /// Initialise cgroups manager, detect test RM environment and init resource broker.
- void InitRm();
};
} // namespace impala
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index 99cec9a..604923c 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -224,8 +224,8 @@ TEST(MemPoolTest, ReturnPartial) {
TEST(MemPoolTest, Limits) {
MemTracker limit3(4 * MemPoolTest::INITIAL_CHUNK_SIZE);
- MemTracker limit1(2 * MemPoolTest::INITIAL_CHUNK_SIZE, -1, "", &limit3);
- MemTracker limit2(3 * MemPoolTest::INITIAL_CHUNK_SIZE, -1, "", &limit3);
+ MemTracker limit1(2 * MemPoolTest::INITIAL_CHUNK_SIZE, "", &limit3);
+ MemTracker limit2(3 * MemPoolTest::INITIAL_CHUNK_SIZE, "", &limit3);
MemPool* p1 = new MemPool(&limit1);
EXPECT_FALSE(limit1.AnyLimitExceeded());
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-test.cc b/be/src/runtime/mem-tracker-test.cc
index 87eec14..546b8ab 100644
--- a/be/src/runtime/mem-tracker-test.cc
+++ b/be/src/runtime/mem-tracker-test.cc
@@ -64,7 +64,7 @@ TEST(MemTestTest, ConsumptionMetric) {
UIntGauge metric(md, 0);
EXPECT_EQ(metric.value(), 0);
- MemTracker t(&metric, 100, -1, "");
+ MemTracker t(&metric, 100, "");
EXPECT_TRUE(t.has_limit());
EXPECT_EQ(t.consumption(), 0);
@@ -112,8 +112,8 @@ TEST(MemTestTest, ConsumptionMetric) {
TEST(MemTestTest, TrackerHierarchy) {
MemTracker p(100);
- MemTracker c1(80, -1, "", &p);
- MemTracker c2(50, -1, "", &p);
+ MemTracker c1(80, "", &p);
+ MemTracker c2(50, "", &p);
// everything below limits
c1.Consume(60);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index a9ceb76..a9de160 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -23,10 +23,8 @@
#include <gutil/strings/substitute.h>
#include "bufferpool/reservation-tracker-counters.h"
-#include "resourcebroker/resource-broker.h"
#include "runtime/exec-env.h"
#include "runtime/runtime-state.h"
-#include "scheduling/query-resource-mgr.h"
#include "util/debug-util.h"
#include "util/mem-info.h"
#include "util/pretty-printer.h"
@@ -49,10 +47,9 @@ AtomicInt64 MemTracker::released_memory_since_gc_;
// Name for request pool MemTrackers. '$0' is replaced with the pool name.
const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT = "RequestPool=$0";
-MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit, const string& label,
- MemTracker* parent, bool log_usage_if_zero)
+MemTracker::MemTracker(
+ int64_t byte_limit, const string& label, MemTracker* parent, bool log_usage_if_zero)
: limit_(byte_limit),
- rm_reserved_limit_(rm_reserved_limit),
label_(label),
parent_(parent),
consumption_(&local_counter_),
@@ -60,7 +57,6 @@ MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit, const stri
consumption_metric_(NULL),
auto_unregister_(false),
log_usage_if_zero_(log_usage_if_zero),
- query_resource_mgr_(NULL),
num_gcs_metric_(NULL),
bytes_freed_by_last_gc_metric_(NULL),
bytes_over_limit_metric_(NULL),
@@ -69,11 +65,9 @@ MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit, const stri
Init();
}
-MemTracker::MemTracker(
- RuntimeProfile* profile, int64_t byte_limit, int64_t rm_reserved_limit,
+MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
const std::string& label, MemTracker* parent)
: limit_(byte_limit),
- rm_reserved_limit_(rm_reserved_limit),
label_(label),
parent_(parent),
consumption_(profile->AddHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES)),
@@ -81,7 +75,6 @@ MemTracker::MemTracker(
consumption_metric_(NULL),
auto_unregister_(false),
log_usage_if_zero_(true),
- query_resource_mgr_(NULL),
num_gcs_metric_(NULL),
bytes_freed_by_last_gc_metric_(NULL),
bytes_over_limit_metric_(NULL),
@@ -90,10 +83,9 @@ MemTracker::MemTracker(
Init();
}
-MemTracker::MemTracker(UIntGauge* consumption_metric,
- int64_t byte_limit, int64_t rm_reserved_limit, const string& label)
+MemTracker::MemTracker(
+ UIntGauge* consumption_metric, int64_t byte_limit, const string& label)
: limit_(byte_limit),
- rm_reserved_limit_(rm_reserved_limit),
label_(label),
parent_(NULL),
consumption_(&local_counter_),
@@ -101,7 +93,6 @@ MemTracker::MemTracker(UIntGauge* consumption_metric,
consumption_metric_(consumption_metric),
auto_unregister_(false),
log_usage_if_zero_(true),
- query_resource_mgr_(NULL),
num_gcs_metric_(NULL),
bytes_freed_by_last_gc_metric_(NULL),
bytes_over_limit_metric_(NULL),
@@ -111,7 +102,6 @@ MemTracker::MemTracker(UIntGauge* consumption_metric,
void MemTracker::Init() {
DCHECK_GE(limit_, -1);
- DCHECK(rm_reserved_limit_ == -1 || limit_ == -1 || rm_reserved_limit_ <= limit_);
// populate all_trackers_ and limit_trackers_
MemTracker* tracker = this;
while (tracker != NULL) {
@@ -173,9 +163,8 @@ MemTracker* MemTracker::GetRequestPoolMemTracker(const string& pool_name,
} else {
if (parent == NULL) return NULL;
// First time this pool_name registered, make a new object.
- MemTracker* tracker = new MemTracker(-1, -1,
- Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name),
- parent);
+ MemTracker* tracker = new MemTracker(
+ -1, Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name), parent);
tracker->auto_unregister_ = true;
tracker->pool_name_ = pool_name;
pool_to_mem_trackers_[pool_name] = tracker;
@@ -184,8 +173,7 @@ MemTracker* MemTracker::GetRequestPoolMemTracker(const string& pool_name,
}
shared_ptr<MemTracker> MemTracker::GetQueryMemTracker(
- const TUniqueId& id, int64_t byte_limit, int64_t rm_reserved_limit, MemTracker* parent,
- QueryResourceMgr* res_mgr) {
+ const TUniqueId& id, int64_t byte_limit, MemTracker* parent) {
if (byte_limit != -1) {
if (byte_limit > MemInfo::physical_mem()) {
LOG(WARNING) << "Memory limit "
@@ -210,12 +198,11 @@ shared_ptr<MemTracker> MemTracker::GetQueryMemTracker(
} else {
// First time this id registered, make a new object. Give a shared ptr to
// the caller and put a weak ptr in the map.
- shared_ptr<MemTracker> tracker = make_shared<MemTracker>(byte_limit,
- rm_reserved_limit, Substitute("Query($0)", lexical_cast<string>(id)), parent);
+ shared_ptr<MemTracker> tracker = make_shared<MemTracker>(
+ byte_limit, Substitute("Query($0)", lexical_cast<string>(id)), parent);
tracker->auto_unregister_ = true;
tracker->query_id_ = id;
request_to_mem_trackers_[id] = tracker;
- if (res_mgr != NULL) tracker->SetQueryResourceMgr(res_mgr);
return tracker;
}
}
@@ -278,9 +265,6 @@ string MemTracker::LogUsage(const string& prefix) const {
ss << prefix << label_ << ":";
if (CheckLimitExceeded()) ss << " memory limit exceeded.";
if (limit_ > 0) ss << " Limit=" << PrettyPrinter::Print(limit_, TUnit::BYTES);
- if (rm_reserved_limit_ > 0) {
- ss << " RM Limit=" << PrettyPrinter::Print(rm_reserved_limit_, TUnit::BYTES);
- }
int64_t total = consumption();
int64_t peak = consumption_->value();
@@ -358,45 +342,4 @@ void MemTracker::GcTcmalloc() {
#endif
}
-bool MemTracker::ExpandRmReservation(int64_t bytes) {
- if (query_resource_mgr_ == NULL || rm_reserved_limit_ == -1) return false;
- // TODO: Make this asynchronous after IO mgr changes to use TryConsume() are done.
- lock_guard<mutex> l(resource_acquisition_lock_);
- int64_t requested = consumption_->current_value() + bytes;
- // Can't exceed the hard limit under any circumstance
- if (requested >= limit_ && limit_ != -1) return false;
- // Test to see if we can satisfy the limit anyhow; maybe a different request was already
- // in flight.
- if (requested < rm_reserved_limit_) return true;
-
- int64_t bytes_allocated;
- Status status = query_resource_mgr_->RequestMemExpansion(bytes, &bytes_allocated);
- if (!status.ok()) {
- LOG(INFO) << "Failed to expand memory limit by "
- << PrettyPrinter::Print(bytes, TUnit::BYTES) << ": "
- << status.GetDetail();
- return false;
- }
-
- for (const MemTracker* tracker: limit_trackers_) {
- if (tracker == this) continue;
- if (tracker->consumption_->current_value() + bytes_allocated > tracker->limit_) {
- // TODO: Allocation may be larger than needed and might exceed some parent
- // tracker limit. IMPALA-2182.
- VLOG_RPC << "Failed to use " << bytes_allocated << " bytes allocated over "
- << tracker->label() << " tracker limit=" << tracker->limit_
- << " consumption=" << tracker->consumption();
- // Don't adjust our limit; rely on query tear-down to release the resource.
- return false;
- }
- }
-
- rm_reserved_limit_ += bytes_allocated;
- // Resource broker might give us more than we ask for
- if (limit_ != -1) rm_reserved_limit_ = min(rm_reserved_limit_, limit_);
- VLOG_RPC << "Reservation bytes_allocated=" << bytes_allocated << " rm_reserved_limit="
- << rm_reserved_limit_ << " limit=" << limit_;
- return true;
-}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index e3548cc..a2c3e9b 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -39,7 +39,6 @@ namespace impala {
class ReservationTrackerCounters;
class MemTracker;
-class QueryResourceMgr;
/// A MemTracker tracks memory consumption; it contains an optional limit
/// and can be arranged into a tree structure such that the consumption tracked
@@ -66,19 +65,17 @@ class MemTracker {
/// 'label' is the label used in the usage string (LogUsage())
/// If 'log_usage_if_zero' is false, this tracker (and its children) will not be included
/// in LogUsage() output if consumption is 0.
- MemTracker(int64_t byte_limit = -1, int64_t rm_reserved_limit = -1,
- const std::string& label = std::string(), MemTracker* parent = NULL,
- bool log_usage_if_zero = true);
+ MemTracker(int64_t byte_limit = -1, const std::string& label = std::string(),
+ MemTracker* parent = NULL, bool log_usage_if_zero = true);
/// C'tor for tracker for which consumption counter is created as part of a profile.
/// The counter is created with name COUNTER_NAME.
- MemTracker(RuntimeProfile* profile, int64_t byte_limit, int64_t rm_reserved_limit = -1,
+ MemTracker(RuntimeProfile* profile, int64_t byte_limit,
const std::string& label = std::string(), MemTracker* parent = NULL);
/// C'tor for tracker that uses consumption_metric as the consumption value.
/// Consume()/Release() can still be called. This is used for the process tracker.
- MemTracker(UIntGauge* consumption_metric,
- int64_t byte_limit = -1, int64_t rm_reserved_limit = -1,
+ MemTracker(UIntGauge* consumption_metric, int64_t byte_limit = -1,
const std::string& label = std::string());
~MemTracker();
@@ -98,9 +95,8 @@ class MemTracker {
/// 'parent' as the parent tracker.
/// byte_limit and parent must be the same for all GetMemTracker() calls with the
/// same id.
- static std::shared_ptr<MemTracker> GetQueryMemTracker(const TUniqueId& id,
- int64_t byte_limit, int64_t rm_reserved_limit, MemTracker* parent,
- QueryResourceMgr* res_mgr);
+ static std::shared_ptr<MemTracker> GetQueryMemTracker(
+ const TUniqueId& id, int64_t byte_limit, MemTracker* parent);
/// Returns a MemTracker object for request pool 'pool_name'. Calling this with the same
/// 'pool_name' will return the same MemTracker object. This is used to track the local
@@ -112,14 +108,6 @@ class MemTracker {
static MemTracker* GetRequestPoolMemTracker(const std::string& pool_name,
MemTracker* parent);
- /// Returns the minimum of limit and rm_reserved_limit
- int64_t effective_limit() const {
- // TODO: maybe no limit should be MAX_LONG?
- DCHECK(rm_reserved_limit_ <= limit_ || limit_ == -1);
- if (rm_reserved_limit_ == -1) return limit_;
- return rm_reserved_limit_;
- }
-
/// Increases consumption of this tracker and its ancestors by 'bytes'.
void Consume(int64_t bytes) {
if (bytes <= 0) {
@@ -166,49 +154,33 @@ class MemTracker {
if (consumption_metric_ != NULL) RefreshConsumptionFromMetric();
if (UNLIKELY(bytes <= 0)) return true;
int i;
- // Walk the tracker tree top-down, to avoid expanding a limit on a child whose parent
- // won't accommodate the change.
+ // Walk the tracker tree top-down.
for (i = all_trackers_.size() - 1; i >= 0; --i) {
MemTracker* tracker = all_trackers_[i];
- int64_t limit = tracker->effective_limit();
+ const int64_t limit = tracker->limit();
if (limit < 0) {
tracker->consumption_->Add(bytes); // No limit at this tracker.
} else {
- // If TryConsume fails, we can try to GC or expand the RM reservation, but we may
- // need to try several times if there are concurrent consumers because we don't
- // take a lock before trying to update consumption_.
+ // If TryConsume fails, we can try to GC, but we may need to try several times if
+ // there are concurrent consumers because we don't take a lock before trying to
+ // update consumption_.
while (true) {
if (LIKELY(tracker->consumption_->TryAdd(bytes, limit))) break;
VLOG_RPC << "TryConsume failed, bytes=" << bytes
<< " consumption=" << tracker->consumption_->current_value()
- << " limit=" << limit << " attempting to GC and expand reservation";
- // TODO: This may not be right if more than one tracker can actually change its
- // RM reservation limit.
- if (UNLIKELY(tracker->GcMemory(limit - bytes) &&
- !tracker->ExpandRmReservation(bytes))) {
+ << " limit=" << limit << " attempting to GC";
+ if (UNLIKELY(tracker->GcMemory(limit - bytes))) {
DCHECK_GE(i, 0);
// Failed for this mem tracker. Roll back the ones that succeeded.
- // TODO: this doesn't roll it back completely since the max values for
- // the updated trackers aren't decremented. The max values are only used
- // for error reporting so this is probably okay. Rolling those back is
- // pretty hard; we'd need something like 2PC.
- //
- // TODO: This might leave us with an allocated resource that we can't use.
- // Specifically, the RM reservation of some ancestors' trackers may have been
- // expanded only to fail at the current tracker. This may be wasteful as
- // subsequent TryConsume() never gets to use the reserved resources. Consider
- // adjusting the reservation of the ancestors' trackers.
for (int j = all_trackers_.size() - 1; j > i; --j) {
all_trackers_[j]->consumption_->Add(-bytes);
}
return false;
}
- VLOG_RPC << "GC or expansion succeeded, TryConsume bytes=" << bytes
+ VLOG_RPC << "GC succeeded, TryConsume bytes=" << bytes
<< " consumption=" << tracker->consumption_->current_value()
- << " new limit=" << tracker->effective_limit() << " prev=" << limit;
- // Need to update the limit if the RM reservation was expanded.
- limit = tracker->effective_limit();
+ << " limit=" << limit;
}
}
}
@@ -363,11 +335,6 @@ class MemTracker {
/// can cause us to go way over mem limits.
void GcTcmalloc();
- /// Set the resource mgr to allow expansion of limits (if NULL, no expansion is possible)
- void SetQueryResourceMgr(QueryResourceMgr* context) {
- query_resource_mgr_ = context;
- }
-
/// Walks the MemTracker hierarchy and populates all_trackers_ and
/// limit_trackers_
void Init();
@@ -378,11 +345,6 @@ class MemTracker {
static std::string LogUsage(const std::string& prefix,
const std::list<MemTracker*>& trackers);
- /// Try to expand the limit (by asking the resource broker for more memory) by at least
- /// 'bytes'. Returns false if not possible, true if the request succeeded. May allocate
- /// more memory than was requested.
- bool ExpandRmReservation(int64_t bytes);
-
/// Size, in bytes, that is considered a large value for Release() (or Consume() with
/// a negative value). If tcmalloc is used, this can trigger it to GC.
/// A higher value will make us call into tcmalloc less often (and therefore more
@@ -425,11 +387,6 @@ class MemTracker {
/// there is no consumption limit.
int64_t limit_;
- /// If > -1, when RM is enabled this is the limit after which this memtracker needs to
- /// acquire more memory from Llama.
- /// This limit is always less than or equal to the hard limit.
- int64_t rm_reserved_limit_;
-
std::string label_;
MemTracker* parent_;
@@ -476,14 +433,6 @@ class MemTracker {
/// if consumption is 0.
bool log_usage_if_zero_;
- /// Lock is taken during ExpandRmReservation() to prevent concurrent acquisition of new
- /// resources.
- boost::mutex resource_acquisition_lock_;
-
- /// If non-NULL, contains all the information required to expand resource reservations if
- /// required.
- QueryResourceMgr* query_resource_mgr_;
-
/// The number of times the GcFunctions were called.
IntCounter* num_gcs_metric_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 1e52d08..7300f44 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -32,26 +32,21 @@
#include "exec/hdfs-scan-node.h"
#include "exec/hbase-table-scanner.h"
#include "exprs/expr.h"
-#include "resourcebroker/resource-broker.h"
#include "runtime/descriptors.h"
#include "runtime/data-stream-mgr.h"
#include "runtime/row-batch.h"
#include "runtime/runtime-filter-bank.h"
#include "runtime/mem-tracker.h"
-#include "scheduling/query-resource-mgr.h"
-#include "util/cgroups-mgr.h"
#include "util/cpu-info.h"
#include "util/debug-util.h"
#include "util/container-util.h"
#include "util/parse-util.h"
#include "util/mem-info.h"
#include "util/periodic-counter-updater.h"
-#include "util/llama-util.h"
#include "util/pretty-printer.h"
DEFINE_bool(serialize_batch, false, "serialize and deserialize each returned row batch");
DEFINE_int32(status_report_interval, 5, "interval between profile reports; in seconds");
-DECLARE_bool(enable_rm);
#include "common/names.h"
@@ -76,9 +71,6 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
PlanFragmentExecutor::~PlanFragmentExecutor() {
Close();
- if (is_prepared_ && runtime_state_->query_resource_mgr() != NULL) {
- exec_env_->resource_broker()->UnregisterQueryResourceMgr(query_id_);
- }
// at this point, the report thread should have been stopped
DCHECK(!report_thread_active_);
}
@@ -100,58 +92,15 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(fragment_instance_ctx);
DCHECK(request.__isset.fragment_ctx);
- bool request_has_reserved_resource =
- request.fragment_instance_ctx.__isset.reserved_resource;
- if (request_has_reserved_resource) {
- VLOG_QUERY << "Executing fragment in reserved resource:\n"
- << request.fragment_instance_ctx.reserved_resource;
- }
-
- string cgroup = "";
- if (FLAGS_enable_rm && request_has_reserved_resource) {
- cgroup = exec_env_->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, "_"));
- }
// Prepare() must not return before runtime_state_ is set if is_prepared_ was
// set. Having runtime_state_.get() != NULL is a postcondition of this method in that
// case. Do not call RETURN_IF_ERROR or explicitly return before this line.
- runtime_state_.reset(new RuntimeState(request, cgroup, exec_env_));
+ runtime_state_.reset(new RuntimeState(request, exec_env_));
// total_time_counter() is in the runtime_state_ so start it up now.
SCOPED_TIMER(profile()->total_time_counter());
- // Register after setting runtime_state_ to ensure proper cleanup.
- if (FLAGS_enable_rm && !cgroup.empty() && request_has_reserved_resource) {
- bool is_first;
- RETURN_IF_ERROR(exec_env_->cgroups_mgr()->RegisterFragment(
- request.fragment_instance_ctx.fragment_instance_id, cgroup, &is_first));
- // The first fragment using cgroup sets the cgroup's CPU shares based on the reserved
- // resource.
- if (is_first) {
- DCHECK(request_has_reserved_resource);
- int32_t cpu_shares = exec_env_->cgroups_mgr()->VirtualCoresToCpuShares(
- request.fragment_instance_ctx.reserved_resource.v_cpu_cores);
- RETURN_IF_ERROR(exec_env_->cgroups_mgr()->SetCpuShares(cgroup, cpu_shares));
- }
- }
-
- // TODO: Find the reservation id when the resource request is not set
- if (FLAGS_enable_rm && request_has_reserved_resource) {
- TUniqueId reservation_id;
- reservation_id << request.fragment_instance_ctx.reserved_resource.reservation_id;
-
- // TODO: Combine this with RegisterFragment() etc.
- QueryResourceMgr* res_mgr;
- bool is_first = exec_env_->resource_broker()->GetQueryResourceMgr(query_id_,
- reservation_id, request.fragment_instance_ctx.local_resource_address, &res_mgr);
- DCHECK(res_mgr != NULL);
- runtime_state_->SetQueryResourceMgr(res_mgr);
- if (is_first) {
- runtime_state_->query_resource_mgr()->InitVcoreAcquisition(
- request.fragment_instance_ctx.reserved_resource.v_cpu_cores);
- }
- }
-
// reservation or a query option.
int64_t bytes_limit = -1;
if (runtime_state_->query_options().__isset.mem_limit &&
@@ -161,36 +110,14 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
<< PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
}
- int64_t rm_reservation_size_bytes = -1;
- if (request_has_reserved_resource &&
- request.fragment_instance_ctx.reserved_resource.memory_mb > 0) {
- int64_t rm_reservation_size_mb =
- static_cast<int64_t>(request.fragment_instance_ctx.reserved_resource.memory_mb);
- rm_reservation_size_bytes = rm_reservation_size_mb * 1024L * 1024L;
- // Queries that use more than the hard limit will be killed, so it's not useful to
- // have a reservation larger than the hard limit. Clamp reservation bytes limit to the
- // hard limit (if it exists).
- if (rm_reservation_size_bytes > bytes_limit && bytes_limit != -1) {
- runtime_state_->LogError(ErrorMsg(TErrorCode::FRAGMENT_EXECUTOR,
- PrettyPrinter::PrintBytes(rm_reservation_size_bytes),
- PrettyPrinter::PrintBytes(bytes_limit)));
- rm_reservation_size_bytes = bytes_limit;
- }
- VLOG_QUERY << "Using RM reservation memory limit from resource reservation: "
- << PrettyPrinter::Print(rm_reservation_size_bytes, TUnit::BYTES);
- }
-
DCHECK(!fragment_instance_ctx.request_pool.empty());
- runtime_state_->InitMemTrackers(query_id_, &fragment_instance_ctx.request_pool,
- bytes_limit, rm_reservation_size_bytes);
+ runtime_state_->InitMemTrackers(
+ query_id_, &fragment_instance_ctx.request_pool, bytes_limit);
RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
runtime_state_->InitFilterBank();
// Reserve one main thread from the pool
runtime_state_->resource_pool()->AcquireThreadToken();
- if (runtime_state_->query_resource_mgr() != NULL) {
- runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
- }
has_thread_token_ = true;
average_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
@@ -266,8 +193,8 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
obj_pool(), request.fragment_ctx.fragment.output_sink,
request.fragment_ctx.fragment.output_exprs,
fragment_instance_ctx, row_desc(), &sink_));
- sink_mem_tracker_.reset(new MemTracker(-1, -1, sink_->GetName(),
- runtime_state_->instance_mem_tracker(), true));
+ sink_mem_tracker_.reset(new MemTracker(
+ -1, sink_->GetName(), runtime_state_->instance_mem_tracker(), true));
RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get()));
RuntimeProfile* sink_profile = sink_->profile();
@@ -565,9 +492,6 @@ void PlanFragmentExecutor::ReleaseThreadToken() {
if (has_thread_token_) {
has_thread_token_ = false;
runtime_state_->resource_pool()->ReleaseThreadToken(true);
- if (runtime_state_->query_resource_mgr() != NULL) {
- runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1);
- }
PeriodicCounterUpdater::StopSamplingCounter(average_thread_tokens_);
PeriodicCounterUpdater::StopTimeSeriesCounter(
thread_usage_sampled_counter_);
@@ -583,10 +507,6 @@ void PlanFragmentExecutor::Close() {
}
// Prepare may not have been called, which sets runtime_state_
if (runtime_state_.get() != NULL) {
- if (runtime_state_->query_resource_mgr() != NULL) {
- exec_env_->cgroups_mgr()->UnregisterFragment(
- runtime_state_->fragment_instance_id(), runtime_state_->cgroup());
- }
if (plan_ != NULL) plan_->Close(runtime_state_.get());
for (DiskIoRequestContext* context: *runtime_state_->reader_contexts()) {
runtime_state_->io_mgr()->UnregisterContext(context);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 8d47431..a25bf8d 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -66,8 +66,8 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
default_filter_size_ =
BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
- filter_mem_tracker_.reset(new MemTracker(-1, -1, "Runtime Filter Bank",
- state->instance_mem_tracker(), false));
+ filter_mem_tracker_.reset(
+ new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false));
}
RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
@@ -226,4 +226,3 @@ void RuntimeFilterBank::Close() {
filter_mem_tracker_->Release(memory_allocated_->value());
filter_mem_tracker_->UnregisterFromParent();
}
-
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 57aae58..5249076 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -67,18 +67,15 @@ static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100 * 1024 * 1024;
namespace impala {
-RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params,
- const string& cgroup, ExecEnv* exec_env)
+RuntimeState::RuntimeState(
+ const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env)
: obj_pool_(new ObjectPool()),
fragment_params_(fragment_params),
- now_(new TimestampValue(query_ctx().now_string.c_str(),
- query_ctx().now_string.size())),
- cgroup_(cgroup),
+ now_(new TimestampValue(
+ query_ctx().now_string.c_str(), query_ctx().now_string.size())),
codegen_expr_(false),
- profile_(obj_pool_.get(),
- "Fragment " + PrintId(fragment_ctx().fragment_instance_id)),
+ profile_(obj_pool_.get(), "Fragment " + PrintId(fragment_ctx().fragment_instance_id)),
is_cancelled_(false),
- query_resource_mgr_(NULL),
root_node_id_(-1) {
Status status = Init(exec_env);
DCHECK(status.ok()) << status.GetDetail();
@@ -92,7 +89,6 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx)
codegen_expr_(false),
profile_(obj_pool_.get(), "<unnamed>"),
is_cancelled_(false),
- query_resource_mgr_(NULL),
root_node_id_(-1) {
fragment_params_.__set_query_ctx(query_ctx);
fragment_params_.query_ctx.request.query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
@@ -147,18 +143,17 @@ Status RuntimeState::Init(ExecEnv* exec_env) {
return Status::OK();
}
-void RuntimeState::InitMemTrackers(const TUniqueId& query_id, const string* pool_name,
- int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes) {
+void RuntimeState::InitMemTrackers(
+ const TUniqueId& query_id, const string* pool_name, int64_t query_bytes_limit) {
MemTracker* query_parent_tracker = exec_env_->process_mem_tracker();
if (pool_name != NULL) {
query_parent_tracker = MemTracker::GetRequestPoolMemTracker(*pool_name,
query_parent_tracker);
}
query_mem_tracker_ =
- MemTracker::GetQueryMemTracker(query_id, query_bytes_limit,
- query_rm_reservation_limit_bytes, query_parent_tracker, query_resource_mgr());
- instance_mem_tracker_.reset(new MemTracker(runtime_profile(), -1, -1,
- runtime_profile()->name(), query_mem_tracker_.get()));
+ MemTracker::GetQueryMemTracker(query_id, query_bytes_limit, query_parent_tracker);
+ instance_mem_tracker_.reset(new MemTracker(
+ runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_.get()));
}
void RuntimeState::InitFilterBank() {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index b5f7882..0bf9db5 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -65,8 +65,7 @@ typedef std::map<std::string, std::string> FileMoveMap;
/// query and shared across all execution nodes of that query.
class RuntimeState {
public:
- RuntimeState(const TExecPlanFragmentParams& fragment_params,
- const std::string& cgroup, ExecEnv* exec_env);
+ RuntimeState(const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env);
/// RuntimeState for executing expr in fe-support.
RuntimeState(const TQueryCtx& query_ctx);
@@ -81,7 +80,7 @@ class RuntimeState {
/// tracker (in the fifth level). If 'request_pool' is null, no request pool mem
/// tracker is set up, i.e. query pools will have the process mem pool as the parent.
void InitMemTrackers(const TUniqueId& query_id, const std::string* request_pool,
- int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes = -1);
+ int64_t query_bytes_limit);
/// Initializes the runtime filter bank. Must be called after InitMemTrackers().
void InitFilterBank();
@@ -124,7 +123,6 @@ class RuntimeState {
const TUniqueId& fragment_instance_id() const {
return fragment_ctx().fragment_instance_id;
}
- const std::string& cgroup() const { return cgroup_; }
ExecEnv* exec_env() { return exec_env_; }
DataStreamMgr* stream_mgr() { return exec_env_->stream_mgr(); }
HBaseTableFactory* htable_factory() { return exec_env_->htable_factory(); }
@@ -262,9 +260,6 @@ class RuntimeState {
/// execution doesn't continue if the query terminates abnormally.
Status CheckQueryState();
- QueryResourceMgr* query_resource_mgr() const { return query_resource_mgr_; }
- void SetQueryResourceMgr(QueryResourceMgr* res_mgr) { query_resource_mgr_ = res_mgr; }
-
private:
/// Allow TestEnv to set block_mgr manually for testing.
friend class TestEnv;
@@ -301,9 +296,6 @@ class RuntimeState {
/// Use pointer to avoid inclusion of timestampvalue.h and avoid clang issues.
boost::scoped_ptr<TimestampValue> now_;
- /// The Impala-internal cgroup into which execution threads are assigned.
- /// If empty, no RM is enabled.
- std::string cgroup_;
ExecEnv* exec_env_;
boost::scoped_ptr<LlvmCodeGen> codegen_;
@@ -351,10 +343,6 @@ class RuntimeState {
SpinLock query_status_lock_;
Status query_status_;
- /// Query-wide resource manager for resource expansion etc. Not owned by us; owned by
- /// the ResourceBroker instead.
- QueryResourceMgr* query_resource_mgr_;
-
/// Reader contexts that need to be closed when the fragment is closed.
std::vector<DiskIoRequestContext*> reader_contexts_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index a5818af..8690e39 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -70,7 +70,7 @@ RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id) {
TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
plan_params.query_ctx.query_id.hi = 0;
plan_params.query_ctx.query_id.lo = query_id;
- return new RuntimeState(plan_params, "", exec_env_.get());
+ return new RuntimeState(plan_params, exec_env_.get());
}
Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index c5b4eb4..9cfb672 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -26,7 +26,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/scheduling")
add_library(Scheduling STATIC
admission-controller.cc
backend-config.cc
- query-resource-mgr.cc
query-schedule.cc
request-pool-service.cc
simple-scheduler.cc
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-resource-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.cc b/be/src/scheduling/query-resource-mgr.cc
deleted file mode 100644
index abfe085..0000000
--- a/be/src/scheduling/query-resource-mgr.cc
+++ /dev/null
@@ -1,271 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "scheduling/query-resource-mgr.h"
-
-#include <boost/uuid/uuid.hpp>
-#include <boost/uuid/uuid_generators.hpp>
-#include <gutil/strings/substitute.h>
-#include <sstream>
-
-#include "runtime/exec-env.h"
-#include "resourcebroker/resource-broker.h"
-#include "util/bit-util.h"
-#include "util/cgroups-mgr.h"
-#include "util/container-util.h"
-#include "util/network-util.h"
-#include "util/promise.h"
-#include "util/time.h"
-
-#include "common/names.h"
-
-using boost::uuids::random_generator;
-using boost::uuids::uuid;
-using namespace impala;
-using namespace strings;
-
-DEFINE_int64(rm_mem_expansion_timeout_ms, 5000, "The amount of time to wait (ms) "
- "for a memory expansion request.");
-DEFINE_double(max_vcore_oversubscription_ratio, 2.5, "(Advanced) The maximum ratio "
- "allowed between running threads and acquired VCore resources for a query's fragments"
- " on a single node");
-
-ResourceResolver::ResourceResolver(const unordered_set<TNetworkAddress>& unique_hosts) {
- if (ExecEnv::GetInstance()->is_pseudo_distributed_llama()) {
- CreateLocalLlamaNodeMapping(unique_hosts);
- }
-}
-
-void ResourceResolver::GetResourceHostport(const TNetworkAddress& src,
- TNetworkAddress* dest) {
- if (ExecEnv::GetInstance()->is_pseudo_distributed_llama()) {
- *dest = impalad_to_dn_[src];
- } else {
- dest->hostname = src.hostname;
- dest->port = 0;
- }
-}
-
-void ResourceResolver::CreateLocalLlamaNodeMapping(
- const unordered_set<TNetworkAddress>& unique_hosts) {
- DCHECK(ExecEnv::GetInstance()->is_pseudo_distributed_llama());
- const vector<string>& llama_nodes =
- ExecEnv::GetInstance()->resource_broker()->llama_nodes();
- DCHECK(!llama_nodes.empty());
- int llama_node_ix = 0;
- for (const TNetworkAddress& host: unique_hosts) {
- TNetworkAddress dn_hostport = MakeNetworkAddress(llama_nodes[llama_node_ix]);
- impalad_to_dn_[host] = dn_hostport;
- dn_to_impalad_[dn_hostport] = host;
- LOG(INFO) << "Mapping Datanode " << dn_hostport << " to Impalad: " << host;
- // Round robin the registered Llama nodes.
- llama_node_ix = (llama_node_ix + 1) % llama_nodes.size();
- }
-}
-
-QueryResourceMgr::QueryResourceMgr(const TUniqueId& reservation_id,
- const TNetworkAddress& local_resource_location, const TUniqueId& query_id)
- : reservation_id_(reservation_id), query_id_(query_id),
- local_resource_location_(local_resource_location), exit_(false), callback_count_(0),
- threads_running_(0), vcores_(0) {
- max_vcore_oversubscription_ratio_ = FLAGS_max_vcore_oversubscription_ratio;
-}
-
-void QueryResourceMgr::InitVcoreAcquisition(int32_t init_vcores) {
- LOG(INFO) << "Initialising vcore acquisition thread for query " << PrintId(query_id_)
- << " (" << init_vcores << " initial vcores)";
- DCHECK(acquire_vcore_thread_.get() == NULL)
- << "Double initialisation of QueryResourceMgr::InitCpuAcquisition()";
- vcores_ = init_vcores;
-
- // These shared pointers to atomic values are used to communicate between the vcore
- // acquisition thread and the class destructor. If the acquisition thread is in the
- // middle of an Expand() call, the destructor might have to wait 5s (the default
- // timeout) to return. This holds up query close operations. So instead check to see if
- // the thread is in Expand(), and if so we set a synchronised flag early_exit_ which it
- // inspects immediately after exiting Expand(), and if true, exits before touching any
- // of the class-wide state (because the destructor may have finished before this point).
-
- thread_in_expand_.reset(new AtomicInt32());
- early_exit_.reset(new AtomicInt32());
- acquire_vcore_thread_.reset(
- new Thread("resource-mgmt", Substitute("acquire-cpu-$0", PrintId(query_id_)),
- bind<void>(mem_fn(&QueryResourceMgr::AcquireVcoreResources), this,
- thread_in_expand_, early_exit_)));
-}
-
-llama::TResource QueryResourceMgr::CreateResource(int64_t memory_mb, int64_t vcores) {
- DCHECK(memory_mb > 0 || vcores > 0);
- DCHECK(reservation_id_ != TUniqueId()) << "Expansion requires existing reservation";
-
- unordered_set<TNetworkAddress> hosts;
- hosts.insert(local_resource_location_);
- ResourceResolver resolver(hosts);
- llama::TResource res;
- res.memory_mb = memory_mb;
- res.v_cpu_cores = vcores;
- TNetworkAddress res_address;
- resolver.GetResourceHostport(local_resource_location_, &res_address);
- res.__set_askedLocation(TNetworkAddressToString(res_address));
-
- random_generator uuid_generator;
- uuid id = uuid_generator();
- res.client_resource_id.hi = *reinterpret_cast<uint64_t*>(&id.data[0]);
- res.client_resource_id.lo = *reinterpret_cast<uint64_t*>(&id.data[8]);
- res.enforcement = llama::TLocationEnforcement::MUST;
- return res;
-}
-
-bool QueryResourceMgr::AboveVcoreSubscriptionThreshold() {
- return threads_running_ > vcores_ * (max_vcore_oversubscription_ratio_ * 0.8);
-}
-
-void QueryResourceMgr::NotifyThreadUsageChange(int delta) {
- lock_guard<mutex> l(threads_running_lock_);
- threads_running_ += delta;
- DCHECK(threads_running_ >= 0L);
- if (AboveVcoreSubscriptionThreshold()) threads_changed_cv_.notify_all();
-}
-
-int32_t QueryResourceMgr::AddVcoreAvailableCb(const VcoreAvailableCb& callback) {
- lock_guard<mutex> l(callbacks_lock_);
- callbacks_[callback_count_] = callback;
- callbacks_it_ = callbacks_.begin();
- return callback_count_++;
-}
-
-void QueryResourceMgr::RemoveVcoreAvailableCb(int32_t callback_id) {
- lock_guard<mutex> l(callbacks_lock_);
- CallbackMap::iterator it = callbacks_.find(callback_id);
- DCHECK(it != callbacks_.end()) << "Could not find callback with id: " << callback_id;
- callbacks_.erase(it);
- callbacks_it_ = callbacks_.begin();
-}
-
-Status QueryResourceMgr::RequestMemExpansion(int64_t requested_bytes,
- int64_t* allocated_bytes) {
- DCHECK(allocated_bytes != NULL);
- *allocated_bytes = 0;
- int64_t requested_mb = BitUtil::Ceil(requested_bytes, 1024L * 1024L);
- llama::TResource res = CreateResource(max<int64_t>(1, requested_mb), 0);
- llama::TUniqueId expansion_id;
- llama::TAllocatedResource resource;
- RETURN_IF_ERROR(ExecEnv::GetInstance()->resource_broker()->Expand(reservation_id_,
- res, FLAGS_rm_mem_expansion_timeout_ms, &expansion_id, &resource));
-
- DCHECK_EQ(resource.v_cpu_cores, 0L) << "Unexpected VCPUs returned by Llama";
- *allocated_bytes = resource.memory_mb * 1024L * 1024L;
- return Status::OK();
-}
-
-void QueryResourceMgr::AcquireVcoreResources(
- shared_ptr<AtomicInt32> thread_in_expand,
- shared_ptr<AtomicInt32> early_exit) {
- // Take a copy because we'd like to print it in some cases after the destructor.
- TUniqueId reservation_id = reservation_id_;
- VLOG_QUERY << "Starting Vcore acquisition for: " << reservation_id;
- while (!ShouldExit()) {
- {
- unique_lock<mutex> l(threads_running_lock_);
- while (!AboveVcoreSubscriptionThreshold() && !ShouldExit()) {
- threads_changed_cv_.wait(l);
- }
- }
- if (ShouldExit()) break;
-
- llama::TResource res = CreateResource(0L, 1);
- VLOG_QUERY << "Expanding VCore allocation: " << reservation_id_;
-
- // First signal that we are about to enter a blocking Expand() call.
- thread_in_expand->Add(1L);
-
- // TODO: Could cause problems if called during or after a system-wide shutdown
- llama::TAllocatedResource resource;
- llama::TUniqueId expansion_id;
- Status status = ExecEnv::GetInstance()->resource_broker()->Expand(reservation_id,
- res, -1, &expansion_id, &resource);
- thread_in_expand->Add(-1L);
- // If signalled to exit quickly by the destructor, exit the loop now. It's important
- // to do so without accessing any class variables since they may no longer be valid.
- // Need to check after setting thread_in_expand to avoid a race.
- if (early_exit->Add(0L) != 0) {
- VLOG_QUERY << "Fragment finished during Expand(): " << reservation_id;
- break;
- }
- if (!status.ok()) {
- VLOG_QUERY << "Could not expand CPU resources for query " << PrintId(query_id_)
- << ", reservation: " << PrintId(reservation_id_) << ". Error was: "
- << status.GetDetail();
- // Sleep to avoid flooding the resource broker, particularly if requests are being
- // rejected quickly (and therefore we stay oversubscribed)
- // TODO: configurable timeout
- SleepForMs(250);
- continue;
- }
-
- DCHECK(resource.v_cpu_cores == 1)
- << "Asked for 1 core, got: " << resource.v_cpu_cores;
- vcores_ += resource.v_cpu_cores;
-
- ExecEnv* exec_env = ExecEnv::GetInstance();
- const string& cgroup =
- exec_env->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, "_"));
- int32_t num_shares = exec_env->cgroups_mgr()->VirtualCoresToCpuShares(vcores_);
- exec_env->cgroups_mgr()->SetCpuShares(cgroup, num_shares);
-
- // TODO: Only call one callback no matter how many VCores we just added; maybe call
- // all of them?
- {
- lock_guard<mutex> l(callbacks_lock_);
- if (callbacks_.size() != 0) {
- callbacks_it_->second();
- if (++callbacks_it_ == callbacks_.end()) callbacks_it_ = callbacks_.begin();
- }
- }
- }
- VLOG_QUERY << "Leaving VCore acquisition thread: " << reservation_id;
-}
-
-bool QueryResourceMgr::ShouldExit() {
- lock_guard<mutex> l(exit_lock_);
- return exit_;
-}
-
-void QueryResourceMgr::Shutdown() {
- {
- lock_guard<mutex> l(exit_lock_);
- if (exit_) return;
- exit_ = true;
- }
- {
- lock_guard<mutex> l(callbacks_lock_);
- callbacks_.clear();
- }
- threads_changed_cv_.notify_all();
-}
-
-QueryResourceMgr::~QueryResourceMgr() {
- if (acquire_vcore_thread_.get() == NULL) return;
- if (!ShouldExit()) Shutdown();
- // First, set the early exit flag. Then check to see if the thread is in Expand(). If
- // so, the acquisition thread is guaranteed to see early_exit_ == 1L once it finishes
- // Expand(), and will exit immediately. It's therefore safe not to wait for it.
- early_exit_->Add(1L);
- if (thread_in_expand_->Add(0L) == 0L) {
- acquire_vcore_thread_->Join();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.h b/be/src/scheduling/query-resource-mgr.h
deleted file mode 100644
index 10da312..0000000
--- a/be/src/scheduling/query-resource-mgr.h
+++ /dev/null
@@ -1,227 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef SCHEDULING_QUERY_RESOURCE_MGR_H
-#define SCHEDULING_QUERY_RESOURCE_MGR_H
-
-#include "common/atomic.h"
-#include "common/status.h"
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ResourceBrokerService.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/Frontend_types.h"
-#include "util/promise.h"
-#include "util/thread.h"
-
-#include <boost/function.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-#include <string>
-
-namespace impala {
-
-class ResourceBroker;
-
-/// Utility class to map hosts to the Llama-registered resource-holding hosts
-/// (i.e. datanodes).
-class ResourceResolver {
- public:
- ResourceResolver(const boost::unordered_set<TNetworkAddress>& unique_hosts);
-
- /// Translates src into a network address suitable for identifying resources across
- /// interactions with the Llama. The MiniLlama expects resources to be requested on
- /// IP:port addresses of Hadoop DNs, whereas the regular Llama only deals with the
- /// hostnames of Yarn NMs. For MiniLlama setups this translation uses the
- /// impalad_to_dn_ mapping to populate dest. When using the regular Llama, this
- /// translation sets a fixed port of 0 in dest because the Llama strips away the port
- /// of resource locations.
- void GetResourceHostport(const TNetworkAddress& src, TNetworkAddress* dst);
-
- private:
- /// Impala mini clusters using the Mini Llama require translating the impalad hostports
- /// to Hadoop DN hostports registered with the Llama during resource requests
- /// (and then in reverse for translating granted resources to impalads).
- /// These maps form a bi-directional hostport mapping Hadoop DN <-> impalad.
- boost::unordered_map<TNetworkAddress, TNetworkAddress> impalad_to_dn_;
- boost::unordered_map<TNetworkAddress, TNetworkAddress> dn_to_impalad_;
-
- /// Called only in pseudo-distributed setups (i.e. testing only) to populate
- /// impalad_to_dn_ and dn_to_impalad_
- void CreateLocalLlamaNodeMapping(
- const boost::unordered_set<TNetworkAddress>& unique_hosts);
-};
-
-/// Tracks all the state necessary to create expansion requests for all fragments of a
-/// single query on a single node. Code that might need to expand the memory reservation
-/// for this query (i.e. MemTracker) can use this class to construct expansion requests
-/// that may then be submitted to the ResourceBroker.
-//
-/// If InitCpuAcquisition() is called, this class will monitor the thread token to VCore
-/// ratio (thread consumers must use NotifyThreadUsageChange() to update the thread
-/// consumption count). If the ratio gets too high (see AboveVcoreSubscriptionThreshold()
-/// for details), we will try to acquire more VCore resources from Llama asynchronously.
-/// If the ratio passes a higher threshold (see IsVcoreOverSubscribed()), we say that the
-/// query fragments are currently oversubscribing their VCore resources.
-//
-/// Threads are typically handed to a fragment by the thread resource manager, which deals
-/// in tokens. When a fragment wants to use a token to start a thread, it should only do so
-/// if the ratio of threads to VCores (which map directly onto cgroup shares) is not too
-/// large. If it is too large - i.e. the VCores are oversubscribed - the fragment should
-/// wait to spin up a new threads until more VCore resources are acquired as above. To help
-/// with this, each fragment may register one or more callbacks with their
-/// QueryResourceMgr; when more VCore resources are acquired the callbacks are invoked in
-/// round-robin fashion. The callback should try and re-acquire the previously untaken
-/// thread token, and then a new thread may be started.
-//
-/// Only CPU-heavy threads need be managed using this class.
-//
-/// TODO: Handle reducing the number of VCores when threads finish.
-/// TODO: Consider combining more closely with ThreadResourceMgr.
-/// TODO: Add counters to RuntimeProfile to track resources.
-class QueryResourceMgr {
- public:
- QueryResourceMgr(const TUniqueId& reservation_id,
- const TNetworkAddress& local_resource_location, const TUniqueId& query_id);
-
- /// Must be called only once. Starts a separate thread to monitor thread consumption,
- /// which asks for more VCores from Llama periodically.
- void InitVcoreAcquisition(int32_t init_vcores);
-
- /// Should be used to check if another thread token may be acquired by this
- /// query. Fragments may ignore this when acquiring a new CPU token, but the result will
- /// be a larger thread:VCore ratio.
- //
- /// Note that this threshold is larger than the one in
- /// AboveVcoreSubscriptionThreshold(). We want to start acquiring more VCore allocations
- /// before we get so oversubscribed that adding new threads is considered a bad idea.
- inline bool IsVcoreOverSubscribed() {
- boost::lock_guard<boost::mutex> l(threads_running_lock_);
- return threads_running_ > vcores_ * max_vcore_oversubscription_ratio_;
- }
-
- /// Called when thread consumption goes up or down. If the total consumption goes above a
- /// subscription threshold, the acquisition thread will be woken to ask for more VCores.
- void NotifyThreadUsageChange(int delta);
-
- /// All callbacks registered here are called in round-robin fashion when more VCores are
- /// acquired. Returns a unique ID that can be used as an argument to
- /// RemoveVcoreAvailableCb().
- typedef boost::function<void ()> VcoreAvailableCb;
- int32_t AddVcoreAvailableCb(const VcoreAvailableCb& callback);
-
- /// Removes the callback with the given ID.
- void RemoveVcoreAvailableCb(int32_t callback_id);
-
- /// Request an expansion of requested_bytes. If the expansion can be fulfilled within
- /// the timeout period, the number of bytes allocated is returned in allocated_bytes
- /// (which may be more than requested). Otherwise an error status is returned.
- Status RequestMemExpansion(int64_t requested_bytes, int64_t* allocated_bytes);
-
- /// Sets the exit flag for the VCore acquisiton thread, but does not block. Also clears
- /// the set of callbacks, so that after Shutdown() has returned, no callback will be
- /// invoked.
- void Shutdown();
-
- /// Waits for the VCore acquisition thread to stop.
- ~QueryResourceMgr();
-
- const TUniqueId& reservation_id() const { return reservation_id_; }
-
- private:
- /// ID of the single reservation corresponding to this query
- TUniqueId reservation_id_;
-
- /// Query ID of the query this class manages resources for.
- TUniqueId query_id_;
-
- /// Network address of the local service registered with Llama. Usually corresponds to
- /// <local-address>:0, unless a pseudo-dstributed Llama is being used (see
- /// ResourceResolver::CreateLocalLlamaNodeMapping()).
- TNetworkAddress local_resource_location_;
-
- /// Used to control shutdown of AcquireCpuResources().
- boost::mutex exit_lock_;
- bool exit_;
-
- /// Protects callbacks_ and callbacks_it_
- boost::mutex callbacks_lock_;
-
- /// List of callbacks to notify when a new VCore resource is available.
- typedef boost::unordered_map<int32_t, VcoreAvailableCb> CallbackMap;
- CallbackMap callbacks_;
-
- /// Round-robin iterator to notify callbacks about new VCores one at a time.
- CallbackMap::iterator callbacks_it_;
-
- /// Total number of callbacks that were ever registered. Used to give each callback a
- /// unique ID so that they can be removed.
- int32_t callback_count_;
-
- /// Protects threads_running_, threads_changed_cv_ and vcores_.
- boost::mutex threads_running_lock_;
-
- /// Waited on by AcquireCpuResources(), and notified by NotifyThreadUsageChange().
- boost::condition_variable threads_changed_cv_;
-
- /// The number of threads we know to be running on behalf of this query.
- int64_t threads_running_;
-
- /// The number of VCores acquired for this node for this query.
- int64_t vcores_;
-
- /// Set to FLAGS_max_vcore_oversubscription_ratio in the constructor. If the ratio of
- /// threads to VCores exceeds this number, no more threads may be executed by this query
- /// until more VCore resources are acquired.
- float max_vcore_oversubscription_ratio_;
-
- /// Runs AcquireVcoreResources() after InitVcoreAcquisition() is called.
- boost::scoped_ptr<Thread> acquire_vcore_thread_;
-
- /// Signals to the vcore acquisition thread that it should exit after it exits from any
- /// pending Expand() call. Is a shared_ptr so that it will remain valid even after the
- /// parent QueryResourceMgr has been destroyed.
- /// TODO: Combine with ShouldExit(), and replace with AtomicBool when we have such a
- /// thing.
- std::shared_ptr<AtomicInt32> early_exit_;
-
- /// Signals to the destructor that the vcore acquisition thread is currently in an
- /// Expand() RPC. If so, the destructor does not need to wait for the acquisition thread
- /// to exit.
- std::shared_ptr<AtomicInt32> thread_in_expand_;
-
- /// Creates the llama resource for the memory and/or cores specified, associated with
- /// the reservation context.
- llama::TResource CreateResource(int64_t memory_mb, int64_t vcores);
-
- /// Run as a thread owned by acquire_cpu_thread_. Waits for notification from
- /// NotifyThreadUsageChange(), then checks the subscription level to decide if more
- /// VCores are needed, and starts a new expansion request if so.
- void AcquireVcoreResources(std::shared_ptr<AtomicInt32 > thread_in_expand,
- std::shared_ptr<AtomicInt32> early_exit);
-
- /// True if thread:VCore subscription is too high, meaning more VCores are required.
- /// Must be called holding threads_running_ lock.
- bool AboveVcoreSubscriptionThreshold();
-
- /// Notifies acquire_cpu_thread_ that it should terminate. Does not block.
- bool ShouldExit();
-};
-
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index f893f1c..b5745c4 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -28,7 +28,6 @@
#include "util/uid-util.h"
#include "util/debug-util.h"
#include "util/parse-util.h"
-#include "util/llama-util.h"
#include "common/names.h"
@@ -36,28 +35,13 @@ using boost::uuids::random_generator;
using boost::uuids::uuid;
using namespace impala;
-DEFINE_bool(rm_always_use_defaults, false, "If true, all queries use the same initial"
- " resource requests regardless of their computed resource estimates. Only meaningful "
- "if --enable_rm is set.");
-DEFINE_string(rm_default_memory, "4G", "The initial amount of memory that"
- " a query should reserve on each node if either it does not have an available "
- "estimate, or if --rm_always_use_defaults is set.");
-DEFINE_int32(rm_default_cpu_vcores, 2, "The initial number of virtual cores that"
- " a query should reserve on each node if either it does not have an available "
- "estimate, or if --rm_always_use_defaults is set.");
-
+// TODO: Remove for Impala 3.0.
+DEFINE_bool(rm_always_use_defaults, false, "Deprecated");
+DEFINE_string(rm_default_memory, "4G", "Deprecated");
+DEFINE_int32(rm_default_cpu_vcores, 2, "Deprecated");
namespace impala {
-// Default value for the request_timeout in a reservation request. The timeout is the
-// max time in milliseconds to wait for a resource request to be fulfilled by Llama.
-// The default value of five minutes was determined to be reasonable based on
-// experiments on a 20-node cluster with TPCDS 15TB and 8 concurrent clients.
-// Over 30% of queries timed out with a reservation timeout of 1 minute but only less
-// than 5% timed out when using 5 minutes. Still, the default value is somewhat
-// arbitrary and a good value is workload dependent.
-const int64_t DEFAULT_REQUEST_TIMEOUT_MS = 5 * 60 * 1000;
-
QuerySchedule::QuerySchedule(const TUniqueId& query_id,
const TQueryExecRequest& request, const TQueryOptions& query_options,
RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events)
@@ -97,10 +81,9 @@ int64_t QuerySchedule::GetClusterMemoryEstimate() const {
int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
// Precedence of different estimate sources is:
// user-supplied RM query option >
- // server-side defaults (if rm_always_use_defaults == true) >
// query option limit >
// estimate >
- // server-side defaults (if rm_always_use_defaults == false)
+ // server-side defaults
int64_t query_option_memory_limit = numeric_limits<int64_t>::max();
bool has_query_option = false;
if (query_options_.__isset.mem_limit && query_options_.mem_limit > 0) {
@@ -116,12 +99,10 @@ int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
}
int64_t per_host_mem = 0L;
+ // TODO: Remove rm_initial_mem and associated logic when we're sure that clients won't
+ // be affected.
if (query_options_.__isset.rm_initial_mem && query_options_.rm_initial_mem > 0) {
per_host_mem = query_options_.rm_initial_mem;
- } else if (FLAGS_rm_always_use_defaults) {
- bool ignored;
- per_host_mem = ParseUtil::ParseMemSpec(FLAGS_rm_default_memory,
- &ignored, 0);
} else if (has_query_option) {
per_host_mem = query_option_memory_limit;
} else if (has_estimate) {
@@ -134,115 +115,11 @@ int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
}
// Cap the memory estimate at the amount of physical memory available. The user's
// provided value or the estimate from planning can each be unreasonable.
- // TODO: Get this limit from Llama (Yarn sets it).
return min(per_host_mem, MemInfo::physical_mem());
}
-int16_t QuerySchedule::GetPerHostVCores() const {
- // Precedence of different estimate sources is:
- // server-side defaults (if rm_always_use_defaults == true) >
- // computed estimates
- // server-side defaults (if rm_always_use_defaults == false)
- int16_t v_cpu_cores = FLAGS_rm_default_cpu_vcores;
- if (!FLAGS_rm_always_use_defaults && query_options_.__isset.v_cpu_cores &&
- query_options_.v_cpu_cores > 0) {
- v_cpu_cores = query_options_.v_cpu_cores;
- }
-
- return v_cpu_cores;
-}
-
-void QuerySchedule::GetResourceHostport(const TNetworkAddress& src,
- TNetworkAddress* dst) {
- DCHECK(dst != NULL);
- DCHECK(resource_resolver_.get() != NULL)
- << "resource_resolver_ is NULL, didn't call SetUniqueHosts()?";
- resource_resolver_->GetResourceHostport(src, dst);
-}
-
void QuerySchedule::SetUniqueHosts(const unordered_set<TNetworkAddress>& unique_hosts) {
unique_hosts_ = unique_hosts;
- resource_resolver_.reset(new ResourceResolver(unique_hosts_));
-}
-
-void QuerySchedule::PrepareReservationRequest(const string& pool, const string& user) {
- reservation_request_.resources.clear();
- reservation_request_.version = TResourceBrokerServiceVersion::V1;
- reservation_request_.queue = pool;
- reservation_request_.gang = true;
- // Convert the user name to a short name (e.g. 'user1@domain' to 'user1') because
- // Llama checks group membership based on the short name of the principal.
- reservation_request_.user = llama::GetShortName(user);
-
- // Set optional request timeout from query options.
- if (query_options_.__isset.reservation_request_timeout) {
- DCHECK_GT(query_options_.reservation_request_timeout, 0);
- reservation_request_.__set_request_timeout(
- query_options_.reservation_request_timeout);
- }
-
- // Set the reservation timeout from the query options or use a default.
- int64_t timeout = DEFAULT_REQUEST_TIMEOUT_MS;
- if (query_options_.__isset.reservation_request_timeout) {
- timeout = query_options_.reservation_request_timeout;
- }
- reservation_request_.__set_request_timeout(timeout);
-
- int32_t memory_mb = GetPerHostMemoryEstimate() / 1024 / 1024;
- int32_t v_cpu_cores = GetPerHostVCores();
- // The memory_mb and v_cpu_cores estimates may legitimately be zero,
- // e.g., for constant selects. Do not reserve any resources in those cases.
- if (memory_mb == 0 && v_cpu_cores == 0) return;
-
- DCHECK(resource_resolver_.get() != NULL)
- << "resource_resolver_ is NULL, didn't call SetUniqueHosts()?";
- random_generator uuid_generator;
- for (const TNetworkAddress& host: unique_hosts_) {
- reservation_request_.resources.push_back(llama::TResource());
- llama::TResource& resource = reservation_request_.resources.back();
- uuid id = uuid_generator();
- resource.client_resource_id.hi = *reinterpret_cast<uint64_t*>(&id.data[0]);
- resource.client_resource_id.lo = *reinterpret_cast<uint64_t*>(&id.data[8]);
- resource.enforcement = llama::TLocationEnforcement::MUST;
-
- TNetworkAddress resource_hostport;
- resource_resolver_->GetResourceHostport(host, &resource_hostport);
- stringstream ss;
- ss << resource_hostport;
- resource.askedLocation = ss.str();
- resource.memory_mb = memory_mb;
- resource.v_cpu_cores = v_cpu_cores;
- }
-}
-
-Status QuerySchedule::ValidateReservation() {
- if (!HasReservation()) return Status("Query schedule does not have a reservation.");
- vector<TNetworkAddress> hosts_missing_resources;
- ResourceResolver resolver(unique_hosts_);
- for (const FragmentExecParams& params: fragment_exec_params_) {
- for (const TNetworkAddress& host: params.hosts) {
- // Ignore the coordinator host which is not contained in unique_hosts_.
- if (unique_hosts_.find(host) == unique_hosts_.end()) continue;
- TNetworkAddress resource_hostport;
- resolver.GetResourceHostport(host, &resource_hostport);
- if (reservation_.allocated_resources.find(resource_hostport) ==
- reservation_.allocated_resources.end()) {
- hosts_missing_resources.push_back(host);
- }
- }
- }
- if (!hosts_missing_resources.empty()) {
- stringstream ss;
- ss << "Failed to validate reservation " << reservation_.reservation_id << "." << endl
- << "Missing resources for hosts [";
- for (int i = 0; i < hosts_missing_resources.size(); ++i) {
- ss << hosts_missing_resources[i];
- if (i + 1 != hosts_missing_resources.size()) ss << ", ";
- }
- ss << "]";
- return Status(ss.str());
- }
- return Status::OK();
}
}