Posted to commits@impala.apache.org by he...@apache.org on 2016/09/21 04:00:41 UTC

[4/7] incubator-impala git commit: IMPALA-4160: Remove Llama support.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 089ada1..0f93875 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -635,7 +635,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     DiskIoMgr io_mgr(1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
     ASSERT_OK(io_mgr.Init(&root_mem_tracker));
-    MemTracker reader_mem_tracker(-1, -1, "Reader", &root_mem_tracker);
+    MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
     DiskIoRequestContext* reader;
     ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
 
@@ -950,7 +950,7 @@ TEST_F(DiskIoMgrTest, Buffers) {
   ASSERT_OK(io_mgr.Init(&root_mem_tracker));
   ASSERT_EQ(root_mem_tracker.consumption(), 0);
 
-  MemTracker reader_mem_tracker(-1, -1, "Reader", &root_mem_tracker);
+  MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
   DiskIoRequestContext* reader;
   ASSERT_OK(io_mgr.RegisterContext(&reader, &reader_mem_tracker));
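
[Editorial note, not part of the commit: the pattern above repeats throughout this patch. MemTracker's constructor loses its rm_reserved_limit argument, so every call site shifts the label and parent one position left. A minimal standalone sketch of the change, using the new three-argument form declared later in this patch in mem-tracker.h:]

    #include "runtime/mem-tracker.h"

    void ConstructorSketch() {
      impala::MemTracker root(-1, "Process");          // was: MemTracker(-1, -1, "Process")
      impala::MemTracker reader(-1, "Reader", &root);  // was: MemTracker(-1, -1, "Reader", &root)
    }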
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index 5df69ed..dd45932 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -370,9 +370,9 @@ DiskIoMgr::~DiskIoMgr() {
 Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
   DCHECK(process_mem_tracker != NULL);
   free_buffer_mem_tracker_.reset(
-      new MemTracker(-1, -1, "Free Disk IO Buffers", process_mem_tracker, false));
+      new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
   unowned_buffer_mem_tracker_.reset(
-      new MemTracker(-1, -1, "Untracked Disk IO Buffers", process_mem_tracker, false));
+      new MemTracker(-1, "Untracked Disk IO Buffers", process_mem_tracker, false));
   // If we hit the process limit, see if we can reclaim some memory by removing
   // previously allocated (but unused) io buffers.
   process_mem_tracker->AddGcFunction(bind(&DiskIoMgr::GcIoBuffers, this));
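
[Editorial note: the AddGcFunction() call kept above is the hook through which the process tracker reclaims cached I/O buffers under memory pressure — when a limit check fails, the tracker runs its registered GC callbacks and re-checks. A simplified, self-contained sketch of that pattern; the names are hypothetical, not Impala's actual internals:]

    #include <cstdint>
    #include <functional>
    #include <vector>

    class GcTrackerSketch {
     public:
      // DiskIoMgr-style clients register a function that frees cached memory.
      void AddGcFunction(std::function<void()> f) { gc_fns_.push_back(std::move(f)); }

      // Invoked when consumption would exceed the limit. Returns true if the
      // callbacks could not bring consumption down to max_consumption.
      bool GcMemory(int64_t max_consumption) {
        for (auto& f : gc_fns_) f();
        return consumption_ > max_consumption;
      }

      int64_t consumption_ = 0;

     private:
      std::vector<std::function<void()>> gc_fns_;
    };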

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 38e9f96..1b3fa14 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -24,7 +24,6 @@
 #include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
-#include "resourcebroker/resource-broker.h"
 #include "runtime/backend-client.h"
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
@@ -51,7 +50,6 @@
 #include "util/webserver.h"
 #include "util/mem-info.h"
 #include "util/debug-util.h"
-#include "util/cgroups-mgr.h"
 #include "util/memory-metrics.h"
 #include "util/pretty-printer.h"
 #include "util/thread-pool.h"
@@ -84,49 +82,21 @@ DECLARE_int32(num_cores);
 DECLARE_int32(be_port);
 DECLARE_string(mem_limit);
 
-DEFINE_bool(enable_rm, false, "Whether to enable resource management. If enabled, "
-                              "-fair_scheduler_allocation_path is required.");
-DEFINE_int32(llama_callback_port, 28000,
-             "Port where Llama notification callback should be started");
-// TODO: Deprecate llama_host and llama_port in favor of the new llama_hostports.
-// This needs to be coordinated with CM.
-DEFINE_string(llama_host, "",
-              "Host of Llama service that the resource broker should connect to");
-DEFINE_int32(llama_port, 15000,
-             "Port of Llama service that the resource broker should connect to");
-DEFINE_string(llama_addresses, "",
-             "Llama availability group given as a comma-separated list of hostports.");
-DEFINE_int64(llama_registration_timeout_secs, 30,
-             "Maximum number of seconds that Impala will attempt to (re-)register "
-             "with Llama before aborting the triggering action with an error "
-             "(e.g. Impalad startup or a Llama RPC request). "
-             "A setting of -1 means try indefinitely.");
-DEFINE_int64(llama_registration_wait_secs, 3,
-             "Number of seconds to wait between attempts during Llama registration.");
-DEFINE_int64(llama_max_request_attempts, 5,
-             "Maximum number of times a non-registration Llama RPC request "
-             "(reserve/expand/release, etc.) is retried until the request is aborted. "
-             "An attempt is counted once Impala is registered with Llama, i.e., a "
-             "request survives at most llama_max_request_attempts-1 re-registrations.");
-DEFINE_string(cgroup_hierarchy_path, "", "If Resource Management is enabled, this must "
-    "be set to the Impala-writeable root of the cgroups hierarchy into which execution "
-    "threads are assigned.");
-DEFINE_string(staging_cgroup, "impala_staging", "Name of the cgroup that a query's "
-    "execution threads are moved into once the query completes.");
-
-// Use a low default value because the reconnection logic is performed manually
-// for the purpose of faster Llama failover (otherwise we may try to reconnect to the
-// inactive Llama for a long time).
-DEFINE_int32(resource_broker_cnxn_attempts, 1, "The number of times to retry an "
-    "RPC connection to Llama. A setting of 0 means retry indefinitely");
-DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000, "The interval, in ms, "
-    "to wait between attempts to make an RPC connection to the Llama.");
-DEFINE_int32(resource_broker_send_timeout, 0, "Time to wait, in ms, "
-    "for the underlying socket of an RPC to Llama to successfully send data. "
-    "A setting of 0 means the socket will wait indefinitely.");
-DEFINE_int32(resource_broker_recv_timeout, 0, "Time to wait, in ms, "
-    "for the underlying socket of an RPC to Llama to successfully receive data. "
-    "A setting of 0 means the socket will wait indefinitely.");
+// TODO: Remove the following RM-related flags in Impala 3.0.
+DEFINE_bool(enable_rm, false, "Deprecated");
+DEFINE_int32(llama_callback_port, 28000, "Deprecated");
+DEFINE_string(llama_host, "", "Deprecated");
+DEFINE_int32(llama_port, 15000, "Deprecated");
+DEFINE_string(llama_addresses, "", "Deprecated");
+DEFINE_int64(llama_registration_timeout_secs, 30, "Deprecated");
+DEFINE_int64(llama_registration_wait_secs, 3, "Deprecated");
+DEFINE_int64(llama_max_request_attempts, 5, "Deprecated");
+DEFINE_string(cgroup_hierarchy_path, "", "Deprecated");
+DEFINE_string(staging_cgroup, "impala_staging", "Deprecated");
+DEFINE_int32(resource_broker_cnxn_attempts, 1, "Deprecated");
+DEFINE_int32(resource_broker_cnxn_retry_interval_ms, 3000, "Deprecated");
+DEFINE_int32(resource_broker_send_timeout, 0, "Deprecated");
+DEFINE_int32(resource_broker_recv_timeout, 0, "Deprecated");
 
 DEFINE_int32(coordinator_rpc_threads, 12, "(Advanced) Number of threads available to "
     "start fragments on remote Impala daemons.");
@@ -145,11 +115,6 @@ DEFINE_int32(catalog_client_connection_num_retries, 3, "Retry catalog connection
 DEFINE_int32(catalog_client_rpc_timeout_ms, 0, "(Advanced) The underlying TSocket "
     "send/recv timeout in milliseconds for a catalog client RPC.");
 
-// The key for a variable set in Impala's test environment only, to allow the
-// resource-broker to correctly map node addresses into a form that Llama understand.
-const static string PSEUDO_DISTRIBUTED_CONFIG_KEY =
-    "yarn.scheduler.include-port-in-node-name";
-
 const static string DEFAULT_FS = "fs.defaultFS";
 
 namespace impala {
@@ -160,35 +125,29 @@ ExecEnv::ExecEnv()
   : metrics_(new MetricGroup("impala-metrics")),
     stream_mgr_(new DataStreamMgr(metrics_.get())),
     impalad_client_cache_(
-        new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries,
-            0, FLAGS_backend_client_rpc_timeout_ms,
-            FLAGS_backend_client_rpc_timeout_ms,
-            "", !FLAGS_ssl_client_ca_certificate.empty())),
+        new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0,
+            FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "",
+            !FLAGS_ssl_client_ca_certificate.empty())),
     catalogd_client_cache_(
-        new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries,
-            0, FLAGS_catalog_client_rpc_timeout_ms,
-            FLAGS_catalog_client_rpc_timeout_ms,
-            "", !FLAGS_ssl_client_ca_certificate.empty())),
+        new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries, 0,
+            FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "",
+            !FLAGS_ssl_client_ca_certificate.empty())),
     htable_factory_(new HBaseTableFactory()),
     disk_io_mgr_(new DiskIoMgr()),
     webserver_(new Webserver()),
     mem_tracker_(NULL),
     thread_mgr_(new ThreadResourceMgr),
-    cgroups_mgr_(NULL),
     hdfs_op_thread_pool_(
         CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)),
     tmp_file_mgr_(new TmpFileMgr),
     request_pool_service_(new RequestPoolService(metrics_.get())),
     frontend_(new Frontend()),
-    fragment_exec_thread_pool_(
-        new CallableThreadPool("coordinator-fragment-rpc", "worker",
-            FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
+    fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc",
+        "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     enable_webserver_(FLAGS_enable_webserver),
     is_fe_tests_(false),
-    backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)),
-    is_pseudo_distributed_llama_(false) {
-  if (FLAGS_enable_rm) InitRm();
+    backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
   // Initialize the scheduler either dynamically (with a statestore) or statically (with
   // a standalone single backend)
   if (FLAGS_use_statestore) {
@@ -202,33 +161,30 @@ ExecEnv::ExecEnv()
         subscriber_address, statestore_address, metrics_.get()));
 
     scheduler_.reset(new SimpleScheduler(statestore_subscriber_.get(),
-        statestore_subscriber_->id(), backend_address_, metrics_.get(),
-        webserver_.get(), resource_broker_.get(), request_pool_service_.get()));
+        statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(),
+        request_pool_service_.get()));
   } else {
     vector<TNetworkAddress> addresses;
     addresses.push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
-    scheduler_.reset(new SimpleScheduler(addresses, metrics_.get(), webserver_.get(),
-        resource_broker_.get(), request_pool_service_.get()));
+    scheduler_.reset(new SimpleScheduler(
+        addresses, metrics_.get(), webserver_.get(), request_pool_service_.get()));
   }
   if (exec_env_ == NULL) exec_env_ = this;
-  if (FLAGS_enable_rm) resource_broker_->set_scheduler(scheduler_.get());
 }
 
 // TODO: Need refactor to get rid of duplicated code.
 ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
-                 int webserver_port, const string& statestore_host, int statestore_port)
+    int webserver_port, const string& statestore_host, int statestore_port)
   : metrics_(new MetricGroup("impala-metrics")),
     stream_mgr_(new DataStreamMgr(metrics_.get())),
     impalad_client_cache_(
-        new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries,
-            0, FLAGS_backend_client_rpc_timeout_ms,
-            FLAGS_backend_client_rpc_timeout_ms,
-            "", !FLAGS_ssl_client_ca_certificate.empty())),
+        new ImpalaBackendClientCache(FLAGS_backend_client_connection_num_retries, 0,
+            FLAGS_backend_client_rpc_timeout_ms, FLAGS_backend_client_rpc_timeout_ms, "",
+            !FLAGS_ssl_client_ca_certificate.empty())),
     catalogd_client_cache_(
-        new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries,
-            0, FLAGS_catalog_client_rpc_timeout_ms,
-            FLAGS_catalog_client_rpc_timeout_ms,
-            "", !FLAGS_ssl_client_ca_certificate.empty())),
+        new CatalogServiceClientCache(FLAGS_catalog_client_connection_num_retries, 0,
+            FLAGS_catalog_client_rpc_timeout_ms, FLAGS_catalog_client_rpc_timeout_ms, "",
+            !FLAGS_ssl_client_ca_certificate.empty())),
     htable_factory_(new HBaseTableFactory()),
     disk_io_mgr_(new DiskIoMgr()),
     webserver_(new Webserver(webserver_port)),
@@ -238,16 +194,13 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
         CreateHdfsOpThreadPool("hdfs-worker-pool", FLAGS_num_hdfs_worker_threads, 1024)),
     tmp_file_mgr_(new TmpFileMgr),
     frontend_(new Frontend()),
-    fragment_exec_thread_pool_(
-        new CallableThreadPool("coordinator-fragment-rpc", "worker",
-            FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
+    fragment_exec_thread_pool_(new CallableThreadPool("coordinator-fragment-rpc",
+        "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())),
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     is_fe_tests_(false),
-    backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)),
-    is_pseudo_distributed_llama_(false) {
+    backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
   request_pool_service_.reset(new RequestPoolService(metrics_.get()));
-  if (FLAGS_enable_rm) InitRm();
 
   if (FLAGS_use_statestore && statestore_port > 0) {
     TNetworkAddress subscriber_address =
@@ -260,73 +213,23 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
         subscriber_address, statestore_address, metrics_.get()));
 
     scheduler_.reset(new SimpleScheduler(statestore_subscriber_.get(),
-        statestore_subscriber_->id(), backend_address_, metrics_.get(),
-        webserver_.get(), resource_broker_.get(), request_pool_service_.get()));
+        statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(),
+        request_pool_service_.get()));
   } else {
     vector<TNetworkAddress> addresses;
     addresses.push_back(MakeNetworkAddress(hostname, backend_port));
-    scheduler_.reset(new SimpleScheduler(addresses, metrics_.get(), webserver_.get(),
-        resource_broker_.get(), request_pool_service_.get()));
+    scheduler_.reset(new SimpleScheduler(
+        addresses, metrics_.get(), webserver_.get(), request_pool_service_.get()));
   }
   if (exec_env_ == NULL) exec_env_ = this;
-  if (FLAGS_enable_rm) resource_broker_->set_scheduler(scheduler_.get());
 }
 
-void ExecEnv::InitRm() {
-  // Unique addresses from FLAGS_llama_addresses and FLAGS_llama_host/FLAGS_llama_port.
-  vector<TNetworkAddress> llama_addresses;
-  if (!FLAGS_llama_addresses.empty()) {
-    vector<string> components;
-    split(components, FLAGS_llama_addresses, is_any_of(","), token_compress_on);
-    for (int i = 0; i < components.size(); ++i) {
-      to_lower(components[i]);
-      TNetworkAddress llama_address = MakeNetworkAddress(components[i]);
-      if (find(llama_addresses.begin(), llama_addresses.end(), llama_address)
-          == llama_addresses.end()) {
-        llama_addresses.push_back(llama_address);
-      }
-    }
-  }
-  // Add Llama hostport from deprecated flags (if it does not already exist).
-  if (!FLAGS_llama_host.empty()) {
-    to_lower(FLAGS_llama_host);
-    TNetworkAddress llama_address =
-        MakeNetworkAddress(FLAGS_llama_host, FLAGS_llama_port);
-    if (find(llama_addresses.begin(), llama_addresses.end(), llama_address)
-        == llama_addresses.end()) {
-      llama_addresses.push_back(llama_address);
-    }
-  }
-  for (int i = 0; i < llama_addresses.size(); ++i) {
-    LOG(INFO) << "Llama address " << i << ": " << llama_addresses[i];
-  }
-
-  TNetworkAddress llama_callback_address =
-      MakeNetworkAddress(FLAGS_hostname, FLAGS_llama_callback_port);
-  resource_broker_.reset(new ResourceBroker(llama_addresses, llama_callback_address,
-      metrics_.get()));
-  cgroups_mgr_.reset(new CgroupsMgr(metrics_.get()));
-
-  TGetHadoopConfigRequest config_request;
-  config_request.__set_name(PSEUDO_DISTRIBUTED_CONFIG_KEY);
-  TGetHadoopConfigResponse config_response;
-  frontend_->GetHadoopConfig(config_request, &config_response);
-  if (config_response.__isset.value) {
-    to_lower(config_response.value);
-    is_pseudo_distributed_llama_ = (config_response.value == "true");
-  } else {
-    is_pseudo_distributed_llama_ = false;
-  }
-  if (is_pseudo_distributed_llama_) {
-    LOG(INFO) << "Pseudo-distributed Llama cluster detected";
-  }
-}
 
 ExecEnv::~ExecEnv() {
 }
 
 Status ExecEnv::InitForFeTests() {
-  mem_tracker_.reset(new MemTracker(-1, -1, "Process"));
+  mem_tracker_.reset(new MemTracker(-1, "Process"));
   is_fe_tests_ = true;
   return Status::OK();
 }
@@ -334,15 +237,6 @@ Status ExecEnv::InitForFeTests() {
 Status ExecEnv::StartServices() {
   LOG(INFO) << "Starting global services";
 
-  if (FLAGS_enable_rm) {
-    // Initialize the resource broker to make sure the Llama is up and reachable.
-    DCHECK(resource_broker_.get() != NULL);
-    RETURN_IF_ERROR(resource_broker_->Init());
-    DCHECK(cgroups_mgr_.get() != NULL);
-    RETURN_IF_ERROR(
-        cgroups_mgr_->Init(FLAGS_cgroup_hierarchy_path, FLAGS_staging_cgroup));
-  }
-
   // Initialize global memory limit.
   // Depending on the system configuration, we will have to calculate the process
   // memory limit either based on the available physical memory, or if overcommitting
@@ -397,7 +291,7 @@ Status ExecEnv::StartServices() {
 #ifndef ADDRESS_SANITIZER
   // Limit of -1 means no memory limit.
   mem_tracker_.reset(new MemTracker(TcmallocMetric::PHYSICAL_BYTES_RESERVED,
-      bytes_limit > 0 ? bytes_limit : -1, -1, "Process"));
+      bytes_limit > 0 ? bytes_limit : -1, "Process"));
 
   // Since tcmalloc does not free unused memory, we may exceed the process mem limit even
   // if Impala is not actually using that much memory. Add a callback to free any unused
@@ -407,7 +301,7 @@ Status ExecEnv::StartServices() {
 #else
   // tcmalloc metrics aren't defined in ASAN builds, just use the default behavior to
   // track process memory usage (sum of all children trackers).
-  mem_tracker_.reset(new MemTracker(bytes_limit > 0 ? bytes_limit : -1, -1, "Process"));
+  mem_tracker_.reset(new MemTracker(bytes_limit > 0 ? bytes_limit : -1, "Process"));
 #endif
 
   mem_tracker_->RegisterMetrics(metrics_.get(), "mem-tracker.process");
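
[Editorial note: observe the deprecation strategy above. Rather than deleting the RM flags outright, the commit keeps them defined with a "Deprecated" help string, so existing startup scripts that still pass --enable_rm or the llama_* flags continue to parse; the values are simply never read. A minimal standalone sketch of that gflags pattern, assuming the usual gflags entry point:]

    #include <gflags/gflags.h>

    // Defined but intentionally unused: parsing succeeds, the value is ignored.
    DEFINE_bool(enable_rm, false, "Deprecated");

    int main(int argc, char** argv) {
      google::ParseCommandLineFlags(&argc, &argv, /*remove_flags=*/true);
      return 0;  // --enable_rm=true is accepted at startup but has no effect.
    }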

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index 1f37572..303876f 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -29,7 +29,6 @@
 namespace impala {
 
 class CallableThreadPool;
-class CgroupsMgr;
 class DataStreamMgr;
 class DiskIoMgr;
 class FragmentMgr;
@@ -42,7 +41,6 @@ class MemTracker;
 class MetricGroup;
 class QueryResourceMgr;
 class RequestPoolService;
-class ResourceBroker;
 class Scheduler;
 class StatestoreSubscriber;
 class TestExecEnv;
@@ -89,7 +87,6 @@ class ExecEnv {
   MetricGroup* metrics() { return metrics_.get(); }
   MemTracker* process_mem_tracker() { return mem_tracker_.get(); }
   ThreadResourceMgr* thread_mgr() { return thread_mgr_.get(); }
-  CgroupsMgr* cgroups_mgr() { return cgroups_mgr_.get(); }
   HdfsOpThreadPool* hdfs_op_thread_pool() { return hdfs_op_thread_pool_.get(); }
   TmpFileMgr* tmp_file_mgr() { return tmp_file_mgr_.get(); }
   CallableThreadPool* fragment_exec_thread_pool() {
@@ -102,7 +99,6 @@ class ExecEnv {
 
   void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
 
-  ResourceBroker* resource_broker() { return resource_broker_.get(); }
   Scheduler* scheduler() { return scheduler_.get(); }
   StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
 
@@ -119,11 +115,6 @@ class ExecEnv {
   /// differently.
   bool is_fe_tests() { return is_fe_tests_; }
 
-  /// Returns true if the Llama in use is pseudo-distributed, used for development
-  /// purposes. The pseudo-distributed version has special requirements for specifying
-  /// resource locations.
-  bool is_pseudo_distributed_llama() { return is_pseudo_distributed_llama_; }
-
   /// Returns the configured defaultFs set in core-site.xml
   string default_fs() { return default_fs_; }
 
@@ -131,7 +122,6 @@ class ExecEnv {
   /// Leave protected so that subclasses can override
   boost::scoped_ptr<MetricGroup> metrics_;
   boost::scoped_ptr<DataStreamMgr> stream_mgr_;
-  boost::scoped_ptr<ResourceBroker> resource_broker_;
   boost::scoped_ptr<Scheduler> scheduler_;
   boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
   boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_;
@@ -141,7 +131,6 @@ class ExecEnv {
   boost::scoped_ptr<Webserver> webserver_;
   boost::scoped_ptr<MemTracker> mem_tracker_;
   boost::scoped_ptr<ThreadResourceMgr> thread_mgr_;
-  boost::scoped_ptr<CgroupsMgr> cgroups_mgr_;
   boost::scoped_ptr<HdfsOpThreadPool> hdfs_op_thread_pool_;
   boost::scoped_ptr<TmpFileMgr> tmp_file_mgr_;
   boost::scoped_ptr<RequestPoolService> request_pool_service_;
@@ -161,16 +150,8 @@ class ExecEnv {
   /// Address of the Impala backend server instance
   TNetworkAddress backend_address_;
 
-  /// True if the cluster has set 'yarn.scheduler.include-port-in-node-name' to true,
-  /// indicating that this cluster is pseudo-distributed. Should not be true in real
-  /// deployments.
-  bool is_pseudo_distributed_llama_;
-
   /// fs.defaultFs value set in core-site.xml
   std::string default_fs_;
-
-  /// Initialise cgroups manager, detect test RM environment and init resource broker.
-  void InitRm();
 };
 
 } // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index 99cec9a..604923c 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -224,8 +224,8 @@ TEST(MemPoolTest, ReturnPartial) {
 
 TEST(MemPoolTest, Limits) {
   MemTracker limit3(4 * MemPoolTest::INITIAL_CHUNK_SIZE);
-  MemTracker limit1(2 * MemPoolTest::INITIAL_CHUNK_SIZE, -1, "", &limit3);
-  MemTracker limit2(3 * MemPoolTest::INITIAL_CHUNK_SIZE, -1, "", &limit3);
+  MemTracker limit1(2 * MemPoolTest::INITIAL_CHUNK_SIZE, "", &limit3);
+  MemTracker limit2(3 * MemPoolTest::INITIAL_CHUNK_SIZE, "", &limit3);
 
   MemPool* p1 = new MemPool(&limit1);
   EXPECT_FALSE(limit1.AnyLimitExceeded());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-test.cc b/be/src/runtime/mem-tracker-test.cc
index 87eec14..546b8ab 100644
--- a/be/src/runtime/mem-tracker-test.cc
+++ b/be/src/runtime/mem-tracker-test.cc
@@ -64,7 +64,7 @@ TEST(MemTestTest, ConsumptionMetric) {
   UIntGauge metric(md, 0);
   EXPECT_EQ(metric.value(), 0);
 
-  MemTracker t(&metric, 100, -1, "");
+  MemTracker t(&metric, 100, "");
   EXPECT_TRUE(t.has_limit());
   EXPECT_EQ(t.consumption(), 0);
 
@@ -112,8 +112,8 @@ TEST(MemTestTest, ConsumptionMetric) {
 
 TEST(MemTestTest, TrackerHierarchy) {
   MemTracker p(100);
-  MemTracker c1(80, -1, "", &p);
-  MemTracker c2(50, -1, "", &p);
+  MemTracker c1(80, "", &p);
+  MemTracker c2(50, "", &p);
 
   // everything below limits
   c1.Consume(60);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index a9ceb76..a9de160 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -23,10 +23,8 @@
 #include <gutil/strings/substitute.h>
 
 #include "bufferpool/reservation-tracker-counters.h"
-#include "resourcebroker/resource-broker.h"
 #include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
-#include "scheduling/query-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/mem-info.h"
 #include "util/pretty-printer.h"
@@ -49,10 +47,9 @@ AtomicInt64 MemTracker::released_memory_since_gc_;
 // Name for request pool MemTrackers. '$0' is replaced with the pool name.
 const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT = "RequestPool=$0";
 
-MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit, const string& label,
-    MemTracker* parent, bool log_usage_if_zero)
+MemTracker::MemTracker(
+    int64_t byte_limit, const string& label, MemTracker* parent, bool log_usage_if_zero)
   : limit_(byte_limit),
-    rm_reserved_limit_(rm_reserved_limit),
     label_(label),
     parent_(parent),
     consumption_(&local_counter_),
@@ -60,7 +57,6 @@ MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit, const stri
     consumption_metric_(NULL),
     auto_unregister_(false),
     log_usage_if_zero_(log_usage_if_zero),
-    query_resource_mgr_(NULL),
     num_gcs_metric_(NULL),
     bytes_freed_by_last_gc_metric_(NULL),
     bytes_over_limit_metric_(NULL),
@@ -69,11 +65,9 @@ MemTracker::MemTracker(int64_t byte_limit, int64_t rm_reserved_limit, const stri
   Init();
 }
 
-MemTracker::MemTracker(
-    RuntimeProfile* profile, int64_t byte_limit, int64_t rm_reserved_limit,
+MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
     const std::string& label, MemTracker* parent)
   : limit_(byte_limit),
-    rm_reserved_limit_(rm_reserved_limit),
     label_(label),
     parent_(parent),
     consumption_(profile->AddHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES)),
@@ -81,7 +75,6 @@ MemTracker::MemTracker(
     consumption_metric_(NULL),
     auto_unregister_(false),
     log_usage_if_zero_(true),
-    query_resource_mgr_(NULL),
     num_gcs_metric_(NULL),
     bytes_freed_by_last_gc_metric_(NULL),
     bytes_over_limit_metric_(NULL),
@@ -90,10 +83,9 @@ MemTracker::MemTracker(
   Init();
 }
 
-MemTracker::MemTracker(UIntGauge* consumption_metric,
-    int64_t byte_limit, int64_t rm_reserved_limit, const string& label)
+MemTracker::MemTracker(
+    UIntGauge* consumption_metric, int64_t byte_limit, const string& label)
   : limit_(byte_limit),
-    rm_reserved_limit_(rm_reserved_limit),
     label_(label),
     parent_(NULL),
     consumption_(&local_counter_),
@@ -101,7 +93,6 @@ MemTracker::MemTracker(UIntGauge* consumption_metric,
     consumption_metric_(consumption_metric),
     auto_unregister_(false),
     log_usage_if_zero_(true),
-    query_resource_mgr_(NULL),
     num_gcs_metric_(NULL),
     bytes_freed_by_last_gc_metric_(NULL),
     bytes_over_limit_metric_(NULL),
@@ -111,7 +102,6 @@ MemTracker::MemTracker(UIntGauge* consumption_metric,
 
 void MemTracker::Init() {
   DCHECK_GE(limit_, -1);
-  DCHECK(rm_reserved_limit_ == -1 || limit_ == -1 || rm_reserved_limit_ <= limit_);
   // populate all_trackers_ and limit_trackers_
   MemTracker* tracker = this;
   while (tracker != NULL) {
@@ -173,9 +163,8 @@ MemTracker* MemTracker::GetRequestPoolMemTracker(const string& pool_name,
   } else {
     if (parent == NULL) return NULL;
     // First time this pool_name registered, make a new object.
-    MemTracker* tracker = new MemTracker(-1, -1,
-          Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name),
-          parent);
+    MemTracker* tracker = new MemTracker(
+        -1, Substitute(REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT, pool_name), parent);
     tracker->auto_unregister_ = true;
     tracker->pool_name_ = pool_name;
     pool_to_mem_trackers_[pool_name] = tracker;
@@ -184,8 +173,7 @@ MemTracker* MemTracker::GetRequestPoolMemTracker(const string& pool_name,
 }
 
 shared_ptr<MemTracker> MemTracker::GetQueryMemTracker(
-    const TUniqueId& id, int64_t byte_limit, int64_t rm_reserved_limit, MemTracker* parent,
-    QueryResourceMgr* res_mgr) {
+    const TUniqueId& id, int64_t byte_limit, MemTracker* parent) {
   if (byte_limit != -1) {
     if (byte_limit > MemInfo::physical_mem()) {
       LOG(WARNING) << "Memory limit "
@@ -210,12 +198,11 @@ shared_ptr<MemTracker> MemTracker::GetQueryMemTracker(
   } else {
     // First time this id registered, make a new object.  Give a shared ptr to
     // the caller and put a weak ptr in the map.
-    shared_ptr<MemTracker> tracker = make_shared<MemTracker>(byte_limit,
-        rm_reserved_limit, Substitute("Query($0)", lexical_cast<string>(id)), parent);
+    shared_ptr<MemTracker> tracker = make_shared<MemTracker>(
+        byte_limit, Substitute("Query($0)", lexical_cast<string>(id)), parent);
     tracker->auto_unregister_ = true;
     tracker->query_id_ = id;
     request_to_mem_trackers_[id] = tracker;
-    if (res_mgr != NULL) tracker->SetQueryResourceMgr(res_mgr);
     return tracker;
   }
 }
@@ -278,9 +265,6 @@ string MemTracker::LogUsage(const string& prefix) const {
   ss << prefix << label_ << ":";
   if (CheckLimitExceeded()) ss << " memory limit exceeded.";
   if (limit_ > 0) ss << " Limit=" << PrettyPrinter::Print(limit_, TUnit::BYTES);
-  if (rm_reserved_limit_ > 0) {
-    ss << " RM Limit=" << PrettyPrinter::Print(rm_reserved_limit_, TUnit::BYTES);
-  }
 
   int64_t total = consumption();
   int64_t peak = consumption_->value();
@@ -358,45 +342,4 @@ void MemTracker::GcTcmalloc() {
 #endif
 }
 
-bool MemTracker::ExpandRmReservation(int64_t bytes) {
-  if (query_resource_mgr_ == NULL || rm_reserved_limit_ == -1) return false;
-  // TODO: Make this asynchronous after IO mgr changes to use TryConsume() are done.
-  lock_guard<mutex> l(resource_acquisition_lock_);
-  int64_t requested = consumption_->current_value() + bytes;
-  // Can't exceed the hard limit under any circumstance
-  if (requested >= limit_ && limit_ != -1) return false;
-  // Test to see if we can satisfy the limit anyhow; maybe a different request was already
-  // in flight.
-  if (requested < rm_reserved_limit_) return true;
-
-  int64_t bytes_allocated;
-  Status status = query_resource_mgr_->RequestMemExpansion(bytes, &bytes_allocated);
-  if (!status.ok()) {
-    LOG(INFO) << "Failed to expand memory limit by "
-              << PrettyPrinter::Print(bytes, TUnit::BYTES) << ": "
-              << status.GetDetail();
-    return false;
-  }
-
-  for (const MemTracker* tracker: limit_trackers_) {
-    if (tracker == this) continue;
-    if (tracker->consumption_->current_value() + bytes_allocated > tracker->limit_) {
-      // TODO: Allocation may be larger than needed and might exceed some parent
-      // tracker limit. IMPALA-2182.
-      VLOG_RPC << "Failed to use " << bytes_allocated << " bytes allocated over "
-               << tracker->label() << " tracker limit=" << tracker->limit_
-               << " consumption=" << tracker->consumption();
-      // Don't adjust our limit; rely on query tear-down to release the resource.
-      return false;
-    }
-  }
-
-  rm_reserved_limit_ += bytes_allocated;
-  // Resource broker might give us more than we ask for
-  if (limit_ != -1) rm_reserved_limit_ = min(rm_reserved_limit_, limit_);
-  VLOG_RPC << "Reservation bytes_allocated=" << bytes_allocated << " rm_reserved_limit="
-           << rm_reserved_limit_ << " limit=" << limit_;
-  return true;
-}
-
 }
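
[Editorial note: GetQueryMemTracker() drops its rm_reserved_limit and QueryResourceMgr parameters, leaving the three-argument form. A minimal sketch of the new call site; the id value is hypothetical, and the Thrift header path is an assumption:]

    #include <memory>

    #include "gen-cpp/Types_types.h"   // impala::TUniqueId (assumed path)
    #include "runtime/mem-tracker.h"

    std::shared_ptr<impala::MemTracker> QueryTrackerSketch(
        impala::MemTracker* pool_tracker) {
      impala::TUniqueId id;
      id.hi = 0;
      id.lo = 42;  // hypothetical query id for illustration
      // was: GetQueryMemTracker(id, byte_limit, rm_reserved_limit, parent, res_mgr)
      return impala::MemTracker::GetQueryMemTracker(
          id, /*byte_limit=*/-1, pool_tracker);
    }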

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index e3548cc..a2c3e9b 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -39,7 +39,6 @@ namespace impala {
 
 class ReservationTrackerCounters;
 class MemTracker;
-class QueryResourceMgr;
 
 /// A MemTracker tracks memory consumption; it contains an optional limit
 /// and can be arranged into a tree structure such that the consumption tracked
@@ -66,19 +65,17 @@ class MemTracker {
   /// 'label' is the label used in the usage string (LogUsage())
   /// If 'log_usage_if_zero' is false, this tracker (and its children) will not be included
   /// in LogUsage() output if consumption is 0.
-  MemTracker(int64_t byte_limit = -1, int64_t rm_reserved_limit = -1,
-      const std::string& label = std::string(), MemTracker* parent = NULL,
-      bool log_usage_if_zero = true);
+  MemTracker(int64_t byte_limit = -1, const std::string& label = std::string(),
+      MemTracker* parent = NULL, bool log_usage_if_zero = true);
 
   /// C'tor for tracker for which consumption counter is created as part of a profile.
   /// The counter is created with name COUNTER_NAME.
-  MemTracker(RuntimeProfile* profile, int64_t byte_limit, int64_t rm_reserved_limit = -1,
+  MemTracker(RuntimeProfile* profile, int64_t byte_limit,
       const std::string& label = std::string(), MemTracker* parent = NULL);
 
   /// C'tor for tracker that uses consumption_metric as the consumption value.
   /// Consume()/Release() can still be called. This is used for the process tracker.
-  MemTracker(UIntGauge* consumption_metric,
-      int64_t byte_limit = -1, int64_t rm_reserved_limit = -1,
+  MemTracker(UIntGauge* consumption_metric, int64_t byte_limit = -1,
       const std::string& label = std::string());
 
   ~MemTracker();
@@ -98,9 +95,8 @@ class MemTracker {
   /// 'parent' as the parent tracker.
   /// byte_limit and parent must be the same for all GetMemTracker() calls with the
   /// same id.
-  static std::shared_ptr<MemTracker> GetQueryMemTracker(const TUniqueId& id,
-      int64_t byte_limit, int64_t rm_reserved_limit, MemTracker* parent,
-      QueryResourceMgr* res_mgr);
+  static std::shared_ptr<MemTracker> GetQueryMemTracker(
+      const TUniqueId& id, int64_t byte_limit, MemTracker* parent);
 
   /// Returns a MemTracker object for request pool 'pool_name'. Calling this with the same
   /// 'pool_name' will return the same MemTracker object. This is used to track the local
@@ -112,14 +108,6 @@ class MemTracker {
   static MemTracker* GetRequestPoolMemTracker(const std::string& pool_name,
       MemTracker* parent);
 
-  /// Returns the minimum of limit and rm_reserved_limit
-  int64_t effective_limit() const {
-    // TODO: maybe no limit should be MAX_LONG?
-    DCHECK(rm_reserved_limit_ <= limit_ || limit_ == -1);
-    if (rm_reserved_limit_ == -1) return limit_;
-    return rm_reserved_limit_;
-  }
-
   /// Increases consumption of this tracker and its ancestors by 'bytes'.
   void Consume(int64_t bytes) {
     if (bytes <= 0) {
@@ -166,49 +154,33 @@ class MemTracker {
     if (consumption_metric_ != NULL) RefreshConsumptionFromMetric();
     if (UNLIKELY(bytes <= 0)) return true;
     int i;
-    // Walk the tracker tree top-down, to avoid expanding a limit on a child whose parent
-    // won't accommodate the change.
+    // Walk the tracker tree top-down.
     for (i = all_trackers_.size() - 1; i >= 0; --i) {
       MemTracker* tracker = all_trackers_[i];
-      int64_t limit = tracker->effective_limit();
+      const int64_t limit = tracker->limit();
       if (limit < 0) {
         tracker->consumption_->Add(bytes); // No limit at this tracker.
       } else {
-        // If TryConsume fails, we can try to GC or expand the RM reservation, but we may
-        // need to try several times if there are concurrent consumers because we don't
-        // take a lock before trying to update consumption_.
+        // If TryConsume fails, we can try to GC, but we may need to try several times if
+        // there are concurrent consumers because we don't take a lock before trying to
+        // update consumption_.
         while (true) {
           if (LIKELY(tracker->consumption_->TryAdd(bytes, limit))) break;
 
           VLOG_RPC << "TryConsume failed, bytes=" << bytes
                    << " consumption=" << tracker->consumption_->current_value()
-                   << " limit=" << limit << " attempting to GC and expand reservation";
-          // TODO: This may not be right if more than one tracker can actually change its
-          // RM reservation limit.
-          if (UNLIKELY(tracker->GcMemory(limit - bytes) &&
-                  !tracker->ExpandRmReservation(bytes))) {
+                   << " limit=" << limit << " attempting to GC";
+          if (UNLIKELY(tracker->GcMemory(limit - bytes))) {
             DCHECK_GE(i, 0);
             // Failed for this mem tracker. Roll back the ones that succeeded.
-            // TODO: this doesn't roll it back completely since the max values for
-            // the updated trackers aren't decremented. The max values are only used
-            // for error reporting so this is probably okay. Rolling those back is
-            // pretty hard; we'd need something like 2PC.
-            //
-            // TODO: This might leave us with an allocated resource that we can't use.
-            // Specifically, the RM reservation of some ancestors' trackers may have been
-            // expanded only to fail at the current tracker. This may be wasteful as
-            // subsequent TryConsume() never gets to use the reserved resources. Consider
-            // adjusting the reservation of the ancestors' trackers.
             for (int j = all_trackers_.size() - 1; j > i; --j) {
               all_trackers_[j]->consumption_->Add(-bytes);
             }
             return false;
           }
-          VLOG_RPC << "GC or expansion succeeded, TryConsume bytes=" << bytes
+          VLOG_RPC << "GC succeeded, TryConsume bytes=" << bytes
                    << " consumption=" << tracker->consumption_->current_value()
-                   << " new limit=" << tracker->effective_limit() << " prev=" << limit;
-          // Need to update the limit if the RM reservation was expanded.
-          limit = tracker->effective_limit();
+                   << " limit=" << limit;
         }
       }
     }
@@ -363,11 +335,6 @@ class MemTracker {
   /// can cause us to go way over mem limits.
   void GcTcmalloc();
 
-  /// Set the resource mgr to allow expansion of limits (if NULL, no expansion is possible)
-  void SetQueryResourceMgr(QueryResourceMgr* context) {
-    query_resource_mgr_ = context;
-  }
-
   /// Walks the MemTracker hierarchy and populates all_trackers_ and
   /// limit_trackers_
   void Init();
@@ -378,11 +345,6 @@ class MemTracker {
   static std::string LogUsage(const std::string& prefix,
       const std::list<MemTracker*>& trackers);
 
-  /// Try to expand the limit (by asking the resource broker for more memory) by at least
-  /// 'bytes'. Returns false if not possible, true if the request succeeded. May allocate
-  /// more memory than was requested.
-  bool ExpandRmReservation(int64_t bytes);
-
   /// Size, in bytes, that is considered a large value for Release() (or Consume() with
   /// a negative value). If tcmalloc is used, this can trigger it to GC.
   /// A higher value will make us call into tcmalloc less often (and therefore more
@@ -425,11 +387,6 @@ class MemTracker {
   /// there is no consumption limit.
   int64_t limit_;
 
-  /// If > -1, when RM is enabled this is the limit after which this memtracker needs to
-  /// acquire more memory from Llama.
-  /// This limit is always less than or equal to the hard limit.
-  int64_t rm_reserved_limit_;
-
   std::string label_;
   MemTracker* parent_;
 
@@ -476,14 +433,6 @@ class MemTracker {
   /// if consumption is 0.
   bool log_usage_if_zero_;
 
-  /// Lock is taken during ExpandRmReservation() to prevent concurrent acquisition of new
-  /// resources.
-  boost::mutex resource_acquisition_lock_;
-
-  /// If non-NULL, contains all the information required to expand resource reservations if
-  /// required.
-  QueryResourceMgr* query_resource_mgr_;
-
   /// The number of times the GcFunctions were called.
   IntCounter* num_gcs_metric_;
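
[Editorial note: the TryConsume() rewrite above removes the reservation-expansion branch but keeps the core concurrency pattern — walk the tracker chain from the root down, try a lock-free add against each limit, and on failure roll back the trackers that already accepted the bytes. A simplified, self-contained model of that pattern, with hypothetical names and std::atomic standing in for Impala's counters:]

    #include <atomic>
    #include <cstdint>
    #include <vector>

    struct TrackerSketch {
      std::atomic<int64_t> consumption{0};
      int64_t limit = -1;  // -1 means no limit at this level
    };

    // Adds 'bytes' iff consumption stays within 'limit'; the CAS loop handles
    // concurrent consumers without taking a lock.
    static bool TryAdd(TrackerSketch* t, int64_t bytes) {
      int64_t cur = t->consumption.load();
      while (cur + bytes <= t->limit) {
        if (t->consumption.compare_exchange_weak(cur, cur + bytes)) return true;
      }
      return false;
    }

    // 'root_first' is the chain from root tracker to leaf, mirroring the
    // top-down walk in MemTracker::TryConsume().
    bool TryConsume(const std::vector<TrackerSketch*>& root_first, int64_t bytes) {
      for (size_t i = 0; i < root_first.size(); ++i) {
        TrackerSketch* t = root_first[i];
        if (t->limit < 0) {
          t->consumption.fetch_add(bytes);  // unlimited: always succeeds
          continue;
        }
        if (!TryAdd(t, bytes)) {
          // Roll back the ancestors that already accepted the bytes. As the
          // TODO removed by this commit noted, only totals are rewound, not
          // peak values, which are used solely for error reporting.
          for (size_t j = 0; j < i; ++j) {
            root_first[j]->consumption.fetch_add(-bytes);
          }
          return false;
        }
      }
      return true;
    }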
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc b/be/src/runtime/plan-fragment-executor.cc
index 1e52d08..7300f44 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -32,26 +32,21 @@
 #include "exec/hdfs-scan-node.h"
 #include "exec/hbase-table-scanner.h"
 #include "exprs/expr.h"
-#include "resourcebroker/resource-broker.h"
 #include "runtime/descriptors.h"
 #include "runtime/data-stream-mgr.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter-bank.h"
 #include "runtime/mem-tracker.h"
-#include "scheduling/query-resource-mgr.h"
-#include "util/cgroups-mgr.h"
 #include "util/cpu-info.h"
 #include "util/debug-util.h"
 #include "util/container-util.h"
 #include "util/parse-util.h"
 #include "util/mem-info.h"
 #include "util/periodic-counter-updater.h"
-#include "util/llama-util.h"
 #include "util/pretty-printer.h"
 
 DEFINE_bool(serialize_batch, false, "serialize and deserialize each returned row batch");
 DEFINE_int32(status_report_interval, 5, "interval between profile reports; in seconds");
-DECLARE_bool(enable_rm);
 
 #include "common/names.h"
 
@@ -76,9 +71,6 @@ PlanFragmentExecutor::PlanFragmentExecutor(ExecEnv* exec_env,
 
 PlanFragmentExecutor::~PlanFragmentExecutor() {
   Close();
-  if (is_prepared_ && runtime_state_->query_resource_mgr() != NULL) {
-    exec_env_->resource_broker()->UnregisterQueryResourceMgr(query_id_);
-  }
   // at this point, the report thread should have been stopped
   DCHECK(!report_thread_active_);
 }
@@ -100,58 +92,15 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
   VLOG(2) << "fragment_instance_ctx:\n" << ThriftDebugString(fragment_instance_ctx);
 
   DCHECK(request.__isset.fragment_ctx);
-  bool request_has_reserved_resource =
-      request.fragment_instance_ctx.__isset.reserved_resource;
-  if (request_has_reserved_resource) {
-    VLOG_QUERY << "Executing fragment in reserved resource:\n"
-               << request.fragment_instance_ctx.reserved_resource;
-  }
-
-  string cgroup = "";
-  if (FLAGS_enable_rm && request_has_reserved_resource) {
-    cgroup = exec_env_->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, "_"));
-  }
 
   // Prepare() must not return before runtime_state_ is set if is_prepared_ was
   // set. Having runtime_state_.get() != NULL is a postcondition of this method in that
   // case. Do not call RETURN_IF_ERROR or explicitly return before this line.
-  runtime_state_.reset(new RuntimeState(request, cgroup, exec_env_));
+  runtime_state_.reset(new RuntimeState(request, exec_env_));
 
   // total_time_counter() is in the runtime_state_ so start it up now.
   SCOPED_TIMER(profile()->total_time_counter());
 
-  // Register after setting runtime_state_ to ensure proper cleanup.
-  if (FLAGS_enable_rm && !cgroup.empty() && request_has_reserved_resource) {
-    bool is_first;
-    RETURN_IF_ERROR(exec_env_->cgroups_mgr()->RegisterFragment(
-        request.fragment_instance_ctx.fragment_instance_id, cgroup, &is_first));
-    // The first fragment using cgroup sets the cgroup's CPU shares based on the reserved
-    // resource.
-    if (is_first) {
-      DCHECK(request_has_reserved_resource);
-      int32_t cpu_shares = exec_env_->cgroups_mgr()->VirtualCoresToCpuShares(
-          request.fragment_instance_ctx.reserved_resource.v_cpu_cores);
-      RETURN_IF_ERROR(exec_env_->cgroups_mgr()->SetCpuShares(cgroup, cpu_shares));
-    }
-  }
-
-  // TODO: Find the reservation id when the resource request is not set
-  if (FLAGS_enable_rm && request_has_reserved_resource) {
-    TUniqueId reservation_id;
-    reservation_id << request.fragment_instance_ctx.reserved_resource.reservation_id;
-
-    // TODO: Combine this with RegisterFragment() etc.
-    QueryResourceMgr* res_mgr;
-    bool is_first = exec_env_->resource_broker()->GetQueryResourceMgr(query_id_,
-        reservation_id, request.fragment_instance_ctx.local_resource_address, &res_mgr);
-    DCHECK(res_mgr != NULL);
-    runtime_state_->SetQueryResourceMgr(res_mgr);
-    if (is_first) {
-      runtime_state_->query_resource_mgr()->InitVcoreAcquisition(
-          request.fragment_instance_ctx.reserved_resource.v_cpu_cores);
-    }
-  }
-
   // reservation or a query option.
   int64_t bytes_limit = -1;
   if (runtime_state_->query_options().__isset.mem_limit &&
@@ -161,36 +110,14 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
                << PrettyPrinter::Print(bytes_limit, TUnit::BYTES);
   }
 
-  int64_t rm_reservation_size_bytes = -1;
-  if (request_has_reserved_resource &&
-      request.fragment_instance_ctx.reserved_resource.memory_mb > 0) {
-    int64_t rm_reservation_size_mb =
-      static_cast<int64_t>(request.fragment_instance_ctx.reserved_resource.memory_mb);
-    rm_reservation_size_bytes = rm_reservation_size_mb * 1024L * 1024L;
-    // Queries that use more than the hard limit will be killed, so it's not useful to
-    // have a reservation larger than the hard limit. Clamp reservation bytes limit to the
-    // hard limit (if it exists).
-    if (rm_reservation_size_bytes > bytes_limit && bytes_limit != -1) {
-      runtime_state_->LogError(ErrorMsg(TErrorCode::FRAGMENT_EXECUTOR,
-          PrettyPrinter::PrintBytes(rm_reservation_size_bytes),
-          PrettyPrinter::PrintBytes(bytes_limit)));
-      rm_reservation_size_bytes = bytes_limit;
-    }
-    VLOG_QUERY << "Using RM reservation memory limit from resource reservation: "
-               << PrettyPrinter::Print(rm_reservation_size_bytes, TUnit::BYTES);
-  }
-
   DCHECK(!fragment_instance_ctx.request_pool.empty());
-  runtime_state_->InitMemTrackers(query_id_, &fragment_instance_ctx.request_pool,
-      bytes_limit, rm_reservation_size_bytes);
+  runtime_state_->InitMemTrackers(
+      query_id_, &fragment_instance_ctx.request_pool, bytes_limit);
   RETURN_IF_ERROR(runtime_state_->CreateBlockMgr());
   runtime_state_->InitFilterBank();
 
   // Reserve one main thread from the pool
   runtime_state_->resource_pool()->AcquireThreadToken();
-  if (runtime_state_->query_resource_mgr() != NULL) {
-    runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
-  }
   has_thread_token_ = true;
 
   average_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
@@ -266,8 +193,8 @@ Status PlanFragmentExecutor::Prepare(const TExecPlanFragmentParams& request) {
         obj_pool(), request.fragment_ctx.fragment.output_sink,
         request.fragment_ctx.fragment.output_exprs,
         fragment_instance_ctx, row_desc(), &sink_));
-    sink_mem_tracker_.reset(new MemTracker(-1, -1, sink_->GetName(),
-        runtime_state_->instance_mem_tracker(), true));
+    sink_mem_tracker_.reset(new MemTracker(
+        -1, sink_->GetName(), runtime_state_->instance_mem_tracker(), true));
     RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get()));
 
     RuntimeProfile* sink_profile = sink_->profile();
@@ -565,9 +492,6 @@ void PlanFragmentExecutor::ReleaseThreadToken() {
   if (has_thread_token_) {
     has_thread_token_ = false;
     runtime_state_->resource_pool()->ReleaseThreadToken(true);
-    if (runtime_state_->query_resource_mgr() != NULL) {
-      runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1);
-    }
     PeriodicCounterUpdater::StopSamplingCounter(average_thread_tokens_);
     PeriodicCounterUpdater::StopTimeSeriesCounter(
         thread_usage_sampled_counter_);
@@ -583,10 +507,6 @@ void PlanFragmentExecutor::Close() {
   }
   // Prepare may not have been called, which sets runtime_state_
   if (runtime_state_.get() != NULL) {
-    if (runtime_state_->query_resource_mgr() != NULL) {
-      exec_env_->cgroups_mgr()->UnregisterFragment(
-          runtime_state_->fragment_instance_id(), runtime_state_->cgroup());
-    }
     if (plan_ != NULL) plan_->Close(runtime_state_.get());
     for (DiskIoRequestContext* context: *runtime_state_->reader_contexts()) {
       runtime_state_->io_mgr()->UnregisterContext(context);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/runtime-filter-bank.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-filter-bank.cc b/be/src/runtime/runtime-filter-bank.cc
index 8d47431..a25bf8d 100644
--- a/be/src/runtime/runtime-filter-bank.cc
+++ b/be/src/runtime/runtime-filter-bank.cc
@@ -66,8 +66,8 @@ RuntimeFilterBank::RuntimeFilterBank(const TQueryCtx& query_ctx, RuntimeState* s
   default_filter_size_ =
       BitUtil::RoundUpToPowerOfTwo(min<int64_t>(default_filter_size_, max_filter_size_));
 
-  filter_mem_tracker_.reset(new MemTracker(-1, -1, "Runtime Filter Bank",
-      state->instance_mem_tracker(), false));
+  filter_mem_tracker_.reset(
+      new MemTracker(-1, "Runtime Filter Bank", state->instance_mem_tracker(), false));
 }
 
 RuntimeFilter* RuntimeFilterBank::RegisterFilter(const TRuntimeFilterDesc& filter_desc,
@@ -226,4 +226,3 @@ void RuntimeFilterBank::Close() {
   filter_mem_tracker_->Release(memory_allocated_->value());
   filter_mem_tracker_->UnregisterFromParent();
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 57aae58..5249076 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -67,18 +67,15 @@ static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100 * 1024 * 1024;
 
 namespace impala {
 
-RuntimeState::RuntimeState(const TExecPlanFragmentParams& fragment_params,
-    const string& cgroup, ExecEnv* exec_env)
+RuntimeState::RuntimeState(
+    const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env)
   : obj_pool_(new ObjectPool()),
     fragment_params_(fragment_params),
-    now_(new TimestampValue(query_ctx().now_string.c_str(),
-        query_ctx().now_string.size())),
-    cgroup_(cgroup),
+    now_(new TimestampValue(
+        query_ctx().now_string.c_str(), query_ctx().now_string.size())),
     codegen_expr_(false),
-    profile_(obj_pool_.get(),
-        "Fragment " + PrintId(fragment_ctx().fragment_instance_id)),
+    profile_(obj_pool_.get(), "Fragment " + PrintId(fragment_ctx().fragment_instance_id)),
     is_cancelled_(false),
-    query_resource_mgr_(NULL),
     root_node_id_(-1) {
   Status status = Init(exec_env);
   DCHECK(status.ok()) << status.GetDetail();
@@ -92,7 +89,6 @@ RuntimeState::RuntimeState(const TQueryCtx& query_ctx)
     codegen_expr_(false),
     profile_(obj_pool_.get(), "<unnamed>"),
     is_cancelled_(false),
-    query_resource_mgr_(NULL),
     root_node_id_(-1) {
   fragment_params_.__set_query_ctx(query_ctx);
   fragment_params_.query_ctx.request.query_options.__set_batch_size(DEFAULT_BATCH_SIZE);
@@ -147,18 +143,17 @@ Status RuntimeState::Init(ExecEnv* exec_env) {
   return Status::OK();
 }
 
-void RuntimeState::InitMemTrackers(const TUniqueId& query_id, const string* pool_name,
-    int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes) {
+void RuntimeState::InitMemTrackers(
+    const TUniqueId& query_id, const string* pool_name, int64_t query_bytes_limit) {
   MemTracker* query_parent_tracker = exec_env_->process_mem_tracker();
   if (pool_name != NULL) {
     query_parent_tracker = MemTracker::GetRequestPoolMemTracker(*pool_name,
         query_parent_tracker);
   }
   query_mem_tracker_ =
-      MemTracker::GetQueryMemTracker(query_id, query_bytes_limit,
-          query_rm_reservation_limit_bytes, query_parent_tracker, query_resource_mgr());
-  instance_mem_tracker_.reset(new MemTracker(runtime_profile(), -1, -1,
-      runtime_profile()->name(), query_mem_tracker_.get()));
+      MemTracker::GetQueryMemTracker(query_id, query_bytes_limit, query_parent_tracker);
+  instance_mem_tracker_.reset(new MemTracker(
+      runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker_.get()));
 }
 
 void RuntimeState::InitFilterBank() {
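
[Editorial note: InitMemTrackers() now takes just the query id, pool name, and byte limit, and builds the same tracker chain as before minus the RM reservation: process, request pool, query, fragment instance. A minimal sketch of that hierarchy using only the public constructor; the labels follow the formats in this patch, but the values are hypothetical:]

    #include "runtime/mem-tracker.h"

    void HierarchySketch() {
      impala::MemTracker process(-1, "Process");
      impala::MemTracker pool(-1, "RequestPool=default", &process);
      impala::MemTracker query(1024L * 1024L * 1024L,  // query_bytes_limit
                               "Query(1234:5678)", &pool);
      impala::MemTracker instance(-1, "Fragment abcd:ef01", &query);
    }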

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/runtime-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h
index b5f7882..0bf9db5 100644
--- a/be/src/runtime/runtime-state.h
+++ b/be/src/runtime/runtime-state.h
@@ -65,8 +65,7 @@ typedef std::map<std::string, std::string> FileMoveMap;
 /// query and shared across all execution nodes of that query.
 class RuntimeState {
  public:
-  RuntimeState(const TExecPlanFragmentParams& fragment_params,
-      const std::string& cgroup, ExecEnv* exec_env);
+  RuntimeState(const TExecPlanFragmentParams& fragment_params, ExecEnv* exec_env);
 
   /// RuntimeState for executing expr in fe-support.
   RuntimeState(const TQueryCtx& query_ctx);
@@ -81,7 +80,7 @@ class RuntimeState {
   /// tracker (in the fifth level). If 'request_pool' is null, no request pool mem
   /// tracker is set up, i.e. query pools will have the process mem pool as the parent.
   void InitMemTrackers(const TUniqueId& query_id, const std::string* request_pool,
-      int64_t query_bytes_limit, int64_t query_rm_reservation_limit_bytes = -1);
+      int64_t query_bytes_limit);
 
   /// Initializes the runtime filter bank. Must be called after InitMemTrackers().
   void InitFilterBank();
@@ -124,7 +123,6 @@ class RuntimeState {
   const TUniqueId& fragment_instance_id() const {
     return fragment_ctx().fragment_instance_id;
   }
-  const std::string& cgroup() const { return cgroup_; }
   ExecEnv* exec_env() { return exec_env_; }
   DataStreamMgr* stream_mgr() { return exec_env_->stream_mgr(); }
   HBaseTableFactory* htable_factory() { return exec_env_->htable_factory(); }
@@ -262,9 +260,6 @@ class RuntimeState {
   /// execution doesn't continue if the query terminates abnormally.
   Status CheckQueryState();
 
-  QueryResourceMgr* query_resource_mgr() const { return query_resource_mgr_; }
-  void SetQueryResourceMgr(QueryResourceMgr* res_mgr) { query_resource_mgr_ = res_mgr; }
-
  private:
   /// Allow TestEnv to set block_mgr manually for testing.
   friend class TestEnv;
@@ -301,9 +296,6 @@ class RuntimeState {
   /// Use pointer to avoid inclusion of timestampvalue.h and avoid clang issues.
   boost::scoped_ptr<TimestampValue> now_;
 
-  /// The Impala-internal cgroup into which execution threads are assigned.
-  /// If empty, no RM is enabled.
-  std::string cgroup_;
   ExecEnv* exec_env_;
   boost::scoped_ptr<LlvmCodeGen> codegen_;
 
@@ -351,10 +343,6 @@ class RuntimeState {
   SpinLock query_status_lock_;
   Status query_status_;
 
-  /// Query-wide resource manager for resource expansion etc. Not owned by us; owned by
-  /// the ResourceBroker instead.
-  QueryResourceMgr* query_resource_mgr_;
-
   /// Reader contexts that need to be closed when the fragment is closed.
   std::vector<DiskIoRequestContext*> reader_contexts_;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index a5818af..8690e39 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -70,7 +70,7 @@ RuntimeState* TestEnv::CreateRuntimeState(int64_t query_id) {
   TExecPlanFragmentParams plan_params = TExecPlanFragmentParams();
   plan_params.query_ctx.query_id.hi = 0;
   plan_params.query_ctx.query_id.lo = query_id;
-  return new RuntimeState(plan_params, "", exec_env_.get());
+  return new RuntimeState(plan_params, exec_env_.get());
 }
 
 Status TestEnv::CreateQueryState(int64_t query_id, int max_buffers, int block_size,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/scheduling/CMakeLists.txt b/be/src/scheduling/CMakeLists.txt
index c5b4eb4..9cfb672 100644
--- a/be/src/scheduling/CMakeLists.txt
+++ b/be/src/scheduling/CMakeLists.txt
@@ -26,7 +26,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/scheduling")
 add_library(Scheduling STATIC
   admission-controller.cc
   backend-config.cc
-  query-resource-mgr.cc
   query-schedule.cc
   request-pool-service.cc
   simple-scheduler.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-resource-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.cc b/be/src/scheduling/query-resource-mgr.cc
deleted file mode 100644
index abfe085..0000000
--- a/be/src/scheduling/query-resource-mgr.cc
+++ /dev/null
@@ -1,271 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "scheduling/query-resource-mgr.h"
-
-#include <boost/uuid/uuid.hpp>
-#include <boost/uuid/uuid_generators.hpp>
-#include <gutil/strings/substitute.h>
-#include <sstream>
-
-#include "runtime/exec-env.h"
-#include "resourcebroker/resource-broker.h"
-#include "util/bit-util.h"
-#include "util/cgroups-mgr.h"
-#include "util/container-util.h"
-#include "util/network-util.h"
-#include "util/promise.h"
-#include "util/time.h"
-
-#include "common/names.h"
-
-using boost::uuids::random_generator;
-using boost::uuids::uuid;
-using namespace impala;
-using namespace strings;
-
-DEFINE_int64(rm_mem_expansion_timeout_ms, 5000, "The amount of time to wait (ms) "
-    "for a memory expansion request.");
-DEFINE_double(max_vcore_oversubscription_ratio, 2.5, "(Advanced) The maximum ratio "
-    "allowed between running threads and acquired VCore resources for a query's fragments"
-    " on a single node");
-
-ResourceResolver::ResourceResolver(const unordered_set<TNetworkAddress>& unique_hosts) {
-  if (ExecEnv::GetInstance()->is_pseudo_distributed_llama()) {
-    CreateLocalLlamaNodeMapping(unique_hosts);
-  }
-}
-
-void ResourceResolver::GetResourceHostport(const TNetworkAddress& src,
-    TNetworkAddress* dest) {
-  if (ExecEnv::GetInstance()->is_pseudo_distributed_llama()) {
-    *dest = impalad_to_dn_[src];
-  } else {
-    dest->hostname = src.hostname;
-    dest->port = 0;
-  }
-}
-
-void ResourceResolver::CreateLocalLlamaNodeMapping(
-    const unordered_set<TNetworkAddress>& unique_hosts) {
-  DCHECK(ExecEnv::GetInstance()->is_pseudo_distributed_llama());
-  const vector<string>& llama_nodes =
-      ExecEnv::GetInstance()->resource_broker()->llama_nodes();
-  DCHECK(!llama_nodes.empty());
-  int llama_node_ix = 0;
-  for (const TNetworkAddress& host: unique_hosts) {
-    TNetworkAddress dn_hostport = MakeNetworkAddress(llama_nodes[llama_node_ix]);
-    impalad_to_dn_[host] = dn_hostport;
-    dn_to_impalad_[dn_hostport] = host;
-    LOG(INFO) << "Mapping Datanode " << dn_hostport << " to Impalad: " << host;
-    // Round robin the registered Llama nodes.
-    llama_node_ix = (llama_node_ix + 1) % llama_nodes.size();
-  }
-}
-
-QueryResourceMgr::QueryResourceMgr(const TUniqueId& reservation_id,
-    const TNetworkAddress& local_resource_location, const TUniqueId& query_id)
-    : reservation_id_(reservation_id), query_id_(query_id),
-      local_resource_location_(local_resource_location), exit_(false), callback_count_(0),
-      threads_running_(0), vcores_(0) {
-  max_vcore_oversubscription_ratio_ = FLAGS_max_vcore_oversubscription_ratio;
-}
-
-void QueryResourceMgr::InitVcoreAcquisition(int32_t init_vcores) {
-  LOG(INFO) << "Initialising vcore acquisition thread for query " << PrintId(query_id_)
-            << " (" << init_vcores << " initial vcores)";
-  DCHECK(acquire_vcore_thread_.get() == NULL)
-      << "Double initialisation of QueryResourceMgr::InitCpuAcquisition()";
-  vcores_ = init_vcores;
-
-  // These shared pointers to atomic values are used to communicate between the vcore
-  // acquisition thread and the class destructor. If the acquisition thread is in the
-  // middle of an Expand() call, the destructor might have to wait 5s (the default
-  // timeout) to return. This holds up query close operations. So instead check to see if
-  // the thread is in Expand(), and if so we set a synchronised flag early_exit_ which it
-  // inspects immediately after exiting Expand(), and if true, exits before touching any
-  // of the class-wide state (because the destructor may have finished before this point).
-
-  thread_in_expand_.reset(new AtomicInt32());
-  early_exit_.reset(new AtomicInt32());
-  acquire_vcore_thread_.reset(
-      new Thread("resource-mgmt", Substitute("acquire-cpu-$0", PrintId(query_id_)),
-          bind<void>(mem_fn(&QueryResourceMgr::AcquireVcoreResources), this,
-              thread_in_expand_, early_exit_)));
-}
-
-llama::TResource QueryResourceMgr::CreateResource(int64_t memory_mb, int64_t vcores) {
-  DCHECK(memory_mb > 0 || vcores > 0);
-  DCHECK(reservation_id_ != TUniqueId()) << "Expansion requires existing reservation";
-
-  unordered_set<TNetworkAddress> hosts;
-  hosts.insert(local_resource_location_);
-  ResourceResolver resolver(hosts);
-  llama::TResource res;
-  res.memory_mb = memory_mb;
-  res.v_cpu_cores = vcores;
-  TNetworkAddress res_address;
-  resolver.GetResourceHostport(local_resource_location_, &res_address);
-  res.__set_askedLocation(TNetworkAddressToString(res_address));
-
-  random_generator uuid_generator;
-  uuid id = uuid_generator();
-  res.client_resource_id.hi = *reinterpret_cast<uint64_t*>(&id.data[0]);
-  res.client_resource_id.lo = *reinterpret_cast<uint64_t*>(&id.data[8]);
-  res.enforcement = llama::TLocationEnforcement::MUST;
-  return res;
-}
-
-bool QueryResourceMgr::AboveVcoreSubscriptionThreshold() {
-  return threads_running_ > vcores_ * (max_vcore_oversubscription_ratio_ * 0.8);
-}
-
-void QueryResourceMgr::NotifyThreadUsageChange(int delta) {
-  lock_guard<mutex> l(threads_running_lock_);
-  threads_running_ += delta;
-  DCHECK(threads_running_ >= 0L);
-  if (AboveVcoreSubscriptionThreshold()) threads_changed_cv_.notify_all();
-}
-
-int32_t QueryResourceMgr::AddVcoreAvailableCb(const VcoreAvailableCb& callback) {
-  lock_guard<mutex> l(callbacks_lock_);
-  callbacks_[callback_count_] = callback;
-  callbacks_it_ = callbacks_.begin();
-  return callback_count_++;
-}
-
-void QueryResourceMgr::RemoveVcoreAvailableCb(int32_t callback_id) {
-  lock_guard<mutex> l(callbacks_lock_);
-  CallbackMap::iterator it = callbacks_.find(callback_id);
-  DCHECK(it != callbacks_.end()) << "Could not find callback with id: " << callback_id;
-  callbacks_.erase(it);
-  callbacks_it_ = callbacks_.begin();
-}
-
-Status QueryResourceMgr::RequestMemExpansion(int64_t requested_bytes,
-    int64_t* allocated_bytes) {
-  DCHECK(allocated_bytes != NULL);
-  *allocated_bytes = 0;
-  int64_t requested_mb = BitUtil::Ceil(requested_bytes, 1024L * 1024L);
-  llama::TResource res = CreateResource(max<int64_t>(1, requested_mb), 0);
-  llama::TUniqueId expansion_id;
-  llama::TAllocatedResource resource;
-  RETURN_IF_ERROR(ExecEnv::GetInstance()->resource_broker()->Expand(reservation_id_,
-      res, FLAGS_rm_mem_expansion_timeout_ms, &expansion_id, &resource));
-
-  DCHECK_EQ(resource.v_cpu_cores, 0L) << "Unexpected VCPUs returned by Llama";
-  *allocated_bytes = resource.memory_mb * 1024L * 1024L;
-  return Status::OK();
-}
-
-void QueryResourceMgr::AcquireVcoreResources(
-    shared_ptr<AtomicInt32> thread_in_expand,
-    shared_ptr<AtomicInt32> early_exit) {
-  // Take a copy because we may still want to print it after the destructor has run.
-  TUniqueId reservation_id = reservation_id_;
-  VLOG_QUERY << "Starting Vcore acquisition for: " << reservation_id;
-  while (!ShouldExit()) {
-    {
-      unique_lock<mutex> l(threads_running_lock_);
-      while (!AboveVcoreSubscriptionThreshold() && !ShouldExit()) {
-        threads_changed_cv_.wait(l);
-      }
-    }
-    if (ShouldExit()) break;
-
-    llama::TResource res = CreateResource(0L, 1);
-    VLOG_QUERY << "Expanding VCore allocation: " << reservation_id_;
-
-    // First signal that we are about to enter a blocking Expand() call.
-    thread_in_expand->Add(1L);
-
-    // TODO: Could cause problems if called during or after a system-wide shutdown
-    llama::TAllocatedResource resource;
-    llama::TUniqueId expansion_id;
-    Status status = ExecEnv::GetInstance()->resource_broker()->Expand(reservation_id,
-        res, -1, &expansion_id, &resource);
-    thread_in_expand->Add(-1L);
-    // If signalled to exit quickly by the destructor, exit the loop now. It's important
-    // to do so without accessing any class variables since they may no longer be valid.
-    // Need to check after setting thread_in_expand to avoid a race.
-    if (early_exit->Add(0L) != 0) {
-      VLOG_QUERY << "Fragment finished during Expand(): " << reservation_id;
-      break;
-    }
-    if (!status.ok()) {
-      VLOG_QUERY << "Could not expand CPU resources for query " << PrintId(query_id_)
-                 << ", reservation: " << PrintId(reservation_id_) << ". Error was: "
-                 << status.GetDetail();
-      // Sleep to avoid flooding the resource broker, particularly if requests are being
-      // rejected quickly (and therefore we stay oversubscribed).
-      // TODO: configurable timeout
-      SleepForMs(250);
-      continue;
-    }
-
-    DCHECK(resource.v_cpu_cores == 1)
-        << "Asked for 1 core, got: " << resource.v_cpu_cores;
-    vcores_ += resource.v_cpu_cores;
-
-    ExecEnv* exec_env = ExecEnv::GetInstance();
-    const string& cgroup =
-        exec_env->cgroups_mgr()->UniqueIdToCgroup(PrintId(query_id_, "_"));
-    int32_t num_shares = exec_env->cgroups_mgr()->VirtualCoresToCpuShares(vcores_);
-    exec_env->cgroups_mgr()->SetCpuShares(cgroup, num_shares);
-
-    // TODO: We only invoke one callback no matter how many VCores we just added;
-    // should we call all of them?
-    {
-      lock_guard<mutex> l(callbacks_lock_);
-      if (callbacks_.size() != 0) {
-        callbacks_it_->second();
-        if (++callbacks_it_ == callbacks_.end()) callbacks_it_ = callbacks_.begin();
-      }
-    }
-  }
-  VLOG_QUERY << "Leaving VCore acquisition thread: " << reservation_id;
-}
-
-bool QueryResourceMgr::ShouldExit() {
-  lock_guard<mutex> l(exit_lock_);
-  return exit_;
-}
-
-void QueryResourceMgr::Shutdown() {
-  {
-    lock_guard<mutex> l(exit_lock_);
-    if (exit_) return;
-    exit_ = true;
-  }
-  {
-    lock_guard<mutex> l(callbacks_lock_);
-    callbacks_.clear();
-  }
-  threads_changed_cv_.notify_all();
-}
-
-QueryResourceMgr::~QueryResourceMgr() {
-  if (acquire_vcore_thread_.get() == NULL) return;
-  if (!ShouldExit()) Shutdown();
-  // First, set the early exit flag. Then check to see if the thread is in Expand(). If
-  // so, the acquisition thread is guaranteed to see early_exit_ == 1L once it finishes
-  // Expand(), and will exit immediately. It's therefore safe not to wait for it.
-  early_exit_->Add(1L);
-  if (thread_in_expand_->Add(0L) == 0L) {
-    acquire_vcore_thread_->Join();
-  }
-}

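The early_exit_/thread_in_expand_ handshake deleted above is the interesting part of this file: the destructor must not block for the full Expand() timeout, so the two sides coordinate through shared atomics that outlive the object. A self-contained sketch of the same pattern, using std::atomic in place of Impala's AtomicInt32 (all names here are invented for illustration):

    #include <atomic>
    #include <chrono>
    #include <memory>
    #include <thread>

    int main() {
      // shared_ptrs keep the flags alive even after the owner is destroyed.
      auto in_blocking_call = std::make_shared<std::atomic<int>>(0);
      auto early_exit = std::make_shared<std::atomic<int>>(0);

      std::thread worker([in_blocking_call, early_exit] {
        in_blocking_call->fetch_add(1);
        // Stand-in for the blocking Expand() RPC.
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        in_blocking_call->fetch_sub(1);
        // Re-check immediately after the blocking call: if the owner has
        // already torn down, exit without touching any owner state.
        if (early_exit->load() != 0) return;
        // ... otherwise continue normal processing ...
      });

      // Teardown: raise the flag first, then only wait if the worker is not
      // mid-"RPC"; a blocked worker is guaranteed to see early_exit once the
      // call returns, so it is safe not to join it.
      early_exit->store(1);
      if (in_blocking_call->load() == 0) {
        worker.join();
      } else {
        worker.detach();  // the flags stay alive via the shared_ptr copies
      }
      return 0;
    }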
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-resource-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-resource-mgr.h b/be/src/scheduling/query-resource-mgr.h
deleted file mode 100644
index 10da312..0000000
--- a/be/src/scheduling/query-resource-mgr.h
+++ /dev/null
@@ -1,227 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef SCHEDULING_QUERY_RESOURCE_MGR_H
-#define SCHEDULING_QUERY_RESOURCE_MGR_H
-
-#include "common/atomic.h"
-#include "common/status.h"
-#include "gen-cpp/Types_types.h"
-#include "gen-cpp/ResourceBrokerService.h"
-#include "gen-cpp/ImpalaInternalService.h"
-#include "gen-cpp/Frontend_types.h"
-#include "util/promise.h"
-#include "util/thread.h"
-
-#include <boost/function.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-#include <string>
-
-namespace impala {
-
-class ResourceBroker;
-
-/// Utility class to map hosts to the Llama-registered resource-holding hosts
-/// (i.e. datanodes).
-class ResourceResolver {
- public:
-  ResourceResolver(const boost::unordered_set<TNetworkAddress>& unique_hosts);
-
-  /// Translates src into a network address suitable for identifying resources across
-  /// interactions with the Llama. The MiniLlama expects resources to be requested on
-  /// IP:port addresses of Hadoop DNs, whereas the regular Llama only deals with the
-  /// hostnames of Yarn NMs. For MiniLlama setups this translation uses the
-/// impalad_to_dn_ mapping to populate dst. When using the regular Llama, this
-/// translation sets a fixed port of 0 in dst because the Llama strips away the port
-  /// of resource locations.
-  void GetResourceHostport(const TNetworkAddress& src, TNetworkAddress* dst);
-
- private:
-  /// Impala mini clusters using the Mini Llama require translating the impalad hostports
-  /// to Hadoop DN hostports registered with the Llama during resource requests
-  /// (and then in reverse for translating granted resources to impalads).
-  /// These maps form a bi-directional hostport mapping Hadoop DN <-> impalad.
-  boost::unordered_map<TNetworkAddress, TNetworkAddress> impalad_to_dn_;
-  boost::unordered_map<TNetworkAddress, TNetworkAddress> dn_to_impalad_;
-
-  /// Called only in pseudo-distributed setups (i.e. testing only) to populate
-  /// impalad_to_dn_ and dn_to_impalad_.
-  void CreateLocalLlamaNodeMapping(
-      const boost::unordered_set<TNetworkAddress>& unique_hosts);
-};
-
-/// Tracks all the state necessary to create expansion requests for all fragments of a
-/// single query on a single node. Code that might need to expand the memory reservation
-/// for this query (e.g. MemTracker) can use this class to construct expansion requests
-/// that may then be submitted to the ResourceBroker.
-//
-/// If InitVcoreAcquisition() is called, this class will monitor the thread token to VCore
-/// ratio (thread consumers must use NotifyThreadUsageChange() to update the thread
-/// consumption count). If the ratio gets too high (see AboveVcoreSubscriptionThreshold()
-/// for details), we will try to acquire more VCore resources from Llama asynchronously.
-/// If the ratio passes a higher threshold (see IsVcoreOverSubscribed()), we say that the
-/// query fragments are currently oversubscribing their VCore resources.
-//
-/// Threads are typically handed to a fragment by the thread resource manager, which deals
-/// in tokens. When a fragment wants to use a token to start a thread, it should only do so
-/// if the ratio of threads to VCores (which map directly onto cgroup shares) is not too
-/// large. If it is too large - i.e. the VCores are oversubscribed - the fragment should
-/// wait to spin up a new thread until more VCore resources are acquired as above. To help
-/// with this, each fragment may register one or more callbacks with its
-/// QueryResourceMgr; when more VCore resources are acquired the callbacks are invoked in
-/// round-robin fashion. The callback should try to re-acquire the previously untaken
-/// thread token, and then a new thread may be started.
-//
-/// Only CPU-heavy threads need be managed using this class.
-//
-/// TODO: Handle reducing the number of VCores when threads finish.
-/// TODO: Consider combining more closely with ThreadResourceMgr.
-/// TODO: Add counters to RuntimeProfile to track resources.
-class QueryResourceMgr {
- public:
-  QueryResourceMgr(const TUniqueId& reservation_id,
-      const TNetworkAddress& local_resource_location, const TUniqueId& query_id);
-
-  /// Must be called only once. Starts a separate thread to monitor thread consumption,
-  /// which asks for more VCores from Llama periodically.
-  void InitVcoreAcquisition(int32_t init_vcores);
-
-  /// Should be used to check if another thread token may be acquired by this
-  /// query. Fragments may ignore this when acquiring a new CPU token, but the result will
-  /// be a larger thread:VCore ratio.
-  //
-  /// Note that this threshold is larger than the one in
-  /// AboveVcoreSubscriptionThreshold(). We want to start acquiring more VCore allocations
-  /// before we get so oversubscribed that adding new threads is considered a bad idea.
-  inline bool IsVcoreOverSubscribed() {
-    boost::lock_guard<boost::mutex> l(threads_running_lock_);
-    return threads_running_ > vcores_ * max_vcore_oversubscription_ratio_;
-  }
-
-  /// Called when thread consumption goes up or down. If the total consumption goes above a
-  /// subscription threshold, the acquisition thread will be woken to ask for more VCores.
-  void NotifyThreadUsageChange(int delta);
-
-  /// All callbacks registered here are called in round-robin fashion when more VCores are
-  /// acquired. Returns a unique ID that can be used as an argument to
-  /// RemoveVcoreAvailableCb().
-  typedef boost::function<void ()> VcoreAvailableCb;
-  int32_t AddVcoreAvailableCb(const VcoreAvailableCb& callback);
-
-  /// Removes the callback with the given ID.
-  void RemoveVcoreAvailableCb(int32_t callback_id);
-
-  /// Request an expansion of requested_bytes. If the expansion can be fulfilled within
-  /// the timeout period, the number of bytes allocated is returned in allocated_bytes
-  /// (which may be more than requested). Otherwise an error status is returned.
-  Status RequestMemExpansion(int64_t requested_bytes, int64_t* allocated_bytes);
-
-  /// Sets the exit flag for the VCore acquisition thread, but does not block. Also clears
-  /// the set of callbacks, so that after Shutdown() has returned, no callback will be
-  /// invoked.
-  void Shutdown();
-
-  /// Waits for the VCore acquisition thread to stop.
-  ~QueryResourceMgr();
-
-  const TUniqueId& reservation_id() const { return reservation_id_; }
-
- private:
-  /// ID of the single reservation corresponding to this query.
-  TUniqueId reservation_id_;
-
-  /// Query ID of the query this class manages resources for.
-  TUniqueId query_id_;
-
-  /// Network address of the local service registered with Llama. Usually corresponds to
-  /// <local-address>:0, unless a pseudo-distributed Llama is being used (see
-  /// ResourceResolver::CreateLocalLlamaNodeMapping()).
-  TNetworkAddress local_resource_location_;
-
-  /// Used to control shutdown of AcquireVcoreResources().
-  boost::mutex exit_lock_;
-  bool exit_;
-
-  /// Protects callbacks_ and callbacks_it_.
-  boost::mutex callbacks_lock_;
-
-  /// List of callbacks to notify when a new VCore resource is available.
-  typedef boost::unordered_map<int32_t, VcoreAvailableCb> CallbackMap;
-  CallbackMap callbacks_;
-
-  /// Round-robin iterator to notify callbacks about new VCores one at a time.
-  CallbackMap::iterator callbacks_it_;
-
-  /// Total number of callbacks that were ever registered. Used to give each callback a
-  /// unique ID so that they can be removed.
-  int32_t callback_count_;
-
-  /// Protects threads_running_, threads_changed_cv_ and vcores_.
-  boost::mutex threads_running_lock_;
-
-  /// Waited on by AcquireVcoreResources(), and notified by NotifyThreadUsageChange().
-  boost::condition_variable threads_changed_cv_;
-
-  /// The number of threads we know to be running on behalf of this query.
-  int64_t threads_running_;
-
-  /// The number of VCores acquired for this node for this query.
-  int64_t vcores_;
-
-  /// Set to FLAGS_max_vcore_oversubscription_ratio in the constructor. If the ratio of
-  /// threads to VCores exceeds this number, no more threads may be executed by this query
-  /// until more VCore resources are acquired.
-  float max_vcore_oversubscription_ratio_;
-
-  /// Runs AcquireVcoreResources() after InitVcoreAcquisition() is called.
-  boost::scoped_ptr<Thread> acquire_vcore_thread_;
-
-  /// Signals to the vcore acquisition thread that it should exit after it exits from any
-  /// pending Expand() call. Is a shared_ptr so that it will remain valid even after the
-  /// parent QueryResourceMgr has been destroyed.
-  /// TODO: Combine with ShouldExit(), and replace with AtomicBool when we have such a
-  /// thing.
-  std::shared_ptr<AtomicInt32> early_exit_;
-
-  /// Signals to the destructor that the vcore acquisition thread is currently in an
-  /// Expand() RPC. If so, the destructor does not need to wait for the acquisition thread
-  /// to exit.
-  std::shared_ptr<AtomicInt32> thread_in_expand_;
-
-  /// Creates the llama resource for the memory and/or cores specified, associated with
-  /// the reservation context.
-  llama::TResource CreateResource(int64_t memory_mb, int64_t vcores);
-
-  /// Run as a thread owned by acquire_vcore_thread_. Waits for notification from
-  /// NotifyThreadUsageChange(), then checks the subscription level to decide if more
-  /// VCores are needed, and starts a new expansion request if so.
-  void AcquireVcoreResources(std::shared_ptr<AtomicInt32> thread_in_expand,
-      std::shared_ptr<AtomicInt32> early_exit);
-
-  /// True if thread:VCore subscription is too high, meaning more VCores are required.
-  /// Must be called while holding threads_running_lock_.
-  bool AboveVcoreSubscriptionThreshold();
-
-  /// Returns true if Shutdown() has been called and the acquisition thread should exit.
-  bool ShouldExit();
-};
-
-}
-
-#endif

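The two subscription thresholds in the header above differ on purpose: AboveVcoreSubscriptionThreshold() trips at 80% of the configured ratio so acquisition starts early, while IsVcoreOverSubscribed() trips at the ratio itself and gates new threads. A small standalone illustration using the default ratio of 2.5 (the constants and printout are mine, not Impala code):

    #include <iostream>

    int main() {
      const double kMaxRatio = 2.5;  // mirrors --max_vcore_oversubscription_ratio
      const long vcores = 4;
      for (long threads = 7; threads <= 11; ++threads) {
        // 80% of the ceiling: wake the acquisition thread to ask for VCores.
        bool acquire_more = threads > vcores * (kMaxRatio * 0.8);  // i.e. > 8
        // The ceiling itself: fragments should defer starting new threads.
        bool oversubscribed = threads > vcores * kMaxRatio;        // i.e. > 10
        std::cout << threads << " threads: acquire_more=" << acquire_more
                  << " oversubscribed=" << oversubscribed << "\n";
      }
      return 0;
    }

With 4 VCores, acquisition kicks in above 8 running threads while the query only counts as oversubscribed above 10, so there is a band in which more VCores are being requested but new threads may still start.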
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/scheduling/query-schedule.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/query-schedule.cc b/be/src/scheduling/query-schedule.cc
index f893f1c..b5745c4 100644
--- a/be/src/scheduling/query-schedule.cc
+++ b/be/src/scheduling/query-schedule.cc
@@ -28,7 +28,6 @@
 #include "util/uid-util.h"
 #include "util/debug-util.h"
 #include "util/parse-util.h"
-#include "util/llama-util.h"
 
 #include "common/names.h"
 
@@ -36,28 +35,13 @@ using boost::uuids::random_generator;
 using boost::uuids::uuid;
 using namespace impala;
 
-DEFINE_bool(rm_always_use_defaults, false, "If true, all queries use the same initial"
-    " resource requests regardless of their computed resource estimates. Only meaningful "
-    "if --enable_rm is set.");
-DEFINE_string(rm_default_memory, "4G", "The initial amount of memory that"
-    " a query should reserve on each node if either it does not have an available "
-    "estimate, or if --rm_always_use_defaults is set.");
-DEFINE_int32(rm_default_cpu_vcores, 2, "The initial number of virtual cores that"
-    " a query should reserve on each node if either it does not have an available "
-    "estimate, or if --rm_always_use_defaults is set.");
-
+// TODO: Remove for Impala 3.0.
+DEFINE_bool(rm_always_use_defaults, false, "Deprecated");
+DEFINE_string(rm_default_memory, "4G", "Deprecated");
+DEFINE_int32(rm_default_cpu_vcores, 2, "Deprecated");
 
 namespace impala {
 
-// Default value for the request_timeout in a reservation request. The timeout is the
-// max time in milliseconds to wait for a resource request to be fulfilled by Llama.
-// The default value of five minutes was determined to be reasonable based on
-// experiments on a 20-node cluster with TPCDS 15TB and 8 concurrent clients.
-// Over 30% of queries timed out with a reservation timeout of 1 minute, but fewer
-// than 5% timed out when using 5 minutes. Still, the default value is somewhat
-// arbitrary and a good value is workload dependent.
-const int64_t DEFAULT_REQUEST_TIMEOUT_MS = 5 * 60 * 1000;
-
 QuerySchedule::QuerySchedule(const TUniqueId& query_id,
     const TQueryExecRequest& request, const TQueryOptions& query_options,
     RuntimeProfile* summary_profile, RuntimeProfile::EventSequence* query_events)
@@ -97,10 +81,9 @@ int64_t QuerySchedule::GetClusterMemoryEstimate() const {
 int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
   // Precedence of different estimate sources is:
   // user-supplied RM query option >
-  //   server-side defaults (if rm_always_use_defaults == true) >
   //     query option limit >
   //       estimate >
-  //         server-side defaults (if rm_always_use_defaults == false)
+  //         server-side defaults
   int64_t query_option_memory_limit = numeric_limits<int64_t>::max();
   bool has_query_option = false;
   if (query_options_.__isset.mem_limit && query_options_.mem_limit > 0) {
@@ -116,12 +99,10 @@ int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
   }
 
   int64_t per_host_mem = 0L;
+  // TODO: Remove rm_initial_mem and associated logic when we're sure that clients won't
+  // be affected.
   if (query_options_.__isset.rm_initial_mem && query_options_.rm_initial_mem > 0) {
     per_host_mem = query_options_.rm_initial_mem;
-  } else if (FLAGS_rm_always_use_defaults) {
-    bool ignored;
-    per_host_mem = ParseUtil::ParseMemSpec(FLAGS_rm_default_memory,
-        &ignored, 0);
   } else if (has_query_option) {
     per_host_mem = query_option_memory_limit;
   } else if (has_estimate) {
@@ -134,115 +115,11 @@ int64_t QuerySchedule::GetPerHostMemoryEstimate() const {
   }
   // Cap the memory estimate at the amount of physical memory available. The user's
   // provided value or the estimate from planning can each be unreasonable.
-  // TODO: Get this limit from Llama (Yarn sets it).
   return min(per_host_mem, MemInfo::physical_mem());
 }
 
-int16_t QuerySchedule::GetPerHostVCores() const {
-  // Precedence of different estimate sources is:
-  // server-side defaults (if rm_always_use_defaults == true) >
-  //   computed estimates >
-  //     server-side defaults (if rm_always_use_defaults == false)
-  int16_t v_cpu_cores = FLAGS_rm_default_cpu_vcores;
-  if (!FLAGS_rm_always_use_defaults && query_options_.__isset.v_cpu_cores &&
-      query_options_.v_cpu_cores > 0) {
-    v_cpu_cores = query_options_.v_cpu_cores;
-  }
-
-  return v_cpu_cores;
-}
-
-void QuerySchedule::GetResourceHostport(const TNetworkAddress& src,
-    TNetworkAddress* dst) {
-  DCHECK(dst != NULL);
-  DCHECK(resource_resolver_.get() != NULL)
-      << "resource_resolver_ is NULL, didn't call SetUniqueHosts()?";
-  resource_resolver_->GetResourceHostport(src, dst);
-}
-
 void QuerySchedule::SetUniqueHosts(const unordered_set<TNetworkAddress>& unique_hosts) {
   unique_hosts_ = unique_hosts;
-  resource_resolver_.reset(new ResourceResolver(unique_hosts_));
-}
-
-void QuerySchedule::PrepareReservationRequest(const string& pool, const string& user) {
-  reservation_request_.resources.clear();
-  reservation_request_.version = TResourceBrokerServiceVersion::V1;
-  reservation_request_.queue = pool;
-  reservation_request_.gang = true;
-  // Convert the user name to a short name (e.g. 'user1@domain' to 'user1') because
-  // Llama checks group membership based on the short name of the principal.
-  reservation_request_.user = llama::GetShortName(user);
-
-  // Set optional request timeout from query options.
-  if (query_options_.__isset.reservation_request_timeout) {
-    DCHECK_GT(query_options_.reservation_request_timeout, 0);
-    reservation_request_.__set_request_timeout(
-        query_options_.reservation_request_timeout);
-  }
-
-  // Set the reservation timeout from the query options or use a default.
-  int64_t timeout = DEFAULT_REQUEST_TIMEOUT_MS;
-  if (query_options_.__isset.reservation_request_timeout) {
-    timeout = query_options_.reservation_request_timeout;
-  }
-  reservation_request_.__set_request_timeout(timeout);
-
-  int32_t memory_mb = GetPerHostMemoryEstimate() / 1024 / 1024;
-  int32_t v_cpu_cores = GetPerHostVCores();
-  // The memory_mb and v_cpu_cores estimates may legitimately be zero,
-  // e.g., for constant selects. Do not reserve any resources in those cases.
-  if (memory_mb == 0 && v_cpu_cores == 0) return;
-
-  DCHECK(resource_resolver_.get() != NULL)
-      << "resource_resolver_ is NULL, didn't call SetUniqueHosts()?";
-  random_generator uuid_generator;
-  for (const TNetworkAddress& host: unique_hosts_) {
-    reservation_request_.resources.push_back(llama::TResource());
-    llama::TResource& resource = reservation_request_.resources.back();
-    uuid id = uuid_generator();
-    resource.client_resource_id.hi = *reinterpret_cast<uint64_t*>(&id.data[0]);
-    resource.client_resource_id.lo = *reinterpret_cast<uint64_t*>(&id.data[8]);
-    resource.enforcement = llama::TLocationEnforcement::MUST;
-
-    TNetworkAddress resource_hostport;
-    resource_resolver_->GetResourceHostport(host, &resource_hostport);
-    stringstream ss;
-    ss << resource_hostport;
-    resource.askedLocation = ss.str();
-    resource.memory_mb = memory_mb;
-    resource.v_cpu_cores = v_cpu_cores;
-  }
-}
-
-Status QuerySchedule::ValidateReservation() {
-  if (!HasReservation()) return Status("Query schedule does not have a reservation.");
-  vector<TNetworkAddress> hosts_missing_resources;
-  ResourceResolver resolver(unique_hosts_);
-  for (const FragmentExecParams& params: fragment_exec_params_) {
-    for (const TNetworkAddress& host: params.hosts) {
-      // Ignore the coordinator host which is not contained in unique_hosts_.
-      if (unique_hosts_.find(host) == unique_hosts_.end()) continue;
-      TNetworkAddress resource_hostport;
-      resolver.GetResourceHostport(host, &resource_hostport);
-      if (reservation_.allocated_resources.find(resource_hostport) ==
-          reservation_.allocated_resources.end()) {
-        hosts_missing_resources.push_back(host);
-      }
-    }
-  }
-  if (!hosts_missing_resources.empty()) {
-    stringstream ss;
-    ss << "Failed to validate reservation " << reservation_.reservation_id << "." << endl
-       << "Missing resources for hosts [";
-    for (int i = 0; i < hosts_missing_resources.size(); ++i) {
-      ss << hosts_missing_resources[i];
-      if (i + 1 != hosts_missing_resources.size()) ss << ", ";
-    }
-    ss << "]";
-    return Status(ss.str());
-  }
-  return Status::OK();
 }
 
 }
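After this change, GetPerHostMemoryEstimate() reduces to a straightforward precedence chain. A standalone sketch of that chain (the parameter names and sentinel conventions are illustrative; only the ordering and the physical-memory cap come from the code above):

    #include <algorithm>
    #include <cstdint>

    // rm_initial_mem and mem_limit are <= 0 when unset; planner_estimate is
    // < 0 when no estimate is available.
    int64_t PerHostMemoryEstimate(int64_t rm_initial_mem, int64_t mem_limit,
        int64_t planner_estimate, int64_t server_default, int64_t physical_mem) {
      int64_t per_host_mem;
      if (rm_initial_mem > 0) {
        per_host_mem = rm_initial_mem;      // deprecated RM option, still honoured
      } else if (mem_limit > 0) {
        per_host_mem = mem_limit;           // user-supplied query option
      } else if (planner_estimate >= 0) {
        per_host_mem = planner_estimate;    // estimate from planning
      } else {
        per_host_mem = server_default;      // server-side default
      }
      // Neither the user's value nor the planner's estimate is trusted to
      // exceed the physical memory of the host.
      return std::min(per_host_mem, physical_mem);
    }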