You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2017/03/29 02:53:43 UTC

[02/14] incubator-impala git commit: IMPALA-4041: Limit catalog and admission control updates to coordinators

IMPALA-4041: Limit catalog and admission control updates to coordinators

With this commit we add the ability to limit catalog updates to a
limited set of coordinator nodes. A new startup option, termed
'is_coordinator' is added to indicate if a node is a coordinator.
Coordinators accept connections through HS2 and Beeswax interfaces
and can also participate in query execution. Non-coordinator nodes
do not receive catalog updates from the statestore, do not initialize
a query scheduler and cannot accept Beeswax and HS2 client connections.

Testing:
- Added a custom cluster test that launches a cluster in which the
number of coordinators is less than the cluster size and runs a number
of smoke queries.
- Successfully run exhaustive tests.

Change-Id: I5f2c74abdbcd60ac050efa323616bd41182ceff3
Reviewed-on: http://gerrit.cloudera.org:8080/6344
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/296df3c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/296df3c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/296df3c8

Branch: refs/heads/master
Commit: 296df3c8269b7be16120397d60e97b6f24fc390b
Parents: 4aa8e2d
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Thu Mar 9 15:24:51 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Mar 28 22:27:25 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/exec-env.cc                |  53 +++++---
 be/src/runtime/exec-env.h                 |   7 +-
 be/src/scheduling/admission-controller.cc |  26 ++--
 be/src/scheduling/admission-controller.h  |  11 +-
 be/src/scheduling/scheduler-test-util.cc  |  16 +--
 be/src/scheduling/scheduler-test.cc       |   9 +-
 be/src/scheduling/scheduler.cc            |  89 +++----------
 be/src/scheduling/scheduler.h             |   8 +-
 be/src/service/impala-server.cc           | 173 +++++++++++++++++--------
 be/src/service/impala-server.h            |  18 ++-
 be/src/service/impalad-main.cc            |  24 ++--
 be/src/service/query-exec-state.cc        |  19 ++-
 be/src/testutil/in-process-servers.h      |   9 +-
 be/src/util/webserver.cc                  |  33 +++--
 bin/start-impala-cluster.py               |  24 +++-
 tests/common/custom_cluster_test_suite.py |   7 +-
 tests/common/impala_service.py            |  14 ++
 tests/custom_cluster/test_coordinators.py |  86 ++++++++++++
 www/root.tmpl                             |   4 +
 19 files changed, 415 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 611520c..dd27a91 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -40,6 +40,7 @@
 #include "runtime/query-exec-mgr.h"
 #include "runtime/thread-resource-mgr.h"
 #include "runtime/tmp-file-mgr.h"
+#include "scheduling/admission-controller.h"
 #include "scheduling/request-pool-service.h"
 #include "scheduling/scheduler.h"
 #include "service/frontend.h"
@@ -79,12 +80,14 @@ DEFINE_int32(state_store_subscriber_port, 23000,
     "port where StatestoreSubscriberService should be exported");
 DEFINE_int32(num_hdfs_worker_threads, 16,
     "(Advanced) The number of threads in the global HDFS operation pool");
+DEFINE_bool(disable_admission_control, false, "Disables admission control.");
 
 DECLARE_int32(state_store_port);
 DECLARE_int32(num_threads_per_core);
 DECLARE_int32(num_cores);
 DECLARE_int32(be_port);
 DECLARE_string(mem_limit);
+DECLARE_bool(is_coordinator);
 
 // TODO: Remove the following RM-related flags in Impala 3.0.
 DEFINE_bool(enable_rm, false, "Deprecated");
@@ -123,7 +126,7 @@ const static string DEFAULT_FS = "fs.defaultFS";
 
 namespace impala {
 
-ExecEnv* ExecEnv::exec_env_ = NULL;
+ExecEnv* ExecEnv::exec_env_ = nullptr;
 
 ExecEnv::ExecEnv()
   : metrics_(new MetricGroup("impala-metrics")),
@@ -139,7 +142,7 @@ ExecEnv::ExecEnv()
     htable_factory_(new HBaseTableFactory()),
     disk_io_mgr_(new DiskIoMgr()),
     webserver_(new Webserver()),
-    mem_tracker_(NULL),
+    mem_tracker_(nullptr),
     pool_mem_trackers_(new PoolMemTrackerRegistry),
     thread_mgr_(new ThreadResourceMgr),
     hdfs_op_thread_pool_(
@@ -168,10 +171,19 @@ ExecEnv::ExecEnv()
         Substitute("impalad@$0", TNetworkAddressToString(backend_address_)),
         subscriber_address, statestore_address, metrics_.get()));
 
-    scheduler_.reset(new Scheduler(statestore_subscriber_.get(),
-        statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(),
-        request_pool_service_.get()));
-  } else {
+    if (FLAGS_is_coordinator) {
+      scheduler_.reset(new Scheduler(statestore_subscriber_.get(),
+          statestore_subscriber_->id(), backend_address_, metrics_.get(),
+          webserver_.get(), request_pool_service_.get()));
+    }
+
+    if (!FLAGS_disable_admission_control) {
+      admission_controller_.reset(new AdmissionController(statestore_subscriber_.get(),
+          request_pool_service_.get(), metrics_.get(), backend_address_));
+    } else {
+      LOG(INFO) << "Admission control is disabled.";
+    }
+  } else if (FLAGS_is_coordinator) {
     vector<TNetworkAddress> addresses;
     addresses.push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
     scheduler_.reset(new Scheduler(
@@ -196,7 +208,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
     htable_factory_(new HBaseTableFactory()),
     disk_io_mgr_(new DiskIoMgr()),
     webserver_(new Webserver(webserver_port)),
-    mem_tracker_(NULL),
+    mem_tracker_(nullptr),
     pool_mem_trackers_(new PoolMemTrackerRegistry),
     thread_mgr_(new ThreadResourceMgr),
     hdfs_op_thread_pool_(
@@ -208,7 +220,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
     async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
     query_exec_mgr_(new QueryExecMgr()),
     buffer_reservation_(nullptr),
-    buffer_pool_(NULL),
+    buffer_pool_(nullptr),
     enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
     is_fe_tests_(false),
     backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
@@ -224,10 +236,18 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
         Substitute("impalad@$0", TNetworkAddressToString(backend_address_)),
         subscriber_address, statestore_address, metrics_.get()));
 
-    scheduler_.reset(new Scheduler(statestore_subscriber_.get(),
-        statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(),
-        request_pool_service_.get()));
-  } else {
+    if (FLAGS_is_coordinator) {
+      scheduler_.reset(new Scheduler(statestore_subscriber_.get(),
+          statestore_subscriber_->id(), backend_address_, metrics_.get(),
+          webserver_.get(), request_pool_service_.get()));
+    }
+
+    if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
+    if (!FLAGS_disable_admission_control) {
+      admission_controller_.reset(new AdmissionController(statestore_subscriber_.get(),
+          request_pool_service_.get(), metrics_.get(), backend_address_));
+    }
+  } else if (FLAGS_is_coordinator) {
     vector<TNetworkAddress> addresses;
     addresses.push_back(MakeNetworkAddress(hostname, backend_port));
     scheduler_.reset(new Scheduler(
@@ -284,7 +304,7 @@ Status ExecEnv::StartServices() {
     return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'.");
   }
 
-  metrics_->Init(enable_webserver_ ? webserver_.get() : NULL);
+  metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr);
   impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
   catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
   RETURN_IF_ERROR(RegisterMemoryMetrics(metrics_.get(), true));
@@ -326,7 +346,8 @@ Status ExecEnv::StartServices() {
     LOG(INFO) << "Not starting webserver";
   }
 
-  if (scheduler_ != NULL) RETURN_IF_ERROR(scheduler_->Init());
+  if (scheduler_ != nullptr) RETURN_IF_ERROR(scheduler_->Init());
+  if (admission_controller_ != nullptr) RETURN_IF_ERROR(admission_controller_->Init());
 
   // Get the fs.defaultFS value set in core-site.xml and assign it to
   // configured_defaultFs
@@ -340,7 +361,7 @@ Status ExecEnv::StartServices() {
     default_fs_ = "hdfs://";
   }
   // Must happen after all topic registrations / callbacks are done
-  if (statestore_subscriber_.get() != NULL) {
+  if (statestore_subscriber_.get() != nullptr) {
     Status status = statestore_subscriber_->Start();
     if (!status.ok()) {
       status.AddDetail("Statestore subscriber did not start up.");
@@ -355,6 +376,6 @@ void ExecEnv::InitBufferPool(int64_t min_page_size, int64_t capacity) {
   DCHECK(buffer_pool_ == nullptr);
   buffer_pool_.reset(new BufferPool(min_page_size, capacity));
   buffer_reservation_.reset(new ReservationTracker());
-  buffer_reservation_->InitRootTracker(NULL, capacity);
+  buffer_reservation_->InitRootTracker(nullptr, capacity);
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index a5777ef..87824ba 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -28,6 +28,7 @@
 
 namespace impala {
 
+class AdmissionController;
 class BufferPool;
 class CallableThreadPool;
 class DataStreamMgr;
@@ -72,10 +73,6 @@ class ExecEnv {
 
   void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; }
 
-  StatestoreSubscriber* statestore_subscriber() {
-    return statestore_subscriber_.get();
-  }
-
   DataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
   ImpalaBackendClientCache* impalad_client_cache() {
     return impalad_client_cache_.get();
@@ -106,6 +103,7 @@ class ExecEnv {
   void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
 
   Scheduler* scheduler() { return scheduler_.get(); }
+  AdmissionController* admission_controller() { return admission_controller_.get(); }
   StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
 
   const TNetworkAddress& backend_address() const { return backend_address_; }
@@ -129,6 +127,7 @@ class ExecEnv {
   boost::scoped_ptr<MetricGroup> metrics_;
   boost::scoped_ptr<DataStreamMgr> stream_mgr_;
   boost::scoped_ptr<Scheduler> scheduler_;
+  boost::scoped_ptr<AdmissionController> admission_controller_;
   boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
   boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_;
   boost::scoped_ptr<CatalogServiceClientCache> catalogd_client_cache_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 3c9947a..efec78a 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -199,9 +199,11 @@ string AdmissionController::PoolStats::DebugString() const {
 
 // TODO: do we need host_id_ to come from host_addr or can it just take the same id
 // the Scheduler has (coming from the StatestoreSubscriber)?
-AdmissionController::AdmissionController(RequestPoolService* request_pool_service,
-    MetricGroup* metrics, const TNetworkAddress& host_addr)
-    : request_pool_service_(request_pool_service),
+AdmissionController::AdmissionController(StatestoreSubscriber* subscriber,
+    RequestPoolService* request_pool_service, MetricGroup* metrics,
+    const TNetworkAddress& host_addr)
+    : subscriber_(subscriber),
+      request_pool_service_(request_pool_service),
       metrics_group_(metrics),
       host_id_(TNetworkAddressToString(host_addr)),
       thrift_serializer_(false),
@@ -224,10 +226,10 @@ AdmissionController::~AdmissionController() {
   dequeue_thread_->Join();
 }
 
-Status AdmissionController::Init(StatestoreSubscriber* subscriber) {
+Status AdmissionController::Init() {
   StatestoreSubscriber::UpdateCallback cb =
     bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
-  Status status = subscriber->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
+  Status status = subscriber_->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
   if (!status.ok()) {
     status.AddDetail("AdmissionController failed to register request queue topic");
   }
@@ -578,12 +580,12 @@ void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id,
   if (VLOG_ROW_IS_ON) {
     stringstream ss;
     ss << "Stats update for pool=" << name_ << " backend=" << host_id;
-    if (host_stats == NULL) ss << " topic deletion";
+    if (host_stats == nullptr) ss << " topic deletion";
     if (it != remote_stats_.end()) ss << " previous: " << DebugPoolStats(it->second);
-    if (host_stats != NULL) ss << " new: " << DebugPoolStats(*host_stats);
+    if (host_stats != nullptr) ss << " new: " << DebugPoolStats(*host_stats);
     VLOG_ROW << ss.str();
   }
-  if (host_stats == NULL) {
+  if (host_stats == nullptr) {
     if (it != remote_stats_.end()) {
       remote_stats_.erase(it);
     } else {
@@ -620,7 +622,7 @@ void AdmissionController::HandleTopicDeletions(const vector<string>& topic_delet
     string topic_backend_id;
     if (!ParsePoolTopicKey(topic_key, &pool_name, &topic_backend_id)) continue;
     if (topic_backend_id == host_id_) continue;
-    GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, NULL);
+    GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
   }
 }
 
@@ -705,7 +707,7 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() {
       ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(name_, false);
 
   const int64_t current_reserved =
-      tracker == NULL ? static_cast<int64_t>(0) : tracker->GetPoolMemReserved();
+      tracker == nullptr ? static_cast<int64_t>(0) : tracker->GetPoolMemReserved();
   if (current_reserved != local_stats_.backend_mem_reserved) {
     parent_->pools_for_updates_.insert(name_);
     local_stats_.backend_mem_reserved = current_reserved;
@@ -713,7 +715,7 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() {
   }
 
   const int64_t current_usage =
-      tracker == NULL ? static_cast<int64_t>(0) : tracker->consumption();
+      tracker == nullptr ? static_cast<int64_t>(0) : tracker->consumption();
   metrics_.local_backend_mem_usage->set_value(current_usage);
 }
 
@@ -797,7 +799,7 @@ void AdmissionController::DequeueLoop() {
 
       while (max_to_dequeue > 0 && !queue.empty()) {
         QueueNode* queue_node = queue.head();
-        DCHECK(queue_node != NULL);
+        DCHECK(queue_node != nullptr);
         DCHECK(!queue_node->is_admitted.IsSet());
         const QuerySchedule& schedule = queue_node->schedule;
         string not_admitted_reason;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index f0a37d8..d3ec49d 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -178,7 +178,8 @@ class ExecEnv;
 ///       better idea of what is perhaps unnecessary.
 class AdmissionController {
  public:
-  AdmissionController(RequestPoolService* request_pool_service, MetricGroup* metrics,
+  AdmissionController(StatestoreSubscriber* subscriber,
+      RequestPoolService* request_pool_service, MetricGroup* metrics,
       const TNetworkAddress& host_addr);
   ~AdmissionController();
 
@@ -196,8 +197,8 @@ class AdmissionController {
   /// This does not block.
   Status ReleaseQuery(QuerySchedule* schedule);
 
-  /// Registers with the subscription manager.
-  Status Init(StatestoreSubscriber* subscriber);
+  /// Registers the request queue topic with the statestore.
+  Status Init();
 
  private:
   class PoolStats;
@@ -206,6 +207,10 @@ class AdmissionController {
   /// Statestore topic name.
   static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
 
+  /// Subscription manager used to handle admission control updates. This is not
+  /// owned by this class.
+  StatestoreSubscriber* subscriber_;
+
   /// Used for user-to-pool resolution and looking up pool configurations. Not owned by
   /// the AdmissionController.
   RequestPoolService* request_pool_service_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 2474f33..782379c 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -450,13 +450,13 @@ SchedulerWrapper::SchedulerWrapper(const Plan& plan)
 }
 
 Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
-  DCHECK(scheduler_ != NULL);
+  DCHECK(scheduler_ != nullptr);
 
   // Compute Assignment.
   FragmentScanRangeAssignment* assignment = result->AddAssignment();
-  return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL,
-      false, plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord,
-      plan_.query_options(), NULL, assignment);
+  return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0,
+      nullptr, false, plan_.scan_range_locations(), plan_.referenced_datanodes(),
+      exec_at_coord, plan_.query_options(), nullptr, assignment);
 }
 
 void SchedulerWrapper::AddBackend(const Host& host) {
@@ -495,7 +495,7 @@ void SchedulerWrapper::SendEmptyUpdate() {
 }
 
 void SchedulerWrapper::InitializeScheduler() {
-  DCHECK(scheduler_ == NULL);
+  DCHECK(scheduler_ == nullptr);
   DCHECK_GT(plan_.cluster().NumHosts(), 0) << "Cannot initialize scheduler with 0 "
                                            << "hosts.";
   const Host& scheduler_host = plan_.cluster().hosts()[0];
@@ -504,8 +504,8 @@ void SchedulerWrapper::InitializeScheduler() {
   scheduler_backend_address.hostname = scheduler_host.ip;
   scheduler_backend_address.port = scheduler_host.be_port;
 
-  scheduler_.reset(new Scheduler(
-      NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL));
+  scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id, scheduler_backend_address,
+      &metrics_, nullptr, nullptr));
   scheduler_->Init();
   // Initialize the scheduler backend maps.
   SendFullMembershipMap();
@@ -532,7 +532,7 @@ void SchedulerWrapper::AddHostToTopicDelta(const Host& host, TTopicDelta* delta)
 }
 
 void SchedulerWrapper::SendTopicDelta(const TTopicDelta& delta) {
-  DCHECK(scheduler_ != NULL);
+  DCHECK(scheduler_ != nullptr);
   // Wrap in topic delta map.
   StatestoreSubscriber::TopicDeltaMap delta_map;
   delta_map.emplace(Scheduler::IMPALA_MEMBERSHIP_TOPIC, delta);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index 272a288..3e05c5b 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -362,8 +362,11 @@ TEST_F(SchedulerTest, TestSendUpdates) {
 }
 
 /// IMPALA-4329: Test scheduling with no backends.
-/// With the fix for IMPALA-4494, the scheduler will always register its local backend
-/// with itself, so scheduling with no backends will still succeed.
+/// With the fix for IMPALA-5058, the scheduler is no longer responsible for
+/// registering the local backend with itself. This functionality is moved to
+/// ImpalaServer::MembershipCallback() and the scheduler will receive the local
+/// backend info through the statestore update, so until that happens, scheduling
+/// should fail.
 TEST_F(SchedulerTest, TestEmptyBackendConfig) {
   Cluster cluster;
   cluster.AddHost(false, true);
@@ -377,7 +380,7 @@ TEST_F(SchedulerTest, TestEmptyBackendConfig) {
   Result result(plan);
   SchedulerWrapper scheduler(plan);
   Status status = scheduler.Compute(&result);
-  EXPECT_TRUE(status.ok());
+  EXPECT_TRUE(!status.ok());
 }
 
 /// IMPALA-4494: Test scheduling with no backends but exec_at_coord.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 70dbcdc..a73e13a 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -30,6 +30,7 @@
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/Types_types.h"
 #include "rapidjson/rapidjson.h"
+#include "runtime/exec-env.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/container-util.h"
 #include "util/metrics.h"
@@ -46,8 +47,6 @@ using namespace strings;
 DECLARE_int32(be_port);
 DECLARE_string(hostname);
 
-DEFINE_bool(disable_admission_control, false, "Disables admission control.");
-
 namespace impala {
 
 static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
@@ -69,17 +68,11 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
     statestore_subscriber_(subscriber),
     local_backend_id_(backend_id),
     thrift_serializer_(false),
-    total_assignments_(NULL),
-    total_local_assignments_(NULL),
-    initialized_(NULL),
+    total_assignments_(nullptr),
+    total_local_assignments_(nullptr),
+    initialized_(nullptr),
     request_pool_service_(request_pool_service) {
   local_backend_descriptor_.address = backend_address;
-
-  if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
-  if (!FLAGS_disable_admission_control) {
-    admission_controller_.reset(
-        new AdmissionController(request_pool_service_, metrics, backend_address));
-  }
 }
 
 Scheduler::Scheduler(const vector<TNetworkAddress>& backends, MetricGroup* metrics,
@@ -87,20 +80,14 @@ Scheduler::Scheduler(const vector<TNetworkAddress>& backends, MetricGroup* metri
   : backend_config_(std::make_shared<const BackendConfig>(backends)),
     metrics_(metrics),
     webserver_(webserver),
-    statestore_subscriber_(NULL),
+    statestore_subscriber_(nullptr),
     thrift_serializer_(false),
-    total_assignments_(NULL),
-    total_local_assignments_(NULL),
-    initialized_(NULL),
+    total_assignments_(nullptr),
+    total_local_assignments_(nullptr),
+    initialized_(nullptr),
     request_pool_service_(request_pool_service) {
   DCHECK(backends.size() > 0);
   local_backend_descriptor_.address = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
-  if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
-  // request_pool_service_ may be null in unit tests
-  if (request_pool_service_ != NULL && !FLAGS_disable_admission_control) {
-    admission_controller_.reset(
-        new AdmissionController(request_pool_service_, metrics, TNetworkAddress()));
-  }
 }
 
 Status Scheduler::Init() {
@@ -122,14 +109,14 @@ Status Scheduler::Init() {
 
   coord_only_backend_config_.AddBackend(local_backend_descriptor_);
 
-  if (webserver_ != NULL) {
+  if (webserver_ != nullptr) {
     Webserver::UrlCallback backends_callback =
         bind<void>(mem_fn(&Scheduler::BackendsUrlCallback), this, _1, _2);
     webserver_->RegisterUrlCallback(
         BACKENDS_WEB_PAGE, BACKENDS_TEMPLATE, backends_callback);
   }
 
-  if (statestore_subscriber_ != NULL) {
+  if (statestore_subscriber_ != nullptr) {
     StatestoreSubscriber::UpdateCallback cb =
         bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
     Status status = statestore_subscriber_->AddTopic(IMPALA_MEMBERSHIP_TOPIC, true, cb);
@@ -137,12 +124,9 @@ Status Scheduler::Init() {
       status.AddDetail("Scheduler failed to register membership topic");
       return status;
     }
-    if (!FLAGS_disable_admission_control) {
-      RETURN_IF_ERROR(admission_controller_->Init(statestore_subscriber_));
-    }
   }
 
-  if (metrics_ != NULL) {
+  if (metrics_ != nullptr) {
     // This is after registering with the statestored, so we already have to synchronize
     // access to the backend_config_ shared_ptr.
     int num_backends = GetBackendConfig()->NumBackends();
@@ -153,8 +137,8 @@ Status Scheduler::Init() {
         metrics_->AddGauge<int64_t>(NUM_BACKENDS_KEY, num_backends);
   }
 
-  if (statestore_subscriber_ != NULL) {
-    if (webserver_ != NULL) {
+  if (statestore_subscriber_ != nullptr) {
+    if (webserver_ != nullptr) {
       const TNetworkAddress& webserver_address = webserver_->http_address();
       if (IsWildcardAddress(webserver_address.hostname)) {
         local_backend_descriptor_.__set_debug_http_address(
@@ -253,31 +237,9 @@ void Scheduler::UpdateMembership(
     }
   }
 
-  // If the local backend is not in our view of the membership list, we should add it
-  // and tell the statestore. We also ensure that it is part of our backend config.
-  if (current_membership_.find(local_backend_id_) == current_membership_.end()) {
-    new_backend_config->AddBackend(local_backend_descriptor_);
-    VLOG(1) << "Registering local backend with statestore";
-    subscriber_topic_updates->push_back(TTopicDelta());
-    TTopicDelta& update = subscriber_topic_updates->back();
-    update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
-    update.topic_entries.push_back(TTopicItem());
-
-    TTopicItem& item = update.topic_entries.back();
-    item.key = local_backend_id_;
-    Status status = thrift_serializer_.Serialize(&local_backend_descriptor_, &item.value);
-    if (!status.ok()) {
-      LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic:"
-                   << " " << status.GetDetail();
-      subscriber_topic_updates->pop_back();
-    }
-  }
-
-  DCHECK(new_backend_config->LookUpBackendIp(
-      local_backend_descriptor_.address.hostname, nullptr));
   SetBackendConfig(new_backend_config);
 
-  if (metrics_ != NULL) {
+  if (metrics_ != nullptr) {
     /// TODO-MT: fix this (do we even need to report it?)
     num_fragment_instances_metric_->set_value(current_membership_.size());
   }
@@ -285,7 +247,7 @@ void Scheduler::UpdateMembership(
 
 Scheduler::BackendConfigPtr Scheduler::GetBackendConfig() const {
   lock_guard<mutex> l(backend_config_lock_);
-  DCHECK(backend_config_.get() != NULL);
+  DCHECK(backend_config_.get() != nullptr);
   BackendConfigPtr backend_config = backend_config_;
   return backend_config;
 }
@@ -660,7 +622,7 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& backend_config
       //   cache to worry about.
       // Remote reads will always break ties by backend rank.
       bool decide_local_assignment_by_rank = random_replica || cached_replica;
-      const IpAddr* backend_ip = NULL;
+      const IpAddr* backend_ip = nullptr;
       backend_ip = assignment_ctx.SelectLocalBackendHost(
           backend_candidates, decide_local_assignment_by_rank);
       TBackendDescriptor backend;
@@ -771,17 +733,6 @@ Status Scheduler::Schedule(QuerySchedule* schedule) {
     }
   }
   schedule->SetUniqueHosts(unique_hosts);
-
-  if (!FLAGS_disable_admission_control) {
-    RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
-  }
-  return Status::OK();
-}
-
-Status Scheduler::Release(QuerySchedule* schedule) {
-  if (!FLAGS_disable_admission_control) {
-    RETURN_IF_ERROR(admission_controller_->ReleaseQuery(schedule));
-  }
   return Status::OK();
 }
 
@@ -844,7 +795,7 @@ const IpAddr* Scheduler::AssignmentCtx::SelectRemoteBackendHost() {
     DCHECK_EQ(backend_config_.NumBackends(), assignment_heap_.size());
     candidate_ip = &(assignment_heap_.top().ip);
   }
-  DCHECK(candidate_ip != NULL);
+  DCHECK(candidate_ip != nullptr);
   return candidate_ip;
 }
 
@@ -866,7 +817,7 @@ int Scheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const {
 
 void Scheduler::AssignmentCtx::SelectBackendOnHost(
     const IpAddr& backend_ip, TBackendDescriptor* backend) {
-  DCHECK(backend_config_.LookUpBackendIp(backend_ip, NULL));
+  DCHECK(backend_config_.LookUpBackendIp(backend_ip, nullptr));
   const BackendConfig::BackendList& backends_on_host =
       backend_config_.GetBackendListForHost(backend_ip);
   DCHECK(backends_on_host.size() > 0);
@@ -934,8 +885,8 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
     if (is_cached) assignment_byte_counters_.cached_bytes += scan_range_length;
   }
 
-  if (total_assignments_ != NULL) {
-    DCHECK(total_local_assignments_ != NULL);
+  if (total_assignments_ != nullptr) {
+    DCHECK(total_local_assignments_ != nullptr);
     total_assignments_->Increment(1);
     if (!remote_read) total_local_assignments_->Increment(1);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index bd2aff4..ca520c8 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -33,9 +33,9 @@
 #include "gen-cpp/Types_types.h" // for TNetworkAddress
 #include "rapidjson/document.h"
 #include "rpc/thrift-util.h"
-#include "scheduling/admission-controller.h"
 #include "scheduling/backend-config.h"
 #include "scheduling/query-schedule.h"
+#include "scheduling/request-pool-service.h"
 #include "statestore/statestore-subscriber.h"
 #include "util/metrics.h"
 #include "util/network-util.h"
@@ -101,9 +101,6 @@ class Scheduler {
   /// returning.
   Status Schedule(QuerySchedule* schedule);
 
-  /// Releases the reserved resources (if any) from the given schedule.
-  Status Release(QuerySchedule* schedule);
-
  private:
   /// Map from a host's IP address to the next backend to be round-robin scheduled for
   /// that host (needed for setups with multiple backends on a single host)
@@ -324,9 +321,6 @@ class Scheduler {
   /// us.
   RequestPoolService* request_pool_service_;
 
-  /// Used to make admission decisions in 'Schedule()'
-  boost::scoped_ptr<AdmissionController> admission_controller_;
-
   /// Helper methods to access backend_config_ (the shared_ptr, not its contents),
   /// protecting the access with backend_config_lock_.
   BackendConfigPtr GetBackendConfig() const;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 8417cf1..313708e 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -177,6 +177,10 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i
     "QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
     " the maximum allowable timeout.");
 
+DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and coordinate "
+    "queries from clients. If false, this daemon will only execute query fragments, and "
+    "will refuse client connections.");
+
 // TODO: Remove for Impala 3.0.
 DEFINE_string(local_nodemanager_url, "", "Deprecated");
 
@@ -244,7 +248,8 @@ class CancellationWork {
 };
 
 ImpalaServer::ImpalaServer(ExecEnv* exec_env)
-    : exec_env_(exec_env) {
+    : exec_env_(exec_env),
+      thrift_serializer_(false) {
   // Initialize default config
   InitializeConfigVariables();
 
@@ -328,15 +333,21 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
   ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env->metrics()));
 
   // Register the membership callback if required
-  if (exec_env->subscriber() != NULL) {
-    StatestoreSubscriber::UpdateCallback cb =
-        bind<void>(mem_fn(&ImpalaServer::MembershipCallback), this, _1, _2);
+  if (exec_env->subscriber() != nullptr) {
+    auto cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
+         vector<TTopicDelta>* topic_updates) {
+      this->MembershipCallback(state, topic_updates);
+    };
     exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb);
 
-    StatestoreSubscriber::UpdateCallback catalog_cb =
-        bind<void>(mem_fn(&ImpalaServer::CatalogUpdateCallback), this, _1, _2);
-    exec_env->subscriber()->AddTopic(
-        CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb);
+    if (FLAGS_is_coordinator) {
+      auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
+          vector<TTopicDelta>* topic_updates) {
+        this->CatalogUpdateCallback(state, topic_updates);
+      };
+      exec_env->subscriber()->AddTopic(CatalogServer::IMPALA_CATALOG_TOPIC, true,
+          catalog_cb);
+    }
   }
 
   ABORT_IF_ERROR(UpdateCatalogMetrics());
@@ -358,6 +369,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
   query_expiration_thread_.reset(new Thread("impala-server", "query-expirer",
       bind<void>(&ImpalaServer::ExpireQueries, this)));
 
+  is_coordinator_ = FLAGS_is_coordinator;
   exec_env_->SetImpalaServer(this);
 }
 
@@ -392,6 +404,8 @@ Status ImpalaServer::LogLineageRecord(const QueryExecState& query_exec_state) {
   return status;
 }
 
+bool ImpalaServer::IsCoordinator() { return is_coordinator_; }
+
 bool ImpalaServer::IsLineageLoggingEnabled() {
   return !FLAGS_lineage_event_log_dir.empty();
 }
@@ -562,7 +576,7 @@ Status ImpalaServer::InitProfileLogging() {
 
 Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
     bool base64_encoded, stringstream* output) {
-  DCHECK(output != NULL);
+  DCHECK(output != nullptr);
   // Search for the query id in the active query map
   {
     shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
@@ -599,9 +613,9 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, TExecSummary* res
   // Search for the query id in the active query map.
   {
     shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
-    if (exec_state != NULL) {
+    if (exec_state != nullptr) {
       lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
-      if (exec_state->coord() != NULL) {
+      if (exec_state->coord() != nullptr) {
         TExecProgress progress;
         {
           lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
@@ -687,7 +701,7 @@ void ImpalaServer::ArchiveQuery(const QueryExecState& query) {
 
   if (FLAGS_query_log_size == 0) return;
   QueryStateRecord record(query, true, encoded_profile_str);
-  if (query.coord() != NULL) {
+  if (query.coord() != nullptr) {
     lock_guard<SpinLock> lock(query.coord()->GetExecSummaryLock());
     record.exec_summary = query.coord()->exec_summary();
   }
@@ -771,7 +785,7 @@ Status ImpalaServer::ExecuteInternal(
     shared_ptr<SessionState> session_state,
     bool* registered_exec_state,
     shared_ptr<QueryExecState>* exec_state) {
-  DCHECK(session_state != NULL);
+  DCHECK(session_state != nullptr);
   *registered_exec_state = false;
 
   exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
@@ -820,7 +834,7 @@ Status ImpalaServer::ExecuteInternal(
     }
   }
 
-  if ((*exec_state)->coord() != NULL) {
+  if ((*exec_state)->coord() != nullptr) {
     const unordered_set<TNetworkAddress>& unique_hosts =
         (*exec_state)->schedule()->unique_hosts();
     if (!unique_hosts.empty()) {
@@ -944,7 +958,7 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
     exec_state->session()->inflight_queries.erase(query_id);
   }
 
-  if (exec_state->coord() != NULL) {
+  if (exec_state->coord() != nullptr) {
     string exec_summary;
     {
       lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
@@ -975,12 +989,12 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
 
 Status ImpalaServer::UpdateCatalogMetrics() {
   TGetDbsResult dbs;
-  RETURN_IF_ERROR(exec_env_->frontend()->GetDbs(NULL, NULL, &dbs));
+  RETURN_IF_ERROR(exec_env_->frontend()->GetDbs(nullptr, nullptr, &dbs));
   ImpaladMetrics::CATALOG_NUM_DBS->set_value(dbs.dbs.size());
   ImpaladMetrics::CATALOG_NUM_TABLES->set_value(0L);
   for (const TDatabase& db: dbs.dbs) {
     TGetTablesResult table_names;
-    RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db.db_name, NULL, NULL,
+    RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db.db_name, nullptr, nullptr,
         &table_names));
     ImpaladMetrics::CATALOG_NUM_TABLES->Increment(table_names.tables.size());
   }
@@ -992,7 +1006,7 @@ Status ImpalaServer::CancelInternal(const TUniqueId& query_id, bool check_inflig
     const Status* cause) {
   VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (exec_state == NULL) return Status("Invalid or unknown query handle");
+  if (exec_state == nullptr) return Status("Invalid or unknown query handle");
   exec_state->Cancel(check_inflight, cause);
   return Status::OK();
 }
@@ -1014,7 +1028,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
     session_state = entry->second;
     session_state_map_.erase(session_id);
   }
-  DCHECK(session_state != NULL);
+  DCHECK(session_state != nullptr);
   if (session_state->session_type == TSessionType::BEESWAX) {
     ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(-1L);
   } else {
@@ -1081,7 +1095,7 @@ void ImpalaServer::ReportExecStatus(
   // every report (assign each query a local int32_t id and use that to index into a
   // vector of QueryExecStates, w/o lookup or locking?)
   shared_ptr<QueryExecState> exec_state = GetQueryExecState(params.query_id, false);
-  if (exec_state.get() == NULL) {
+  if (exec_state.get() == nullptr) {
     // This is expected occasionally (since a report RPC might be in flight while
     // cancellation is happening). Return an error to the caller to get it to stop.
     const string& err = Substitute("ReportExecStatus(): Received report for unknown "
@@ -1475,6 +1489,10 @@ void ImpalaServer::MembershipCallback(
       // This is a new item - add it to the map of known backends.
       known_backends_.insert(make_pair(item.key, backend_descriptor));
     }
+
+    // Register the local backend in the statestore and update the list of known backends.
+    AddLocalBackendToStatestore(subscriber_topic_updates);
+
     // Process membership deletions.
     for (const string& backend_id: delta.topic_deletions) {
       known_backends_.erase(backend_id);
@@ -1559,13 +1577,48 @@ void ImpalaServer::MembershipCallback(
   }
 }
 
+void ImpalaServer::AddLocalBackendToStatestore(
+    vector<TTopicDelta>* subscriber_topic_updates) {
+  const string& local_backend_id = exec_env_->subscriber()->id();
+  if (known_backends_.find(local_backend_id) != known_backends_.end()) return;
+
+  TBackendDescriptor local_backend_descriptor;
+  local_backend_descriptor.__set_address(
+      MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
+  IpAddr ip;
+  const Hostname& hostname = local_backend_descriptor.address.hostname;
+  Status status = HostnameToIpAddr(hostname, &ip);
+  if (!status.ok()) {
+    // TODO: Should we do something about this failure?
+    LOG(WARNING) << "Failed to convert hostname " << hostname << " to IP address: "
+                 << status.GetDetail();
+    return;
+  }
+  local_backend_descriptor.ip_address = ip;
+  subscriber_topic_updates->emplace_back(TTopicDelta());
+  TTopicDelta& update = subscriber_topic_updates->back();
+  update.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  update.topic_entries.emplace_back(TTopicItem());
+
+  TTopicItem& item = update.topic_entries.back();
+  item.key = local_backend_id;
+  status = thrift_serializer_.Serialize(&local_backend_descriptor, &item.value);
+  if (!status.ok()) {
+    LOG(WARNING) << "Failed to serialize Impala backend descriptor for statestore topic:"
+                 << " " << status.GetDetail();
+    subscriber_topic_updates->pop_back();
+  } else {
+    known_backends_.insert(make_pair(item.key, local_backend_descriptor));
+  }
+}
+
 ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_state,
     bool copy_profile, const string& encoded_profile) {
   id = exec_state.query_id();
   const TExecRequest& request = exec_state.exec_request();
 
   const string* plan_str = exec_state.summary_profile().GetInfoString("Plan");
-  if (plan_str != NULL) plan = *plan_str;
+  if (plan_str != nullptr) plan = *plan_str;
   stmt = exec_state.sql_stmt();
   stmt_type = request.stmt_type;
   effective_user = exec_state.effective_user();
@@ -1575,7 +1628,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat
   has_coord = false;
 
   Coordinator* coord = exec_state.coord();
-  if (coord != NULL) {
+  if (coord != nullptr) {
     num_complete_fragments = coord->progress().num_complete();
     total_fragments = coord->progress().total();
     has_coord = true;
@@ -1770,7 +1823,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
         if (expiration_event->first > now) break;
         shared_ptr<QueryExecState> query_state =
             GetQueryExecState(expiration_event->second, false);
-        if (query_state.get() == NULL) {
+        if (query_state.get() == nullptr) {
           // Query was deleted some other way.
           queries_by_timestamp_.erase(expiration_event++);
           continue;
@@ -1830,17 +1883,43 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
 
 Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int be_port,
     ThriftServer** beeswax_server, ThriftServer** hs2_server, ThriftServer** be_server,
-    ImpalaServer** impala_server) {
-  DCHECK((beeswax_port == 0) == (beeswax_server == NULL));
-  DCHECK((hs2_port == 0) == (hs2_server == NULL));
-  DCHECK((be_port == 0) == (be_server == NULL));
+    boost::shared_ptr<ImpalaServer>* impala_server) {
+  DCHECK((beeswax_port == 0) == (beeswax_server == nullptr));
+  DCHECK((hs2_port == 0) == (hs2_server == nullptr));
+  DCHECK((be_port == 0) == (be_server == nullptr));
 
-  boost::shared_ptr<ImpalaServer> handler(new ImpalaServer(exec_env));
+  impala_server->reset(new ImpalaServer(exec_env));
 
-  if (beeswax_port != 0 && beeswax_server != NULL) {
+  if (be_port != 0 && be_server != nullptr) {
+    boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService());
+    boost::shared_ptr<TProcessor> be_processor(
+        new ImpalaInternalServiceProcessor(thrift_if));
+    boost::shared_ptr<TProcessorEventHandler> event_handler(
+        new RpcEventHandler("backend", exec_env->metrics()));
+    be_processor->setEventHandler(event_handler);
+
+    *be_server = new ThriftServer("backend", be_processor, be_port, nullptr,
+        exec_env->metrics(), FLAGS_be_service_threads);
+    if (EnableInternalSslConnections()) {
+      LOG(INFO) << "Enabling SSL for backend";
+      RETURN_IF_ERROR((*be_server)->EnableSsl(FLAGS_ssl_server_certificate,
+          FLAGS_ssl_private_key, FLAGS_ssl_private_key_password_cmd));
+    }
+
+    LOG(INFO) << "ImpalaInternalService listening on " << be_port;
+  }
+  if (!FLAGS_is_coordinator) {
+    LOG(INFO) << "Started worker Impala server on "
+              << ExecEnv::GetInstance()->backend_address();
+    return Status::OK();
+  }
+
+  // Initialize the HS2 and Beeswax services.
+  if (beeswax_port != 0 && beeswax_server != nullptr) {
     // Beeswax FE must be a TThreadPoolServer because ODBC and Hue only support
     // TThreadPoolServer.
-    boost::shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
+    boost::shared_ptr<TProcessor> beeswax_processor(
+        new ImpalaServiceProcessor(*impala_server));
     boost::shared_ptr<TProcessorEventHandler> event_handler(
         new RpcEventHandler("beeswax", exec_env->metrics()));
     beeswax_processor->setEventHandler(event_handler);
@@ -1848,7 +1927,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
         beeswax_port, AuthManager::GetInstance()->GetExternalAuthProvider(),
         exec_env->metrics(), FLAGS_fe_service_threads, ThriftServer::ThreadPool);
 
-    (*beeswax_server)->SetConnectionHandler(handler.get());
+    (*beeswax_server)->SetConnectionHandler(impala_server->get());
     if (!FLAGS_ssl_server_certificate.empty()) {
       LOG(INFO) << "Enabling SSL for Beeswax";
       RETURN_IF_ERROR((*beeswax_server)->EnableSsl(FLAGS_ssl_server_certificate,
@@ -1858,10 +1937,10 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
     LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_port;
   }
 
-  if (hs2_port != 0 && hs2_server != NULL) {
+  if (hs2_port != 0 && hs2_server != nullptr) {
     // HiveServer2 JDBC driver does not support non-blocking server.
     boost::shared_ptr<TProcessor> hs2_fe_processor(
-        new ImpalaHiveServer2ServiceProcessor(handler));
+        new ImpalaHiveServer2ServiceProcessor(*impala_server));
     boost::shared_ptr<TProcessorEventHandler> event_handler(
         new RpcEventHandler("hs2", exec_env->metrics()));
     hs2_fe_processor->setEventHandler(event_handler);
@@ -1870,7 +1949,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
         AuthManager::GetInstance()->GetExternalAuthProvider(), exec_env->metrics(),
         FLAGS_fe_service_threads, ThriftServer::ThreadPool);
 
-    (*hs2_server)->SetConnectionHandler(handler.get());
+    (*hs2_server)->SetConnectionHandler(impala_server->get());
     if (!FLAGS_ssl_server_certificate.empty()) {
       LOG(INFO) << "Enabling SSL for HiveServer2";
       RETURN_IF_ERROR((*hs2_server)->EnableSsl(FLAGS_ssl_server_certificate,
@@ -1880,32 +1959,14 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
     LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_port;
   }
 
-  if (be_port != 0 && be_server != NULL) {
-    boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService());
-    boost::shared_ptr<TProcessor> be_processor(
-        new ImpalaInternalServiceProcessor(thrift_if));
-    boost::shared_ptr<TProcessorEventHandler> event_handler(
-        new RpcEventHandler("backend", exec_env->metrics()));
-    be_processor->setEventHandler(event_handler);
-
-    *be_server = new ThriftServer("backend", be_processor, be_port, NULL,
-        exec_env->metrics(), FLAGS_be_service_threads);
-    if (EnableInternalSslConnections()) {
-      LOG(INFO) << "Enabling SSL for backend";
-      RETURN_IF_ERROR((*be_server)->EnableSsl(FLAGS_ssl_server_certificate,
-          FLAGS_ssl_private_key, FLAGS_ssl_private_key_password_cmd));
-    }
-
-    LOG(INFO) << "ImpalaInternalService listening on " << be_port;
-  }
-  if (impala_server != NULL) *impala_server = handler.get();
-
+  LOG(INFO) << "Started coordinator Impala server on "
+            << ExecEnv::GetInstance()->backend_address();
   return Status::OK();
 }
 
 bool ImpalaServer::GetSessionIdForQuery(const TUniqueId& query_id,
     TUniqueId* session_id) {
-  DCHECK(session_id != NULL);
+  DCHECK(session_id != nullptr);
   lock_guard<mutex> l(query_exec_state_map_lock_);
   QueryExecStateMap::iterator i = query_exec_state_map_.find(query_id);
   if (i == query_exec_state_map_.end()) {
@@ -1931,7 +1992,7 @@ shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState(
 void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
     const TUpdateFilterParams& params) {
   shared_ptr<QueryExecState> query_exec_state = GetQueryExecState(params.query_id, false);
-  if (query_exec_state.get() == NULL) {
+  if (query_exec_state.get() == nullptr) {
     LOG(INFO) << "Could not find query exec state: " << params.query_id;
     return;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 588a5c3..45e8080 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -270,6 +270,9 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   /// Returns true if lineage logging is enabled, false otherwise.
   bool IsLineageLoggingEnabled();
 
+  /// Retuns true if this is a coordinator, false otherwise.
+  bool IsCoordinator();
+
   /// The prefix of audit event log filename.
   static const string AUDIT_EVENT_LOG_FILE_PREFIX;
 
@@ -432,6 +435,10 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
   /// on why the failure occurred.
   Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user);
 
+  // Check if the local backend descriptor is in the list of known backends. If not, add
+  // it to the list of known backends and add it to the 'topic_updates'.
+  void AddLocalBackendToStatestore(std::vector<TTopicDelta>* topic_updates);
+
   /// Snapshot of a query's state, archived in the query log.
   struct QueryStateRecord {
     /// Pretty-printed runtime profile. TODO: Copy actual profile object
@@ -942,6 +949,13 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
 
   /// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set.
   boost::scoped_ptr<Thread> query_expiration_thread_;
+
+  /// Serializes TBackendDescriptors when creating topic updates
+  ThriftSerializer thrift_serializer_;
+
+  /// True if this ImpalaServer can accept client connections and coordinate
+  /// queries.
+  bool is_coordinator_;
 };
 
 /// Create an ImpalaServer and Thrift servers.
@@ -949,6 +963,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
 /// ImpalaService (Beeswax) on beeswax_port (returned via beeswax_server).
 /// If hs2_port != 0 (and hs2_server != NULL), creates a ThriftServer exporting
 /// ImpalaHiveServer2Service on hs2_port (returned via hs2_server).
+/// ImpalaService and ImpalaHiveServer2Service are initialized only if this
+/// Impala server is a coordinator (indicated by the is_coordinator flag).
 /// If be_port != 0 (and be_server != NULL), create a ThriftServer exporting
 /// ImpalaInternalService on be_port (returned via be_server).
 /// Returns created ImpalaServer. The caller owns fe_server and be_server.
@@ -958,7 +974,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
 /// which case none of the output parameters can be assumed to be valid.
 Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port,
     int be_port, ThriftServer** beeswax_server, ThriftServer** hs2_server,
-    ThriftServer** be_server, ImpalaServer** impala_server);
+    ThriftServer** be_server, boost::shared_ptr<ImpalaServer>* impala_server);
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 4848b45..01e5b55 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -56,6 +56,7 @@ DECLARE_int32(hs2_port);
 DECLARE_int32(be_port);
 DECLARE_string(principal);
 DECLARE_bool(enable_rm);
+DECLARE_bool(is_coordinator);
 
 int ImpaladMain(int argc, char** argv) {
   InitCommonRuntime(argc, argv, true);
@@ -84,14 +85,17 @@ int ImpaladMain(int argc, char** argv) {
   ThriftServer* beeswax_server = NULL;
   ThriftServer* hs2_server = NULL;
   ThriftServer* be_server = NULL;
-  ImpalaServer* server = NULL;
+  boost::shared_ptr<ImpalaServer> server;
   ABORT_IF_ERROR(CreateImpalaServer(&exec_env, FLAGS_beeswax_port, FLAGS_hs2_port,
       FLAGS_be_port, &beeswax_server, &hs2_server, &be_server, &server));
 
   ABORT_IF_ERROR(be_server->Start());
 
-  ABORT_IF_ERROR(beeswax_server->Start());
-  ABORT_IF_ERROR(hs2_server->Start());
+  if (FLAGS_is_coordinator) {
+    ABORT_IF_ERROR(beeswax_server->Start());
+    ABORT_IF_ERROR(hs2_server->Start());
+  }
+
   Status status = exec_env.StartServices();
   if (!status.ok()) {
     LOG(ERROR) << "Impalad services did not start correctly, exiting.  Error: "
@@ -101,13 +105,17 @@ int ImpaladMain(int argc, char** argv) {
   }
   ImpaladMetrics::IMPALA_SERVER_READY->set_value(true);
   LOG(INFO) << "Impala has started.";
-  // this blocks until the beeswax and hs2 servers terminate
-  beeswax_server->Join();
-  hs2_server->Join();
 
+  be_server->Join();
   delete be_server;
-  delete beeswax_server;
-  delete hs2_server;
+
+  if (FLAGS_is_coordinator) {
+    // this blocks until the beeswax and hs2 servers terminate
+    beeswax_server->Join();
+    hs2_server->Join();
+    delete beeswax_server;
+    delete hs2_server;
+  }
 
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 2cf2e5d..f704676 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -25,6 +25,7 @@
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/exec-env.h"
+#include "scheduling/admission-controller.h"
 #include "scheduling/scheduler.h"
 #include "service/frontend.h"
 #include "service/impala-server.h"
@@ -456,6 +457,14 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
     RETURN_IF_ERROR(UpdateQueryStatus(status));
   }
 
+  if (exec_env_->admission_controller() != nullptr) {
+    status = exec_env_->admission_controller()->AdmitQuery(schedule_.get());
+    {
+      lock_guard<mutex> l(lock_);
+      RETURN_IF_ERROR(UpdateQueryStatus(status));
+    }
+  }
+
   coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_));
   status = coord_->Exec();
   {
@@ -570,10 +579,12 @@ void ImpalaServer::QueryExecState::Done() {
 
   if (coord_.get() != NULL) {
     // Release any reserved resources.
-    Status status = exec_env_->scheduler()->Release(schedule_.get());
-    if (!status.ok()) {
-      LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
-                   << " because of error: " << status.GetDetail();
+    if (exec_env_->admission_controller() != nullptr) {
+      Status status = exec_env_->admission_controller()->ReleaseQuery(schedule_.get());
+      if (!status.ok()) {
+        LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
+                     << " because of error: " << status.GetDetail();
+      }
     }
     coord_->TearDown();
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 54c7484..870f02d 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -69,7 +69,7 @@ class InProcessImpalaServer {
   /// there was an error joining.
   Status Join();
 
-  ImpalaServer* impala_server() { return impala_server_; }
+  ImpalaServer* impala_server() { return impala_server_.get(); }
 
   MetricGroup* metrics() { return exec_env_->metrics(); }
 
@@ -92,10 +92,9 @@ class InProcessImpalaServer {
 
   uint32_t hs2_port_;
 
-  /// The ImpalaServer that handles client and backend requests. Not owned by this class;
-  /// instead it's owned via shared_ptrs in the ThriftServers. See CreateImpalaServer for
-  /// details.
-  ImpalaServer* impala_server_;
+  /// The ImpalaServer that handles client and backend requests. Ownership is shared via
+  /// shared_ptrs with the ThriftServers. See CreateImpalaServer for details.
+  boost::shared_ptr<ImpalaServer> impala_server_;
 
   /// ExecEnv holds much of the per-service state
   boost::scoped_ptr<ExecEnv> exec_env_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/util/webserver.cc
----------------------------------------------------------------------
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 61ef3ea..3b7c4f9 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -34,6 +34,8 @@
 
 #include "common/logging.h"
 #include "rpc/thrift-util.h"
+#include "runtime/exec-env.h"
+#include "service/impala-server.h"
 #include "thirdparty/mustache/mustache.h"
 #include "util/asan.h"
 #include "util/coding-util.h"
@@ -97,6 +99,8 @@ DEFINE_string(webserver_password_file, "",
 DEFINE_string(webserver_x_frame_options, "DENY",
     "webserver will add X-Frame-Options HTTP header with this value");
 
+DECLARE_bool(is_coordinator);
+
 static const char* DOC_FOLDER = "/www/";
 static const int DOC_FOLDER_LEN = strlen(DOC_FOLDER);
 
@@ -116,7 +120,7 @@ static const char* ERROR_KEY = "__error_msg__";
 const char* GetDefaultDocumentRoot() {
   stringstream ss;
   char* impala_home = getenv("IMPALA_HOME");
-  if (impala_home == NULL) {
+  if (impala_home == nullptr) {
     return ""; // Empty document root means don't serve static files
   } else {
     ss << impala_home;
@@ -152,7 +156,7 @@ string BuildHeaderString(ResponseCode response, ContentType content_type) {
 }
 
 Webserver::Webserver()
-    : context_(NULL),
+    : context_(nullptr),
       error_handler_(UrlHandler(bind<void>(&Webserver::ErrorHandler, this, _1, _2),
           "error.tmpl", false)) {
   http_address_ = MakeNetworkAddress(
@@ -161,7 +165,7 @@ Webserver::Webserver()
 }
 
 Webserver::Webserver(const int port)
-    : context_(NULL),
+    : context_(nullptr),
       error_handler_(UrlHandler(bind<void>(&Webserver::ErrorHandler, this, _1, _2),
           "error.tmpl", false)) {
   http_address_ = MakeNetworkAddress("0.0.0.0", port);
@@ -186,6 +190,13 @@ void Webserver::RootHandler(const ArgumentMap& args, Document* document) {
     document->GetAllocator());
   document->AddMember("process_state_info", process_state_info,
     document->GetAllocator());
+
+  ExecEnv* env = ExecEnv::GetInstance();
+  if (env == nullptr || env->impala_server() == nullptr) return;
+  string mode = (env->impala_server()->IsCoordinator()) ?
+      "Coordinator + Executor" : "Executor";
+  Value impala_server_mode(mode.c_str(), document->GetAllocator());
+  document->AddMember("impala_server_mode", impala_server_mode, document->GetAllocator());
 }
 
 void Webserver::ErrorHandler(const ArgumentMap& args, Document* document) {
@@ -300,7 +311,7 @@ Status Webserver::Start() {
   options.push_back("no");
 
   // Options must be a NULL-terminated list
-  options.push_back(NULL);
+  options.push_back(nullptr);
 
   // squeasel ignores SIGCHLD and we need it to run kinit. This means that since
   // squeasel does not reap its own children CGI programs must be avoided.
@@ -321,7 +332,7 @@ Status Webserver::Start() {
   // Restore the child signal handler so wait() works properly.
   signal(SIGCHLD, sig_chld);
 
-  if (context_ == NULL) {
+  if (context_ == nullptr) {
     stringstream error_msg;
     error_msg << "Webserver: Could not start on address " << http_address_;
     return Status(error_msg.str());
@@ -337,14 +348,14 @@ Status Webserver::Start() {
 }
 
 void Webserver::Stop() {
-  if (context_ != NULL) {
+  if (context_ != nullptr) {
     sq_stop(context_);
-    context_ = NULL;
+    context_ = nullptr;
   }
 }
 
 void Webserver::GetCommonJson(Document* document) {
-  DCHECK(document != NULL);
+  DCHECK(document != nullptr);
   Value obj(kObjectType);
   obj.AddMember("process-name", google::ProgramInvocationShortName(),
       document->GetAllocator());
@@ -365,7 +376,7 @@ void Webserver::GetCommonJson(Document* document) {
 
 int Webserver::LogMessageCallbackStatic(const struct sq_connection* connection,
     const char* message) {
-  if (message != NULL) {
+  if (message != nullptr) {
     LOG(INFO) << "Webserver: " << message;
   }
   return PROCESSING_COMPLETE;
@@ -389,7 +400,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
   }
 
   map<string, string> arguments;
-  if (request_info->query_string != NULL) {
+  if (request_info->query_string != nullptr) {
     BuildArgumentMap(request_info->query_string, &arguments);
   }
 
@@ -397,7 +408,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
   UrlHandlerMap::const_iterator it = url_handlers_.find(request_info->uri);
   ResponseCode response = OK;
   ContentType content_type = HTML;
-  const UrlHandler* url_handler = NULL;
+  const UrlHandler* url_handler = nullptr;
   if (it == url_handlers_.end()) {
     response = NOT_FOUND;
     arguments[ERROR_KEY] = Substitute("No URI handler for '$0'", request_info->uri);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index b276b08..37671e3 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -36,6 +36,8 @@ DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get('IMPALA_MAX_LOG_FILES', 10)
 parser = OptionParser()
 parser.add_option("-s", "--cluster_size", type="int", dest="cluster_size", default=3,
                   help="Size of the cluster (number of impalad instances to start).")
+parser.add_option("-c", "--num_coordinators", type="int", dest="num_coordinators",
+                  default=3, help="Number of coordinators.")
 parser.add_option("--build_type", dest="build_type", default= 'latest',
                   help="Build type to use - debug / release / latest")
 parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
@@ -204,7 +206,7 @@ def build_jvm_args(instance_num):
   BASE_JVM_DEBUG_PORT = 30000
   return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args)
 
-def start_impalad_instances(cluster_size):
+def start_impalad_instances(cluster_size, num_coordinators):
   if cluster_size == 0:
     # No impalad instances should be started.
     return
@@ -239,6 +241,10 @@ def start_impalad_instances(cluster_size):
     if options.kudu_master_hosts:
       # Must be prepended, otherwise the java options interfere.
       args = "-kudu_master_hosts %s %s" % (options.kudu_master_hosts, args)
+
+    if i >= num_coordinators:
+      args = "-is_coordinator=false %s" % (args)
+
     stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
     exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
 
@@ -279,9 +285,10 @@ def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
   # impalad processes may take a while to come up.
   wait_for_impala_process_count(impala_cluster)
   for impalad in impala_cluster.impalads:
-    impalad.service.wait_for_num_known_live_backends(options.cluster_size,
-        timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
-    wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)
+    if impalad._get_arg_value('is_coordinator', default='true') == 'true':
+      impalad.service.wait_for_num_known_live_backends(options.cluster_size,
+          timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
+      wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)
 
 def wait_for_catalog(impalad, timeout_in_seconds):
   """Waits for the impalad catalog to become ready"""
@@ -326,6 +333,10 @@ if __name__ == "__main__":
     print 'Please specify a cluster size >= 0'
     sys.exit(1)
 
+  if options.num_coordinators <= 0:
+    print 'Please specify a valid number of coordinators > 0'
+    sys.exit(1)
+
   if not os.path.isdir(options.log_dir):
     print 'Log dir does not exist or is not a directory: %s' % options.log_dir
     sys.exit(1)
@@ -372,7 +383,7 @@ if __name__ == "__main__":
       if not options.restart_impalad_only:
         start_statestore()
         start_catalogd()
-      start_impalad_instances(options.cluster_size)
+      start_impalad_instances(options.cluster_size, options.num_coordinators)
       # Sleep briefly to reduce log spam: the cluster takes some time to start up.
       sleep(3)
       wait_for_cluster()
@@ -380,4 +391,5 @@ if __name__ == "__main__":
       print 'Error starting cluster: %s' % e
       sys.exit(1)
 
-  print 'Impala Cluster Running with %d nodes.' % options.cluster_size
+  print 'Impala Cluster Running with %d nodes and %d coordinators.' % (
+      options.cluster_size, options.num_coordinators)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 28f598c..b19b5d2 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -30,6 +30,7 @@ from time import sleep
 
 IMPALA_HOME = os.environ['IMPALA_HOME']
 CLUSTER_SIZE = 3
+NUM_COORDINATORS = CLUSTER_SIZE
 # The number of statestore subscribers is CLUSTER_SIZE (# of impalad) + 1 (for catalogd).
 NUM_SUBSCRIBERS = CLUSTER_SIZE + 1
 
@@ -107,10 +108,11 @@ class CustomClusterTestSuite(ImpalaTestSuite):
 
   @classmethod
   def _start_impala_cluster(cls, options, log_dir=os.getenv('LOG_DIR', "/tmp/"),
-      cluster_size=CLUSTER_SIZE, log_level=1):
+      cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS, log_level=1):
     cls.impala_log_dir = log_dir
     cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
            '--cluster_size=%d' % cluster_size,
+           '--num_coordinators=%d' % num_coordinators,
            '--log_dir=%s' % log_dir,
            '--log_level=%s' % log_level]
     try:
@@ -123,7 +125,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
       raise Exception("statestored was not found")
     statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
     for impalad in cls.cluster.impalads:
-      impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60)
+      if impalad._get_arg_value('is_coordinator', default='true') == 'true':
+        impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60)
 
   def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
     """

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 4e9c815..265b1b6 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -26,6 +26,10 @@ import urllib
 from time import sleep, time
 
 from tests.common.impala_connection import create_connection, create_ldap_connection
+from TCLIService import TCLIService
+from thrift.transport.TSocket import TSocket
+from thrift.transport.TTransport import TBufferedTransport
+from thrift.protocol import TBinaryProtocol
 
 logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
 LOG = logging.getLogger('impala_service')
@@ -201,6 +205,16 @@ class ImpaladService(BaseImpalaService):
     client.connect()
     return client
 
+  def create_hs2_client(self):
+    """Creates a new HS2 client connection to the impalad"""
+    host, port = (self.hostname, self.hs2_port)
+    socket = TSocket(host, port)
+    transport = TBufferedTransport(socket)
+    transport.open()
+    protocol = TBinaryProtocol.TBinaryProtocol(transport)
+    hs2_client = TCLIService.Client(protocol)
+    return hs2_client
+
   def get_catalog_object_dump(self, object_type, object_name):
     return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\
         (object_type, object_name))

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/tests/custom_cluster/test_coordinators.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
new file mode 100644
index 0000000..6010404
--- /dev/null
+++ b/tests/custom_cluster/test_coordinators.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# The base class that should be used for almost all Impala tests
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestCoordinators(CustomClusterTestSuite):
+  @pytest.mark.execute_serially
+  def test_multiple_coordinators(self):
+    """Test a cluster configuration in which not all impalad nodes are coordinators.
+      Verify that only coordinators can accept client connections and that select and DDL
+      queries run successfully."""
+
+    db_name = "TEST_MUL_COORD_DB"
+    self._start_impala_cluster([], num_coordinators=2, cluster_size=3)
+    assert len(self.cluster.impalads) == 3
+
+    coordinator1 = self.cluster.impalads[0]
+    coordinator2 = self.cluster.impalads[1]
+    worker = self.cluster.impalads[2]
+
+    # Verify that Beeswax and HS2 client connections can't be established at a worker node
+    beeswax_client = None
+    try:
+      beeswax_client = worker.service.create_beeswax_client()
+    except: pass
+    finally:
+      assert beeswax_client is None
+
+    hs2_client = None
+    try:
+      hs2_client = worker.service.create_hs2_client()
+    except: pass
+    finally:
+      assert hs2_client is None
+
+    # Verify that queries can successfully run on coordinator nodes
+    try:
+      client1 = coordinator1.service.create_beeswax_client()
+      client2 = coordinator2.service.create_beeswax_client()
+
+      # select queries
+      self.execute_query_expect_success(client1, "select 1")
+      self.execute_query_expect_success(client2, "select * from functional.alltypes");
+      # DDL queries w/o SYNC_DDL
+      self.execute_query_expect_success(client1, "refresh functional.alltypes")
+      query_options = {"sync_ddl" : 1}
+      self.execute_query_expect_success(client2, "refresh functional.alltypesagg",
+          query_options)
+      self.execute_query_expect_success(client1,
+          "create database if not exists %s" % db_name, query_options)
+      # Create a table using one coordinator
+      self.execute_query_expect_success(client1,
+          "create table %s.foo1 (col int)" % db_name, query_options)
+      # Drop the table using the other coordinator
+      self.execute_query_expect_success(client2, "drop table %s.foo1" % db_name,
+          query_options)
+      # Swap roles and repeat
+      self.execute_query_expect_success(client2,
+          "create table %s.foo2 (col int)" % db_name, query_options)
+      self.execute_query_expect_success(client1, "drop table %s.foo2" % db_name,
+          query_options)
+      self.execute_query_expect_success(client1, "drop database %s cascade" % db_name)
+    finally:
+      # Ensure the worker hasn't received any table metadata
+      num_tbls = worker.service.get_metric_value('catalog.num-tables')
+      assert num_tbls == 0
+      client1.close()
+      client2.close()

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/www/root.tmpl
----------------------------------------------------------------------
diff --git a/www/root.tmpl b/www/root.tmpl
index 916b48b..40d448a 100644
--- a/www/root.tmpl
+++ b/www/root.tmpl
@@ -18,6 +18,10 @@ under the License.
 -->
 {{! Template for / }}
 {{>www/common-header.tmpl}}
+  {{?impala_server_mode}}
+  <h2>Impala Server Mode: {{impala_server_mode}}</h2>
+  {{/impala_server_mode}}
+
   <h2>Vers<span id="v">i</span>on</h2>
   <pre id="version_pre">{{version}}</pre>