You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2017/03/29 02:53:43 UTC
[02/14] incubator-impala git commit: IMPALA-4041: Limit catalog and
admission control updates to coordinators
IMPALA-4041: Limit catalog and admission control updates to coordinators
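With this commit we add the ability to restrict catalog updates to a
subset of the cluster's nodes, designated as coordinators. A new startup
option, 'is_coordinator' (default: true), indicates whether a node is a
coordinator. Coordinators accept client connections through the HS2 and
Beeswax interfaces and can also participate in query execution.
Non-coordinator nodes do not receive catalog updates from the statestore,
do not initialize a query scheduler, and cannot accept Beeswax or HS2
client connections.
As part of this change, admission control is decoupled from the
scheduler: the AdmissionController is now created by ExecEnv (unless
--disable_admission_control is set) and registers its own statestore
topic, instead of being owned and initialized by the Scheduler.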
Testing:
- Added a custom cluster test (tests/custom_cluster/test_coordinators.py)
that launches a cluster in which the number of coordinators is less than
the cluster size, and runs a number of smoke queries against it.
- Ran the exhaustive tests successfully.
Change-Id: I5f2c74abdbcd60ac050efa323616bd41182ceff3
Reviewed-on: http://gerrit.cloudera.org:8080/6344
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/296df3c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/296df3c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/296df3c8
Branch: refs/heads/master
Commit: 296df3c8269b7be16120397d60e97b6f24fc390b
Parents: 4aa8e2d
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Thu Mar 9 15:24:51 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Mar 28 22:27:25 2017 +0000
----------------------------------------------------------------------
be/src/runtime/exec-env.cc | 53 +++++---
be/src/runtime/exec-env.h | 7 +-
be/src/scheduling/admission-controller.cc | 26 ++--
be/src/scheduling/admission-controller.h | 11 +-
be/src/scheduling/scheduler-test-util.cc | 16 +--
be/src/scheduling/scheduler-test.cc | 9 +-
be/src/scheduling/scheduler.cc | 89 +++----------
be/src/scheduling/scheduler.h | 8 +-
be/src/service/impala-server.cc | 173 +++++++++++++++++--------
be/src/service/impala-server.h | 18 ++-
be/src/service/impalad-main.cc | 24 ++--
be/src/service/query-exec-state.cc | 19 ++-
be/src/testutil/in-process-servers.h | 9 +-
be/src/util/webserver.cc | 33 +++--
bin/start-impala-cluster.py | 24 +++-
tests/common/custom_cluster_test_suite.py | 7 +-
tests/common/impala_service.py | 14 ++
tests/custom_cluster/test_coordinators.py | 86 ++++++++++++
www/root.tmpl | 4 +
19 files changed, 415 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 611520c..dd27a91 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -40,6 +40,7 @@
#include "runtime/query-exec-mgr.h"
#include "runtime/thread-resource-mgr.h"
#include "runtime/tmp-file-mgr.h"
+#include "scheduling/admission-controller.h"
#include "scheduling/request-pool-service.h"
#include "scheduling/scheduler.h"
#include "service/frontend.h"
@@ -79,12 +80,14 @@ DEFINE_int32(state_store_subscriber_port, 23000,
"port where StatestoreSubscriberService should be exported");
DEFINE_int32(num_hdfs_worker_threads, 16,
"(Advanced) The number of threads in the global HDFS operation pool");
+DEFINE_bool(disable_admission_control, false, "Disables admission control.");
DECLARE_int32(state_store_port);
DECLARE_int32(num_threads_per_core);
DECLARE_int32(num_cores);
DECLARE_int32(be_port);
DECLARE_string(mem_limit);
+DECLARE_bool(is_coordinator);
// TODO: Remove the following RM-related flags in Impala 3.0.
DEFINE_bool(enable_rm, false, "Deprecated");
@@ -123,7 +126,7 @@ const static string DEFAULT_FS = "fs.defaultFS";
namespace impala {
-ExecEnv* ExecEnv::exec_env_ = NULL;
+ExecEnv* ExecEnv::exec_env_ = nullptr;
ExecEnv::ExecEnv()
: metrics_(new MetricGroup("impala-metrics")),
@@ -139,7 +142,7 @@ ExecEnv::ExecEnv()
htable_factory_(new HBaseTableFactory()),
disk_io_mgr_(new DiskIoMgr()),
webserver_(new Webserver()),
- mem_tracker_(NULL),
+ mem_tracker_(nullptr),
pool_mem_trackers_(new PoolMemTrackerRegistry),
thread_mgr_(new ThreadResourceMgr),
hdfs_op_thread_pool_(
@@ -168,10 +171,19 @@ ExecEnv::ExecEnv()
Substitute("impalad@$0", TNetworkAddressToString(backend_address_)),
subscriber_address, statestore_address, metrics_.get()));
- scheduler_.reset(new Scheduler(statestore_subscriber_.get(),
- statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(),
- request_pool_service_.get()));
- } else {
+ if (FLAGS_is_coordinator) {
+ scheduler_.reset(new Scheduler(statestore_subscriber_.get(),
+ statestore_subscriber_->id(), backend_address_, metrics_.get(),
+ webserver_.get(), request_pool_service_.get()));
+ }
+
+ if (!FLAGS_disable_admission_control) {
+ admission_controller_.reset(new AdmissionController(statestore_subscriber_.get(),
+ request_pool_service_.get(), metrics_.get(), backend_address_));
+ } else {
+ LOG(INFO) << "Admission control is disabled.";
+ }
+ } else if (FLAGS_is_coordinator) {
vector<TNetworkAddress> addresses;
addresses.push_back(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
scheduler_.reset(new Scheduler(
@@ -196,7 +208,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
htable_factory_(new HBaseTableFactory()),
disk_io_mgr_(new DiskIoMgr()),
webserver_(new Webserver(webserver_port)),
- mem_tracker_(NULL),
+ mem_tracker_(nullptr),
pool_mem_trackers_(new PoolMemTrackerRegistry),
thread_mgr_(new ThreadResourceMgr),
hdfs_op_thread_pool_(
@@ -208,7 +220,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)),
query_exec_mgr_(new QueryExecMgr()),
buffer_reservation_(nullptr),
- buffer_pool_(NULL),
+ buffer_pool_(nullptr),
enable_webserver_(FLAGS_enable_webserver && webserver_port > 0),
is_fe_tests_(false),
backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) {
@@ -224,10 +236,18 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port,
Substitute("impalad@$0", TNetworkAddressToString(backend_address_)),
subscriber_address, statestore_address, metrics_.get()));
- scheduler_.reset(new Scheduler(statestore_subscriber_.get(),
- statestore_subscriber_->id(), backend_address_, metrics_.get(), webserver_.get(),
- request_pool_service_.get()));
- } else {
+ if (FLAGS_is_coordinator) {
+ scheduler_.reset(new Scheduler(statestore_subscriber_.get(),
+ statestore_subscriber_->id(), backend_address_, metrics_.get(),
+ webserver_.get(), request_pool_service_.get()));
+ }
+
+ if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
+ if (!FLAGS_disable_admission_control) {
+ admission_controller_.reset(new AdmissionController(statestore_subscriber_.get(),
+ request_pool_service_.get(), metrics_.get(), backend_address_));
+ }
+ } else if (FLAGS_is_coordinator) {
vector<TNetworkAddress> addresses;
addresses.push_back(MakeNetworkAddress(hostname, backend_port));
scheduler_.reset(new Scheduler(
@@ -284,7 +304,7 @@ Status ExecEnv::StartServices() {
return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'.");
}
- metrics_->Init(enable_webserver_ ? webserver_.get() : NULL);
+ metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr);
impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends");
catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server");
RETURN_IF_ERROR(RegisterMemoryMetrics(metrics_.get(), true));
@@ -326,7 +346,8 @@ Status ExecEnv::StartServices() {
LOG(INFO) << "Not starting webserver";
}
- if (scheduler_ != NULL) RETURN_IF_ERROR(scheduler_->Init());
+ if (scheduler_ != nullptr) RETURN_IF_ERROR(scheduler_->Init());
+ if (admission_controller_ != nullptr) RETURN_IF_ERROR(admission_controller_->Init());
// Get the fs.defaultFS value set in core-site.xml and assign it to
// configured_defaultFs
@@ -340,7 +361,7 @@ Status ExecEnv::StartServices() {
default_fs_ = "hdfs://";
}
// Must happen after all topic registrations / callbacks are done
- if (statestore_subscriber_.get() != NULL) {
+ if (statestore_subscriber_.get() != nullptr) {
Status status = statestore_subscriber_->Start();
if (!status.ok()) {
status.AddDetail("Statestore subscriber did not start up.");
@@ -355,6 +376,6 @@ void ExecEnv::InitBufferPool(int64_t min_page_size, int64_t capacity) {
DCHECK(buffer_pool_ == nullptr);
buffer_pool_.reset(new BufferPool(min_page_size, capacity));
buffer_reservation_.reset(new ReservationTracker());
- buffer_reservation_->InitRootTracker(NULL, capacity);
+ buffer_reservation_->InitRootTracker(nullptr, capacity);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/runtime/exec-env.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h
index a5777ef..87824ba 100644
--- a/be/src/runtime/exec-env.h
+++ b/be/src/runtime/exec-env.h
@@ -28,6 +28,7 @@
namespace impala {
+class AdmissionController;
class BufferPool;
class CallableThreadPool;
class DataStreamMgr;
@@ -72,10 +73,6 @@ class ExecEnv {
void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; }
- StatestoreSubscriber* statestore_subscriber() {
- return statestore_subscriber_.get();
- }
-
DataStreamMgr* stream_mgr() { return stream_mgr_.get(); }
ImpalaBackendClientCache* impalad_client_cache() {
return impalad_client_cache_.get();
@@ -106,6 +103,7 @@ class ExecEnv {
void set_enable_webserver(bool enable) { enable_webserver_ = enable; }
Scheduler* scheduler() { return scheduler_.get(); }
+ AdmissionController* admission_controller() { return admission_controller_.get(); }
StatestoreSubscriber* subscriber() { return statestore_subscriber_.get(); }
const TNetworkAddress& backend_address() const { return backend_address_; }
@@ -129,6 +127,7 @@ class ExecEnv {
boost::scoped_ptr<MetricGroup> metrics_;
boost::scoped_ptr<DataStreamMgr> stream_mgr_;
boost::scoped_ptr<Scheduler> scheduler_;
+ boost::scoped_ptr<AdmissionController> admission_controller_;
boost::scoped_ptr<StatestoreSubscriber> statestore_subscriber_;
boost::scoped_ptr<ImpalaBackendClientCache> impalad_client_cache_;
boost::scoped_ptr<CatalogServiceClientCache> catalogd_client_cache_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index 3c9947a..efec78a 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -199,9 +199,11 @@ string AdmissionController::PoolStats::DebugString() const {
// TODO: do we need host_id_ to come from host_addr or can it just take the same id
// the Scheduler has (coming from the StatestoreSubscriber)?
-AdmissionController::AdmissionController(RequestPoolService* request_pool_service,
- MetricGroup* metrics, const TNetworkAddress& host_addr)
- : request_pool_service_(request_pool_service),
+AdmissionController::AdmissionController(StatestoreSubscriber* subscriber,
+ RequestPoolService* request_pool_service, MetricGroup* metrics,
+ const TNetworkAddress& host_addr)
+ : subscriber_(subscriber),
+ request_pool_service_(request_pool_service),
metrics_group_(metrics),
host_id_(TNetworkAddressToString(host_addr)),
thrift_serializer_(false),
@@ -224,10 +226,10 @@ AdmissionController::~AdmissionController() {
dequeue_thread_->Join();
}
-Status AdmissionController::Init(StatestoreSubscriber* subscriber) {
+Status AdmissionController::Init() {
StatestoreSubscriber::UpdateCallback cb =
bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
- Status status = subscriber->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
+ Status status = subscriber_->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
if (!status.ok()) {
status.AddDetail("AdmissionController failed to register request queue topic");
}
@@ -578,12 +580,12 @@ void AdmissionController::PoolStats::UpdateRemoteStats(const string& host_id,
if (VLOG_ROW_IS_ON) {
stringstream ss;
ss << "Stats update for pool=" << name_ << " backend=" << host_id;
- if (host_stats == NULL) ss << " topic deletion";
+ if (host_stats == nullptr) ss << " topic deletion";
if (it != remote_stats_.end()) ss << " previous: " << DebugPoolStats(it->second);
- if (host_stats != NULL) ss << " new: " << DebugPoolStats(*host_stats);
+ if (host_stats != nullptr) ss << " new: " << DebugPoolStats(*host_stats);
VLOG_ROW << ss.str();
}
- if (host_stats == NULL) {
+ if (host_stats == nullptr) {
if (it != remote_stats_.end()) {
remote_stats_.erase(it);
} else {
@@ -620,7 +622,7 @@ void AdmissionController::HandleTopicDeletions(const vector<string>& topic_delet
string topic_backend_id;
if (!ParsePoolTopicKey(topic_key, &pool_name, &topic_backend_id)) continue;
if (topic_backend_id == host_id_) continue;
- GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, NULL);
+ GetPoolStats(pool_name)->UpdateRemoteStats(topic_backend_id, nullptr);
}
}
@@ -705,7 +707,7 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() {
ExecEnv::GetInstance()->pool_mem_trackers()->GetRequestPoolMemTracker(name_, false);
const int64_t current_reserved =
- tracker == NULL ? static_cast<int64_t>(0) : tracker->GetPoolMemReserved();
+ tracker == nullptr ? static_cast<int64_t>(0) : tracker->GetPoolMemReserved();
if (current_reserved != local_stats_.backend_mem_reserved) {
parent_->pools_for_updates_.insert(name_);
local_stats_.backend_mem_reserved = current_reserved;
@@ -713,7 +715,7 @@ void AdmissionController::PoolStats::UpdateMemTrackerStats() {
}
const int64_t current_usage =
- tracker == NULL ? static_cast<int64_t>(0) : tracker->consumption();
+ tracker == nullptr ? static_cast<int64_t>(0) : tracker->consumption();
metrics_.local_backend_mem_usage->set_value(current_usage);
}
@@ -797,7 +799,7 @@ void AdmissionController::DequeueLoop() {
while (max_to_dequeue > 0 && !queue.empty()) {
QueueNode* queue_node = queue.head();
- DCHECK(queue_node != NULL);
+ DCHECK(queue_node != nullptr);
DCHECK(!queue_node->is_admitted.IsSet());
const QuerySchedule& schedule = queue_node->schedule;
string not_admitted_reason;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index f0a37d8..d3ec49d 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -178,7 +178,8 @@ class ExecEnv;
/// better idea of what is perhaps unnecessary.
class AdmissionController {
public:
- AdmissionController(RequestPoolService* request_pool_service, MetricGroup* metrics,
+ AdmissionController(StatestoreSubscriber* subscriber,
+ RequestPoolService* request_pool_service, MetricGroup* metrics,
const TNetworkAddress& host_addr);
~AdmissionController();
@@ -196,8 +197,8 @@ class AdmissionController {
/// This does not block.
Status ReleaseQuery(QuerySchedule* schedule);
- /// Registers with the subscription manager.
- Status Init(StatestoreSubscriber* subscriber);
+ /// Registers the request queue topic with the statestore.
+ Status Init();
private:
class PoolStats;
@@ -206,6 +207,10 @@ class AdmissionController {
/// Statestore topic name.
static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
+ /// Subscription manager used to handle admission control updates. This is not
+ /// owned by this class.
+ StatestoreSubscriber* subscriber_;
+
/// Used for user-to-pool resolution and looking up pool configurations. Not owned by
/// the AdmissionController.
RequestPoolService* request_pool_service_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 2474f33..782379c 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -450,13 +450,13 @@ SchedulerWrapper::SchedulerWrapper(const Plan& plan)
}
Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
- DCHECK(scheduler_ != NULL);
+ DCHECK(scheduler_ != nullptr);
// Compute Assignment.
FragmentScanRangeAssignment* assignment = result->AddAssignment();
- return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0, NULL,
- false, plan_.scan_range_locations(), plan_.referenced_datanodes(), exec_at_coord,
- plan_.query_options(), NULL, assignment);
+ return scheduler_->ComputeScanRangeAssignment(*scheduler_->GetBackendConfig(), 0,
+ nullptr, false, plan_.scan_range_locations(), plan_.referenced_datanodes(),
+ exec_at_coord, plan_.query_options(), nullptr, assignment);
}
void SchedulerWrapper::AddBackend(const Host& host) {
@@ -495,7 +495,7 @@ void SchedulerWrapper::SendEmptyUpdate() {
}
void SchedulerWrapper::InitializeScheduler() {
- DCHECK(scheduler_ == NULL);
+ DCHECK(scheduler_ == nullptr);
DCHECK_GT(plan_.cluster().NumHosts(), 0) << "Cannot initialize scheduler with 0 "
<< "hosts.";
const Host& scheduler_host = plan_.cluster().hosts()[0];
@@ -504,8 +504,8 @@ void SchedulerWrapper::InitializeScheduler() {
scheduler_backend_address.hostname = scheduler_host.ip;
scheduler_backend_address.port = scheduler_host.be_port;
- scheduler_.reset(new Scheduler(
- NULL, scheduler_backend_id, scheduler_backend_address, &metrics_, NULL, NULL));
+ scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id, scheduler_backend_address,
+ &metrics_, nullptr, nullptr));
scheduler_->Init();
// Initialize the scheduler backend maps.
SendFullMembershipMap();
@@ -532,7 +532,7 @@ void SchedulerWrapper::AddHostToTopicDelta(const Host& host, TTopicDelta* delta)
}
void SchedulerWrapper::SendTopicDelta(const TTopicDelta& delta) {
- DCHECK(scheduler_ != NULL);
+ DCHECK(scheduler_ != nullptr);
// Wrap in topic delta map.
StatestoreSubscriber::TopicDeltaMap delta_map;
delta_map.emplace(Scheduler::IMPALA_MEMBERSHIP_TOPIC, delta);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler-test.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test.cc b/be/src/scheduling/scheduler-test.cc
index 272a288..3e05c5b 100644
--- a/be/src/scheduling/scheduler-test.cc
+++ b/be/src/scheduling/scheduler-test.cc
@@ -362,8 +362,11 @@ TEST_F(SchedulerTest, TestSendUpdates) {
}
/// IMPALA-4329: Test scheduling with no backends.
-/// With the fix for IMPALA-4494, the scheduler will always register its local backend
-/// with itself, so scheduling with no backends will still succeed.
+/// With the fix for IMPALA-5058, the scheduler is no longer responsible for
+/// registering the local backend with itself. This functionality is moved to
+/// ImpalaServer::MembershipCallback() and the scheduler will receive the local
+/// backend info through the statestore update, so until that happens, scheduling
+/// should fail.
TEST_F(SchedulerTest, TestEmptyBackendConfig) {
Cluster cluster;
cluster.AddHost(false, true);
@@ -377,7 +380,7 @@ TEST_F(SchedulerTest, TestEmptyBackendConfig) {
Result result(plan);
SchedulerWrapper scheduler(plan);
Status status = scheduler.Compute(&result);
- EXPECT_TRUE(status.ok());
+ EXPECT_TRUE(!status.ok());
}
/// IMPALA-4494: Test scheduling with no backends but exec_at_coord.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 70dbcdc..a73e13a 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -30,6 +30,7 @@
#include "gen-cpp/ImpalaInternalService_constants.h"
#include "gen-cpp/Types_types.h"
#include "rapidjson/rapidjson.h"
+#include "runtime/exec-env.h"
#include "statestore/statestore-subscriber.h"
#include "util/container-util.h"
#include "util/metrics.h"
@@ -46,8 +47,6 @@ using namespace strings;
DECLARE_int32(be_port);
DECLARE_string(hostname);
-DEFINE_bool(disable_admission_control, false, "Disables admission control.");
-
namespace impala {
static const string LOCAL_ASSIGNMENTS_KEY("simple-scheduler.local-assignments.total");
@@ -69,17 +68,11 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
statestore_subscriber_(subscriber),
local_backend_id_(backend_id),
thrift_serializer_(false),
- total_assignments_(NULL),
- total_local_assignments_(NULL),
- initialized_(NULL),
+ total_assignments_(nullptr),
+ total_local_assignments_(nullptr),
+ initialized_(nullptr),
request_pool_service_(request_pool_service) {
local_backend_descriptor_.address = backend_address;
-
- if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
- if (!FLAGS_disable_admission_control) {
- admission_controller_.reset(
- new AdmissionController(request_pool_service_, metrics, backend_address));
- }
}
Scheduler::Scheduler(const vector<TNetworkAddress>& backends, MetricGroup* metrics,
@@ -87,20 +80,14 @@ Scheduler::Scheduler(const vector<TNetworkAddress>& backends, MetricGroup* metri
: backend_config_(std::make_shared<const BackendConfig>(backends)),
metrics_(metrics),
webserver_(webserver),
- statestore_subscriber_(NULL),
+ statestore_subscriber_(nullptr),
thrift_serializer_(false),
- total_assignments_(NULL),
- total_local_assignments_(NULL),
- initialized_(NULL),
+ total_assignments_(nullptr),
+ total_local_assignments_(nullptr),
+ initialized_(nullptr),
request_pool_service_(request_pool_service) {
DCHECK(backends.size() > 0);
local_backend_descriptor_.address = MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port);
- if (FLAGS_disable_admission_control) LOG(INFO) << "Admission control is disabled.";
- // request_pool_service_ may be null in unit tests
- if (request_pool_service_ != NULL && !FLAGS_disable_admission_control) {
- admission_controller_.reset(
- new AdmissionController(request_pool_service_, metrics, TNetworkAddress()));
- }
}
Status Scheduler::Init() {
@@ -122,14 +109,14 @@ Status Scheduler::Init() {
coord_only_backend_config_.AddBackend(local_backend_descriptor_);
- if (webserver_ != NULL) {
+ if (webserver_ != nullptr) {
Webserver::UrlCallback backends_callback =
bind<void>(mem_fn(&Scheduler::BackendsUrlCallback), this, _1, _2);
webserver_->RegisterUrlCallback(
BACKENDS_WEB_PAGE, BACKENDS_TEMPLATE, backends_callback);
}
- if (statestore_subscriber_ != NULL) {
+ if (statestore_subscriber_ != nullptr) {
StatestoreSubscriber::UpdateCallback cb =
bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
Status status = statestore_subscriber_->AddTopic(IMPALA_MEMBERSHIP_TOPIC, true, cb);
@@ -137,12 +124,9 @@ Status Scheduler::Init() {
status.AddDetail("Scheduler failed to register membership topic");
return status;
}
- if (!FLAGS_disable_admission_control) {
- RETURN_IF_ERROR(admission_controller_->Init(statestore_subscriber_));
- }
}
- if (metrics_ != NULL) {
+ if (metrics_ != nullptr) {
// This is after registering with the statestored, so we already have to synchronize
// access to the backend_config_ shared_ptr.
int num_backends = GetBackendConfig()->NumBackends();
@@ -153,8 +137,8 @@ Status Scheduler::Init() {
metrics_->AddGauge<int64_t>(NUM_BACKENDS_KEY, num_backends);
}
- if (statestore_subscriber_ != NULL) {
- if (webserver_ != NULL) {
+ if (statestore_subscriber_ != nullptr) {
+ if (webserver_ != nullptr) {
const TNetworkAddress& webserver_address = webserver_->http_address();
if (IsWildcardAddress(webserver_address.hostname)) {
local_backend_descriptor_.__set_debug_http_address(
@@ -253,31 +237,9 @@ void Scheduler::UpdateMembership(
}
}
- // If the local backend is not in our view of the membership list, we should add it
- // and tell the statestore. We also ensure that it is part of our backend config.
- if (current_membership_.find(local_backend_id_) == current_membership_.end()) {
- new_backend_config->AddBackend(local_backend_descriptor_);
- VLOG(1) << "Registering local backend with statestore";
- subscriber_topic_updates->push_back(TTopicDelta());
- TTopicDelta& update = subscriber_topic_updates->back();
- update.topic_name = IMPALA_MEMBERSHIP_TOPIC;
- update.topic_entries.push_back(TTopicItem());
-
- TTopicItem& item = update.topic_entries.back();
- item.key = local_backend_id_;
- Status status = thrift_serializer_.Serialize(&local_backend_descriptor_, &item.value);
- if (!status.ok()) {
- LOG(WARNING) << "Failed to serialize Impala backend address for statestore topic:"
- << " " << status.GetDetail();
- subscriber_topic_updates->pop_back();
- }
- }
-
- DCHECK(new_backend_config->LookUpBackendIp(
- local_backend_descriptor_.address.hostname, nullptr));
SetBackendConfig(new_backend_config);
- if (metrics_ != NULL) {
+ if (metrics_ != nullptr) {
/// TODO-MT: fix this (do we even need to report it?)
num_fragment_instances_metric_->set_value(current_membership_.size());
}
@@ -285,7 +247,7 @@ void Scheduler::UpdateMembership(
Scheduler::BackendConfigPtr Scheduler::GetBackendConfig() const {
lock_guard<mutex> l(backend_config_lock_);
- DCHECK(backend_config_.get() != NULL);
+ DCHECK(backend_config_.get() != nullptr);
BackendConfigPtr backend_config = backend_config_;
return backend_config;
}
@@ -660,7 +622,7 @@ Status Scheduler::ComputeScanRangeAssignment(const BackendConfig& backend_config
// cache to worry about.
// Remote reads will always break ties by backend rank.
bool decide_local_assignment_by_rank = random_replica || cached_replica;
- const IpAddr* backend_ip = NULL;
+ const IpAddr* backend_ip = nullptr;
backend_ip = assignment_ctx.SelectLocalBackendHost(
backend_candidates, decide_local_assignment_by_rank);
TBackendDescriptor backend;
@@ -771,17 +733,6 @@ Status Scheduler::Schedule(QuerySchedule* schedule) {
}
}
schedule->SetUniqueHosts(unique_hosts);
-
- if (!FLAGS_disable_admission_control) {
- RETURN_IF_ERROR(admission_controller_->AdmitQuery(schedule));
- }
- return Status::OK();
-}
-
-Status Scheduler::Release(QuerySchedule* schedule) {
- if (!FLAGS_disable_admission_control) {
- RETURN_IF_ERROR(admission_controller_->ReleaseQuery(schedule));
- }
return Status::OK();
}
@@ -844,7 +795,7 @@ const IpAddr* Scheduler::AssignmentCtx::SelectRemoteBackendHost() {
DCHECK_EQ(backend_config_.NumBackends(), assignment_heap_.size());
candidate_ip = &(assignment_heap_.top().ip);
}
- DCHECK(candidate_ip != NULL);
+ DCHECK(candidate_ip != nullptr);
return candidate_ip;
}
@@ -866,7 +817,7 @@ int Scheduler::AssignmentCtx::GetBackendRank(const IpAddr& ip) const {
void Scheduler::AssignmentCtx::SelectBackendOnHost(
const IpAddr& backend_ip, TBackendDescriptor* backend) {
- DCHECK(backend_config_.LookUpBackendIp(backend_ip, NULL));
+ DCHECK(backend_config_.LookUpBackendIp(backend_ip, nullptr));
const BackendConfig::BackendList& backends_on_host =
backend_config_.GetBackendListForHost(backend_ip);
DCHECK(backends_on_host.size() > 0);
@@ -934,8 +885,8 @@ void Scheduler::AssignmentCtx::RecordScanRangeAssignment(
if (is_cached) assignment_byte_counters_.cached_bytes += scan_range_length;
}
- if (total_assignments_ != NULL) {
- DCHECK(total_local_assignments_ != NULL);
+ if (total_assignments_ != nullptr) {
+ DCHECK(total_local_assignments_ != nullptr);
total_assignments_->Increment(1);
if (!remote_read) total_local_assignments_->Increment(1);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index bd2aff4..ca520c8 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -33,9 +33,9 @@
#include "gen-cpp/Types_types.h" // for TNetworkAddress
#include "rapidjson/document.h"
#include "rpc/thrift-util.h"
-#include "scheduling/admission-controller.h"
#include "scheduling/backend-config.h"
#include "scheduling/query-schedule.h"
+#include "scheduling/request-pool-service.h"
#include "statestore/statestore-subscriber.h"
#include "util/metrics.h"
#include "util/network-util.h"
@@ -101,9 +101,6 @@ class Scheduler {
/// returning.
Status Schedule(QuerySchedule* schedule);
- /// Releases the reserved resources (if any) from the given schedule.
- Status Release(QuerySchedule* schedule);
-
private:
/// Map from a host's IP address to the next backend to be round-robin scheduled for
/// that host (needed for setups with multiple backends on a single host)
@@ -324,9 +321,6 @@ class Scheduler {
/// us.
RequestPoolService* request_pool_service_;
- /// Used to make admission decisions in 'Schedule()'
- boost::scoped_ptr<AdmissionController> admission_controller_;
-
/// Helper methods to access backend_config_ (the shared_ptr, not its contents),
/// protecting the access with backend_config_lock_.
BackendConfigPtr GetBackendConfig() const;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 8417cf1..313708e 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -177,6 +177,10 @@ DEFINE_int32(idle_query_timeout, 0, "The time, in seconds, that a query may be i
"QUERY_TIMEOUT_S overrides this setting, but, if set, --idle_query_timeout represents"
" the maximum allowable timeout.");
+DEFINE_bool(is_coordinator, true, "If true, this Impala daemon can accept and coordinate "
+ "queries from clients. If false, this daemon will only execute query fragments, and "
+ "will refuse client connections.");
+
// TODO: Remove for Impala 3.0.
DEFINE_string(local_nodemanager_url, "", "Deprecated");
@@ -244,7 +248,8 @@ class CancellationWork {
};
ImpalaServer::ImpalaServer(ExecEnv* exec_env)
- : exec_env_(exec_env) {
+ : exec_env_(exec_env),
+ thrift_serializer_(false) {
// Initialize default config
InitializeConfigVariables();
@@ -328,15 +333,21 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
ABORT_IF_ERROR(ExternalDataSourceExecutor::InitJNI(exec_env->metrics()));
// Register the membership callback if required
- if (exec_env->subscriber() != NULL) {
- StatestoreSubscriber::UpdateCallback cb =
- bind<void>(mem_fn(&ImpalaServer::MembershipCallback), this, _1, _2);
+ if (exec_env->subscriber() != nullptr) {
+ auto cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
+ vector<TTopicDelta>* topic_updates) {
+ this->MembershipCallback(state, topic_updates);
+ };
exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb);
- StatestoreSubscriber::UpdateCallback catalog_cb =
- bind<void>(mem_fn(&ImpalaServer::CatalogUpdateCallback), this, _1, _2);
- exec_env->subscriber()->AddTopic(
- CatalogServer::IMPALA_CATALOG_TOPIC, true, catalog_cb);
+ if (FLAGS_is_coordinator) {
+ auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
+ vector<TTopicDelta>* topic_updates) {
+ this->CatalogUpdateCallback(state, topic_updates);
+ };
+ exec_env->subscriber()->AddTopic(CatalogServer::IMPALA_CATALOG_TOPIC, true,
+ catalog_cb);
+ }
}
ABORT_IF_ERROR(UpdateCatalogMetrics());
@@ -358,6 +369,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
query_expiration_thread_.reset(new Thread("impala-server", "query-expirer",
bind<void>(&ImpalaServer::ExpireQueries, this)));
+ is_coordinator_ = FLAGS_is_coordinator;
exec_env_->SetImpalaServer(this);
}
@@ -392,6 +404,8 @@ Status ImpalaServer::LogLineageRecord(const QueryExecState& query_exec_state) {
return status;
}
+bool ImpalaServer::IsCoordinator() { return is_coordinator_; }
+
bool ImpalaServer::IsLineageLoggingEnabled() {
return !FLAGS_lineage_event_log_dir.empty();
}
@@ -562,7 +576,7 @@ Status ImpalaServer::InitProfileLogging() {
Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
bool base64_encoded, stringstream* output) {
- DCHECK(output != NULL);
+ DCHECK(output != nullptr);
// Search for the query id in the active query map
{
shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
@@ -599,9 +613,9 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, TExecSummary* res
// Search for the query id in the active query map.
{
shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
- if (exec_state != NULL) {
+ if (exec_state != nullptr) {
lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
- if (exec_state->coord() != NULL) {
+ if (exec_state->coord() != nullptr) {
TExecProgress progress;
{
lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
@@ -687,7 +701,7 @@ void ImpalaServer::ArchiveQuery(const QueryExecState& query) {
if (FLAGS_query_log_size == 0) return;
QueryStateRecord record(query, true, encoded_profile_str);
- if (query.coord() != NULL) {
+ if (query.coord() != nullptr) {
lock_guard<SpinLock> lock(query.coord()->GetExecSummaryLock());
record.exec_summary = query.coord()->exec_summary();
}
@@ -771,7 +785,7 @@ Status ImpalaServer::ExecuteInternal(
shared_ptr<SessionState> session_state,
bool* registered_exec_state,
shared_ptr<QueryExecState>* exec_state) {
- DCHECK(session_state != NULL);
+ DCHECK(session_state != nullptr);
*registered_exec_state = false;
exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
@@ -820,7 +834,7 @@ Status ImpalaServer::ExecuteInternal(
}
}
- if ((*exec_state)->coord() != NULL) {
+ if ((*exec_state)->coord() != nullptr) {
const unordered_set<TNetworkAddress>& unique_hosts =
(*exec_state)->schedule()->unique_hosts();
if (!unique_hosts.empty()) {
@@ -944,7 +958,7 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
exec_state->session()->inflight_queries.erase(query_id);
}
- if (exec_state->coord() != NULL) {
+ if (exec_state->coord() != nullptr) {
string exec_summary;
{
lock_guard<SpinLock> lock(exec_state->coord()->GetExecSummaryLock());
@@ -975,12 +989,12 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
Status ImpalaServer::UpdateCatalogMetrics() {
TGetDbsResult dbs;
- RETURN_IF_ERROR(exec_env_->frontend()->GetDbs(NULL, NULL, &dbs));
+ RETURN_IF_ERROR(exec_env_->frontend()->GetDbs(nullptr, nullptr, &dbs));
ImpaladMetrics::CATALOG_NUM_DBS->set_value(dbs.dbs.size());
ImpaladMetrics::CATALOG_NUM_TABLES->set_value(0L);
for (const TDatabase& db: dbs.dbs) {
TGetTablesResult table_names;
- RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db.db_name, NULL, NULL,
+ RETURN_IF_ERROR(exec_env_->frontend()->GetTableNames(db.db_name, nullptr, nullptr,
&table_names));
ImpaladMetrics::CATALOG_NUM_TABLES->Increment(table_names.tables.size());
}
@@ -992,7 +1006,7 @@ Status ImpalaServer::CancelInternal(const TUniqueId& query_id, bool check_inflig
const Status* cause) {
VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
- if (exec_state == NULL) return Status("Invalid or unknown query handle");
+ if (exec_state == nullptr) return Status("Invalid or unknown query handle");
exec_state->Cancel(check_inflight, cause);
return Status::OK();
}
@@ -1014,7 +1028,7 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
session_state = entry->second;
session_state_map_.erase(session_id);
}
- DCHECK(session_state != NULL);
+ DCHECK(session_state != nullptr);
if (session_state->session_type == TSessionType::BEESWAX) {
ImpaladMetrics::IMPALA_SERVER_NUM_OPEN_BEESWAX_SESSIONS->Increment(-1L);
} else {
@@ -1081,7 +1095,7 @@ void ImpalaServer::ReportExecStatus(
// every report (assign each query a local int32_t id and use that to index into a
// vector of QueryExecStates, w/o lookup or locking?)
shared_ptr<QueryExecState> exec_state = GetQueryExecState(params.query_id, false);
- if (exec_state.get() == NULL) {
+ if (exec_state.get() == nullptr) {
// This is expected occasionally (since a report RPC might be in flight while
// cancellation is happening). Return an error to the caller to get it to stop.
const string& err = Substitute("ReportExecStatus(): Received report for unknown "
@@ -1475,6 +1489,10 @@ void ImpalaServer::MembershipCallback(
// This is a new item - add it to the map of known backends.
known_backends_.insert(make_pair(item.key, backend_descriptor));
}
+
+ // Register the local backend in the statestore and update the list of known backends.
+ AddLocalBackendToStatestore(subscriber_topic_updates);
+
// Process membership deletions.
for (const string& backend_id: delta.topic_deletions) {
known_backends_.erase(backend_id);
@@ -1559,13 +1577,48 @@ void ImpalaServer::MembershipCallback(
}
}
+void ImpalaServer::AddLocalBackendToStatestore(
+ vector<TTopicDelta>* subscriber_topic_updates) {
+ const string& local_backend_id = exec_env_->subscriber()->id();
+ if (known_backends_.find(local_backend_id) != known_backends_.end()) return;
+
+ TBackendDescriptor local_backend_descriptor;
+ local_backend_descriptor.__set_address(
+ MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port));
+ IpAddr ip;
+ const Hostname& hostname = local_backend_descriptor.address.hostname;
+ Status status = HostnameToIpAddr(hostname, &ip);
+ if (!status.ok()) {
+ // TODO: Should we do something about this failure?
+ LOG(WARNING) << "Failed to convert hostname " << hostname << " to IP address: "
+ << status.GetDetail();
+ return;
+ }
+ local_backend_descriptor.ip_address = ip;
+ subscriber_topic_updates->emplace_back(TTopicDelta());
+ TTopicDelta& update = subscriber_topic_updates->back();
+ update.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+ update.topic_entries.emplace_back(TTopicItem());
+
+ TTopicItem& item = update.topic_entries.back();
+ item.key = local_backend_id;
+ status = thrift_serializer_.Serialize(&local_backend_descriptor, &item.value);
+ if (!status.ok()) {
+ LOG(WARNING) << "Failed to serialize Impala backend descriptor for statestore topic:"
+ << " " << status.GetDetail();
+ subscriber_topic_updates->pop_back();
+ } else {
+ known_backends_.insert(make_pair(item.key, local_backend_descriptor));
+ }
+}
+
ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_state,
bool copy_profile, const string& encoded_profile) {
id = exec_state.query_id();
const TExecRequest& request = exec_state.exec_request();
const string* plan_str = exec_state.summary_profile().GetInfoString("Plan");
- if (plan_str != NULL) plan = *plan_str;
+ if (plan_str != nullptr) plan = *plan_str;
stmt = exec_state.sql_stmt();
stmt_type = request.stmt_type;
effective_user = exec_state.effective_user();
@@ -1575,7 +1628,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const QueryExecState& exec_stat
has_coord = false;
Coordinator* coord = exec_state.coord();
- if (coord != NULL) {
+ if (coord != nullptr) {
num_complete_fragments = coord->progress().num_complete();
total_fragments = coord->progress().total();
has_coord = true;
@@ -1770,7 +1823,7 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
if (expiration_event->first > now) break;
shared_ptr<QueryExecState> query_state =
GetQueryExecState(expiration_event->second, false);
- if (query_state.get() == NULL) {
+ if (query_state.get() == nullptr) {
// Query was deleted some other way.
queries_by_timestamp_.erase(expiration_event++);
continue;
@@ -1830,17 +1883,43 @@ void ImpalaServer::RegisterSessionTimeout(int32_t session_timeout) {
Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int be_port,
ThriftServer** beeswax_server, ThriftServer** hs2_server, ThriftServer** be_server,
- ImpalaServer** impala_server) {
- DCHECK((beeswax_port == 0) == (beeswax_server == NULL));
- DCHECK((hs2_port == 0) == (hs2_server == NULL));
- DCHECK((be_port == 0) == (be_server == NULL));
+ boost::shared_ptr<ImpalaServer>* impala_server) {
+ DCHECK((beeswax_port == 0) == (beeswax_server == nullptr));
+ DCHECK((hs2_port == 0) == (hs2_server == nullptr));
+ DCHECK((be_port == 0) == (be_server == nullptr));
- boost::shared_ptr<ImpalaServer> handler(new ImpalaServer(exec_env));
+ impala_server->reset(new ImpalaServer(exec_env));
- if (beeswax_port != 0 && beeswax_server != NULL) {
+ if (be_port != 0 && be_server != nullptr) {
+ boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService());
+ boost::shared_ptr<TProcessor> be_processor(
+ new ImpalaInternalServiceProcessor(thrift_if));
+ boost::shared_ptr<TProcessorEventHandler> event_handler(
+ new RpcEventHandler("backend", exec_env->metrics()));
+ be_processor->setEventHandler(event_handler);
+
+ *be_server = new ThriftServer("backend", be_processor, be_port, nullptr,
+ exec_env->metrics(), FLAGS_be_service_threads);
+ if (EnableInternalSslConnections()) {
+ LOG(INFO) << "Enabling SSL for backend";
+ RETURN_IF_ERROR((*be_server)->EnableSsl(FLAGS_ssl_server_certificate,
+ FLAGS_ssl_private_key, FLAGS_ssl_private_key_password_cmd));
+ }
+
+ LOG(INFO) << "ImpalaInternalService listening on " << be_port;
+ }
+ if (!FLAGS_is_coordinator) {
+ LOG(INFO) << "Started worker Impala server on "
+ << ExecEnv::GetInstance()->backend_address();
+ return Status::OK();
+ }
+
+ // Initialize the HS2 and Beeswax services.
+ if (beeswax_port != 0 && beeswax_server != nullptr) {
// Beeswax FE must be a TThreadPoolServer because ODBC and Hue only support
// TThreadPoolServer.
- boost::shared_ptr<TProcessor> beeswax_processor(new ImpalaServiceProcessor(handler));
+ boost::shared_ptr<TProcessor> beeswax_processor(
+ new ImpalaServiceProcessor(*impala_server));
boost::shared_ptr<TProcessorEventHandler> event_handler(
new RpcEventHandler("beeswax", exec_env->metrics()));
beeswax_processor->setEventHandler(event_handler);
@@ -1848,7 +1927,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
beeswax_port, AuthManager::GetInstance()->GetExternalAuthProvider(),
exec_env->metrics(), FLAGS_fe_service_threads, ThriftServer::ThreadPool);
- (*beeswax_server)->SetConnectionHandler(handler.get());
+ (*beeswax_server)->SetConnectionHandler(impala_server->get());
if (!FLAGS_ssl_server_certificate.empty()) {
LOG(INFO) << "Enabling SSL for Beeswax";
RETURN_IF_ERROR((*beeswax_server)->EnableSsl(FLAGS_ssl_server_certificate,
@@ -1858,10 +1937,10 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
LOG(INFO) << "Impala Beeswax Service listening on " << beeswax_port;
}
- if (hs2_port != 0 && hs2_server != NULL) {
+ if (hs2_port != 0 && hs2_server != nullptr) {
// HiveServer2 JDBC driver does not support non-blocking server.
boost::shared_ptr<TProcessor> hs2_fe_processor(
- new ImpalaHiveServer2ServiceProcessor(handler));
+ new ImpalaHiveServer2ServiceProcessor(*impala_server));
boost::shared_ptr<TProcessorEventHandler> event_handler(
new RpcEventHandler("hs2", exec_env->metrics()));
hs2_fe_processor->setEventHandler(event_handler);
@@ -1870,7 +1949,7 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
AuthManager::GetInstance()->GetExternalAuthProvider(), exec_env->metrics(),
FLAGS_fe_service_threads, ThriftServer::ThreadPool);
- (*hs2_server)->SetConnectionHandler(handler.get());
+ (*hs2_server)->SetConnectionHandler(impala_server->get());
if (!FLAGS_ssl_server_certificate.empty()) {
LOG(INFO) << "Enabling SSL for HiveServer2";
RETURN_IF_ERROR((*hs2_server)->EnableSsl(FLAGS_ssl_server_certificate,
@@ -1880,32 +1959,14 @@ Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port, int
LOG(INFO) << "Impala HiveServer2 Service listening on " << hs2_port;
}
- if (be_port != 0 && be_server != NULL) {
- boost::shared_ptr<ImpalaInternalService> thrift_if(new ImpalaInternalService());
- boost::shared_ptr<TProcessor> be_processor(
- new ImpalaInternalServiceProcessor(thrift_if));
- boost::shared_ptr<TProcessorEventHandler> event_handler(
- new RpcEventHandler("backend", exec_env->metrics()));
- be_processor->setEventHandler(event_handler);
-
- *be_server = new ThriftServer("backend", be_processor, be_port, NULL,
- exec_env->metrics(), FLAGS_be_service_threads);
- if (EnableInternalSslConnections()) {
- LOG(INFO) << "Enabling SSL for backend";
- RETURN_IF_ERROR((*be_server)->EnableSsl(FLAGS_ssl_server_certificate,
- FLAGS_ssl_private_key, FLAGS_ssl_private_key_password_cmd));
- }
-
- LOG(INFO) << "ImpalaInternalService listening on " << be_port;
- }
- if (impala_server != NULL) *impala_server = handler.get();
-
+ LOG(INFO) << "Started coordinator Impala server on "
+ << ExecEnv::GetInstance()->backend_address();
return Status::OK();
}
bool ImpalaServer::GetSessionIdForQuery(const TUniqueId& query_id,
TUniqueId* session_id) {
- DCHECK(session_id != NULL);
+ DCHECK(session_id != nullptr);
lock_guard<mutex> l(query_exec_state_map_lock_);
QueryExecStateMap::iterator i = query_exec_state_map_.find(query_id);
if (i == query_exec_state_map_.end()) {
@@ -1931,7 +1992,7 @@ shared_ptr<ImpalaServer::QueryExecState> ImpalaServer::GetQueryExecState(
void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
const TUpdateFilterParams& params) {
shared_ptr<QueryExecState> query_exec_state = GetQueryExecState(params.query_id, false);
- if (query_exec_state.get() == NULL) {
+ if (query_exec_state.get() == nullptr) {
LOG(INFO) << "Could not find query exec state: " << params.query_id;
return;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 588a5c3..45e8080 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -270,6 +270,9 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
/// Returns true if lineage logging is enabled, false otherwise.
bool IsLineageLoggingEnabled();
+ /// Returns true if this is a coordinator, false otherwise.
+ bool IsCoordinator();
+
/// The prefix of audit event log filename.
static const string AUDIT_EVENT_LOG_FILE_PREFIX;
@@ -432,6 +435,10 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
/// on why the failure occurred.
Status AuthorizeProxyUser(const std::string& user, const std::string& do_as_user);
+ /// Check if the local backend descriptor is in the list of known backends. If not, add
+ /// it to the list of known backends and add an entry for it to 'topic_updates'.
+ void AddLocalBackendToStatestore(std::vector<TTopicDelta>* topic_updates);
+
/// Snapshot of a query's state, archived in the query log.
struct QueryStateRecord {
/// Pretty-printed runtime profile. TODO: Copy actual profile object
@@ -942,6 +949,13 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
/// Container for a thread that runs ExpireQueries() if FLAGS_idle_query_timeout is set.
boost::scoped_ptr<Thread> query_expiration_thread_;
+
+ /// Serializes TBackendDescriptors when creating topic updates
+ ThriftSerializer thrift_serializer_;
+
+ /// True if this ImpalaServer can accept client connections and coordinate
+ /// queries.
+ bool is_coordinator_;
};
/// Create an ImpalaServer and Thrift servers.
@@ -949,6 +963,8 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
/// ImpalaService (Beeswax) on beeswax_port (returned via beeswax_server).
/// If hs2_port != 0 (and hs2_server != NULL), creates a ThriftServer exporting
/// ImpalaHiveServer2Service on hs2_port (returned via hs2_server).
+/// ImpalaService and ImpalaHiveServer2Service are initialized only if this
+/// Impala server is a coordinator (indicated by the is_coordinator flag).
/// If be_port != 0 (and be_server != NULL), create a ThriftServer exporting
/// ImpalaInternalService on be_port (returned via be_server).
/// Returns created ImpalaServer. The caller owns fe_server and be_server.
@@ -958,7 +974,7 @@ class ImpalaServer : public ImpalaServiceIf, public ImpalaHiveServer2ServiceIf,
/// which case none of the output parameters can be assumed to be valid.
Status CreateImpalaServer(ExecEnv* exec_env, int beeswax_port, int hs2_port,
int be_port, ThriftServer** beeswax_server, ThriftServer** hs2_server,
- ThriftServer** be_server, ImpalaServer** impala_server);
+ ThriftServer** be_server, boost::shared_ptr<ImpalaServer>* impala_server);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/impalad-main.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impalad-main.cc b/be/src/service/impalad-main.cc
index 4848b45..01e5b55 100644
--- a/be/src/service/impalad-main.cc
+++ b/be/src/service/impalad-main.cc
@@ -56,6 +56,7 @@ DECLARE_int32(hs2_port);
DECLARE_int32(be_port);
DECLARE_string(principal);
DECLARE_bool(enable_rm);
+DECLARE_bool(is_coordinator);
int ImpaladMain(int argc, char** argv) {
InitCommonRuntime(argc, argv, true);
@@ -84,14 +85,17 @@ int ImpaladMain(int argc, char** argv) {
ThriftServer* beeswax_server = NULL;
ThriftServer* hs2_server = NULL;
ThriftServer* be_server = NULL;
- ImpalaServer* server = NULL;
+ boost::shared_ptr<ImpalaServer> server;
ABORT_IF_ERROR(CreateImpalaServer(&exec_env, FLAGS_beeswax_port, FLAGS_hs2_port,
FLAGS_be_port, &beeswax_server, &hs2_server, &be_server, &server));
ABORT_IF_ERROR(be_server->Start());
- ABORT_IF_ERROR(beeswax_server->Start());
- ABORT_IF_ERROR(hs2_server->Start());
+ if (FLAGS_is_coordinator) {
+ ABORT_IF_ERROR(beeswax_server->Start());
+ ABORT_IF_ERROR(hs2_server->Start());
+ }
+
Status status = exec_env.StartServices();
if (!status.ok()) {
LOG(ERROR) << "Impalad services did not start correctly, exiting. Error: "
@@ -101,13 +105,17 @@ int ImpaladMain(int argc, char** argv) {
}
ImpaladMetrics::IMPALA_SERVER_READY->set_value(true);
LOG(INFO) << "Impala has started.";
- // this blocks until the beeswax and hs2 servers terminate
- beeswax_server->Join();
- hs2_server->Join();
+ be_server->Join();
delete be_server;
- delete beeswax_server;
- delete hs2_server;
+
+ if (FLAGS_is_coordinator) {
+ // this blocks until the beeswax and hs2 servers terminate
+ beeswax_server->Join();
+ hs2_server->Join();
+ delete beeswax_server;
+ delete hs2_server;
+ }
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/service/query-exec-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 2cf2e5d..f704676 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -25,6 +25,7 @@
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/exec-env.h"
+#include "scheduling/admission-controller.h"
#include "scheduling/scheduler.h"
#include "service/frontend.h"
#include "service/impala-server.h"
@@ -456,6 +457,14 @@ Status ImpalaServer::QueryExecState::ExecQueryOrDmlRequest(
RETURN_IF_ERROR(UpdateQueryStatus(status));
}
+ if (exec_env_->admission_controller() != nullptr) {
+ status = exec_env_->admission_controller()->AdmitQuery(schedule_.get());
+ {
+ lock_guard<mutex> l(lock_);
+ RETURN_IF_ERROR(UpdateQueryStatus(status));
+ }
+ }
+
coord_.reset(new Coordinator(*schedule_, exec_env_, query_events_));
status = coord_->Exec();
{
@@ -570,10 +579,12 @@ void ImpalaServer::QueryExecState::Done() {
if (coord_.get() != NULL) {
// Release any reserved resources.
- Status status = exec_env_->scheduler()->Release(schedule_.get());
- if (!status.ok()) {
- LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
- << " because of error: " << status.GetDetail();
+ if (exec_env_->admission_controller() != nullptr) {
+ Status status = exec_env_->admission_controller()->ReleaseQuery(schedule_.get());
+ if (!status.ok()) {
+ LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
+ << " because of error: " << status.GetDetail();
+ }
}
coord_->TearDown();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/testutil/in-process-servers.h
----------------------------------------------------------------------
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 54c7484..870f02d 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -69,7 +69,7 @@ class InProcessImpalaServer {
/// there was an error joining.
Status Join();
- ImpalaServer* impala_server() { return impala_server_; }
+ ImpalaServer* impala_server() { return impala_server_.get(); }
MetricGroup* metrics() { return exec_env_->metrics(); }
@@ -92,10 +92,9 @@ class InProcessImpalaServer {
uint32_t hs2_port_;
- /// The ImpalaServer that handles client and backend requests. Not owned by this class;
- /// instead it's owned via shared_ptrs in the ThriftServers. See CreateImpalaServer for
- /// details.
- ImpalaServer* impala_server_;
+ /// The ImpalaServer that handles client and backend requests. Ownership is shared via
+ /// shared_ptrs with the ThriftServers. See CreateImpalaServer for details.
+ boost::shared_ptr<ImpalaServer> impala_server_;
/// ExecEnv holds much of the per-service state
boost::scoped_ptr<ExecEnv> exec_env_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/be/src/util/webserver.cc
----------------------------------------------------------------------
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index 61ef3ea..3b7c4f9 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -34,6 +34,8 @@
#include "common/logging.h"
#include "rpc/thrift-util.h"
+#include "runtime/exec-env.h"
+#include "service/impala-server.h"
#include "thirdparty/mustache/mustache.h"
#include "util/asan.h"
#include "util/coding-util.h"
@@ -97,6 +99,8 @@ DEFINE_string(webserver_password_file, "",
DEFINE_string(webserver_x_frame_options, "DENY",
"webserver will add X-Frame-Options HTTP header with this value");
+DECLARE_bool(is_coordinator);
+
static const char* DOC_FOLDER = "/www/";
static const int DOC_FOLDER_LEN = strlen(DOC_FOLDER);
@@ -116,7 +120,7 @@ static const char* ERROR_KEY = "__error_msg__";
const char* GetDefaultDocumentRoot() {
stringstream ss;
char* impala_home = getenv("IMPALA_HOME");
- if (impala_home == NULL) {
+ if (impala_home == nullptr) {
return ""; // Empty document root means don't serve static files
} else {
ss << impala_home;
@@ -152,7 +156,7 @@ string BuildHeaderString(ResponseCode response, ContentType content_type) {
}
Webserver::Webserver()
- : context_(NULL),
+ : context_(nullptr),
error_handler_(UrlHandler(bind<void>(&Webserver::ErrorHandler, this, _1, _2),
"error.tmpl", false)) {
http_address_ = MakeNetworkAddress(
@@ -161,7 +165,7 @@ Webserver::Webserver()
}
Webserver::Webserver(const int port)
- : context_(NULL),
+ : context_(nullptr),
error_handler_(UrlHandler(bind<void>(&Webserver::ErrorHandler, this, _1, _2),
"error.tmpl", false)) {
http_address_ = MakeNetworkAddress("0.0.0.0", port);
@@ -186,6 +190,13 @@ void Webserver::RootHandler(const ArgumentMap& args, Document* document) {
document->GetAllocator());
document->AddMember("process_state_info", process_state_info,
document->GetAllocator());
+
+ ExecEnv* env = ExecEnv::GetInstance();
+ if (env == nullptr || env->impala_server() == nullptr) return;
+ string mode = (env->impala_server()->IsCoordinator()) ?
+ "Coordinator + Executor" : "Executor";
+ Value impala_server_mode(mode.c_str(), document->GetAllocator());
+ document->AddMember("impala_server_mode", impala_server_mode, document->GetAllocator());
}
void Webserver::ErrorHandler(const ArgumentMap& args, Document* document) {
@@ -300,7 +311,7 @@ Status Webserver::Start() {
options.push_back("no");
// Options must be a NULL-terminated list
- options.push_back(NULL);
+ options.push_back(nullptr);
// squeasel ignores SIGCHLD and we need it to run kinit. This means that since
// squeasel does not reap its own children CGI programs must be avoided.
@@ -321,7 +332,7 @@ Status Webserver::Start() {
// Restore the child signal handler so wait() works properly.
signal(SIGCHLD, sig_chld);
- if (context_ == NULL) {
+ if (context_ == nullptr) {
stringstream error_msg;
error_msg << "Webserver: Could not start on address " << http_address_;
return Status(error_msg.str());
@@ -337,14 +348,14 @@ Status Webserver::Start() {
}
void Webserver::Stop() {
- if (context_ != NULL) {
+ if (context_ != nullptr) {
sq_stop(context_);
- context_ = NULL;
+ context_ = nullptr;
}
}
void Webserver::GetCommonJson(Document* document) {
- DCHECK(document != NULL);
+ DCHECK(document != nullptr);
Value obj(kObjectType);
obj.AddMember("process-name", google::ProgramInvocationShortName(),
document->GetAllocator());
@@ -365,7 +376,7 @@ void Webserver::GetCommonJson(Document* document) {
int Webserver::LogMessageCallbackStatic(const struct sq_connection* connection,
const char* message) {
- if (message != NULL) {
+ if (message != nullptr) {
LOG(INFO) << "Webserver: " << message;
}
return PROCESSING_COMPLETE;
@@ -389,7 +400,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
}
map<string, string> arguments;
- if (request_info->query_string != NULL) {
+ if (request_info->query_string != nullptr) {
BuildArgumentMap(request_info->query_string, &arguments);
}
@@ -397,7 +408,7 @@ int Webserver::BeginRequestCallback(struct sq_connection* connection,
UrlHandlerMap::const_iterator it = url_handlers_.find(request_info->uri);
ResponseCode response = OK;
ContentType content_type = HTML;
- const UrlHandler* url_handler = NULL;
+ const UrlHandler* url_handler = nullptr;
if (it == url_handlers_.end()) {
response = NOT_FOUND;
arguments[ERROR_KEY] = Substitute("No URI handler for '$0'", request_info->uri);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/bin/start-impala-cluster.py
----------------------------------------------------------------------
diff --git a/bin/start-impala-cluster.py b/bin/start-impala-cluster.py
index b276b08..37671e3 100755
--- a/bin/start-impala-cluster.py
+++ b/bin/start-impala-cluster.py
@@ -36,6 +36,8 @@ DEFAULT_IMPALA_MAX_LOG_FILES = os.environ.get('IMPALA_MAX_LOG_FILES', 10)
parser = OptionParser()
parser.add_option("-s", "--cluster_size", type="int", dest="cluster_size", default=3,
help="Size of the cluster (number of impalad instances to start).")
+parser.add_option("-c", "--num_coordinators", type="int", dest="num_coordinators",
+ default=3, help="Number of coordinators.")
parser.add_option("--build_type", dest="build_type", default= 'latest',
help="Build type to use - debug / release / latest")
parser.add_option("--impalad_args", dest="impalad_args", action="append", type="string",
@@ -204,7 +206,7 @@ def build_jvm_args(instance_num):
BASE_JVM_DEBUG_PORT = 30000
return JVM_ARGS % (BASE_JVM_DEBUG_PORT + instance_num, options.jvm_args)
-def start_impalad_instances(cluster_size):
+def start_impalad_instances(cluster_size, num_coordinators):
if cluster_size == 0:
# No impalad instances should be started.
return
@@ -239,6 +241,10 @@ def start_impalad_instances(cluster_size):
if options.kudu_master_hosts:
# Must be prepended, otherwise the java options interfere.
args = "-kudu_master_hosts %s %s" % (options.kudu_master_hosts, args)
+
+ if i >= num_coordinators:
+ args = "-is_coordinator=false %s" % (args)
+
stderr_log_file_path = os.path.join(options.log_dir, '%s-error.log' % service_name)
exec_impala_process(IMPALAD_PATH, args, stderr_log_file_path)
@@ -279,9 +285,10 @@ def wait_for_cluster_web(timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS):
# impalad processes may take a while to come up.
wait_for_impala_process_count(impala_cluster)
for impalad in impala_cluster.impalads:
- impalad.service.wait_for_num_known_live_backends(options.cluster_size,
- timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
- wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)
+ if impalad._get_arg_value('is_coordinator', default='true') == 'true':
+ impalad.service.wait_for_num_known_live_backends(options.cluster_size,
+ timeout=CLUSTER_WAIT_TIMEOUT_IN_SECONDS, interval=2)
+ wait_for_catalog(impalad, timeout_in_seconds=CLUSTER_WAIT_TIMEOUT_IN_SECONDS)
def wait_for_catalog(impalad, timeout_in_seconds):
"""Waits for the impalad catalog to become ready"""
@@ -326,6 +333,10 @@ if __name__ == "__main__":
print 'Please specify a cluster size >= 0'
sys.exit(1)
+ if options.num_coordinators <= 0:
+ print 'Please specify a valid number of coordinators > 0'
+ sys.exit(1)
+
if not os.path.isdir(options.log_dir):
print 'Log dir does not exist or is not a directory: %s' % options.log_dir
sys.exit(1)
@@ -372,7 +383,7 @@ if __name__ == "__main__":
if not options.restart_impalad_only:
start_statestore()
start_catalogd()
- start_impalad_instances(options.cluster_size)
+ start_impalad_instances(options.cluster_size, options.num_coordinators)
# Sleep briefly to reduce log spam: the cluster takes some time to start up.
sleep(3)
wait_for_cluster()
@@ -380,4 +391,5 @@ if __name__ == "__main__":
print 'Error starting cluster: %s' % e
sys.exit(1)
- print 'Impala Cluster Running with %d nodes.' % options.cluster_size
+ print 'Impala Cluster Running with %d nodes and %d coordinators.' % (
+ options.cluster_size, options.num_coordinators)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/tests/common/custom_cluster_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/custom_cluster_test_suite.py b/tests/common/custom_cluster_test_suite.py
index 28f598c..b19b5d2 100644
--- a/tests/common/custom_cluster_test_suite.py
+++ b/tests/common/custom_cluster_test_suite.py
@@ -30,6 +30,7 @@ from time import sleep
IMPALA_HOME = os.environ['IMPALA_HOME']
CLUSTER_SIZE = 3
+NUM_COORDINATORS = CLUSTER_SIZE
# The number of statestore subscribers is CLUSTER_SIZE (# of impalad) + 1 (for catalogd).
NUM_SUBSCRIBERS = CLUSTER_SIZE + 1
@@ -107,10 +108,11 @@ class CustomClusterTestSuite(ImpalaTestSuite):
@classmethod
def _start_impala_cluster(cls, options, log_dir=os.getenv('LOG_DIR', "/tmp/"),
- cluster_size=CLUSTER_SIZE, log_level=1):
+ cluster_size=CLUSTER_SIZE, num_coordinators=NUM_COORDINATORS, log_level=1):
cls.impala_log_dir = log_dir
cmd = [os.path.join(IMPALA_HOME, 'bin/start-impala-cluster.py'),
'--cluster_size=%d' % cluster_size,
+ '--num_coordinators=%d' % num_coordinators,
'--log_dir=%s' % log_dir,
'--log_level=%s' % log_level]
try:
@@ -123,7 +125,8 @@ class CustomClusterTestSuite(ImpalaTestSuite):
raise Exception("statestored was not found")
statestored.service.wait_for_live_subscribers(NUM_SUBSCRIBERS, timeout=60)
for impalad in cls.cluster.impalads:
- impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60)
+ if impalad._get_arg_value('is_coordinator', default='true') == 'true':
+ impalad.service.wait_for_num_known_live_backends(CLUSTER_SIZE, timeout=60)
def assert_impalad_log_contains(self, level, line_regex, expected_count=1):
"""
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 4e9c815..265b1b6 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -26,6 +26,10 @@ import urllib
from time import sleep, time
from tests.common.impala_connection import create_connection, create_ldap_connection
+from TCLIService import TCLIService
+from thrift.transport.TSocket import TSocket
+from thrift.transport.TTransport import TBufferedTransport
+from thrift.protocol import TBinaryProtocol
logging.basicConfig(level=logging.ERROR, format='%(threadName)s: %(message)s')
LOG = logging.getLogger('impala_service')
@@ -201,6 +205,16 @@ class ImpaladService(BaseImpalaService):
client.connect()
return client
+  def create_hs2_client(self):
+    """Returns a TCLIService (HS2) client connected to this impalad's hs2_port.
+
+    A TSocket to (hostname, hs2_port) is wrapped in a buffered transport, the transport
+    is opened, and the client is built on a binary protocol over it. Any error raised
+    while opening the connection propagates to the caller.
+    """
+    transport = TBufferedTransport(TSocket(self.hostname, self.hs2_port))
+    transport.open()
+    return TCLIService.Client(TBinaryProtocol.TBinaryProtocol(transport))
+
def get_catalog_object_dump(self, object_type, object_name):
return self.read_debug_webpage('catalog_objects?object_type=%s&object_name=%s' %\
(object_type, object_name))
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/tests/custom_cluster/test_coordinators.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
new file mode 100644
index 0000000..6010404
--- /dev/null
+++ b/tests/custom_cluster/test_coordinators.py
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Tests for clusters in which only a subset of the impalads are coordinators.
+
+import pytest
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+
+class TestCoordinators(CustomClusterTestSuite):
+  @pytest.mark.execute_serially
+  def test_multiple_coordinators(self):
+    """Test a cluster configuration in which not all impalad nodes are coordinators.
+    Verify that only coordinators can accept client connections and that select and DDL
+    queries run successfully."""
+
+    db_name = "TEST_MUL_COORD_DB"
+    self._start_impala_cluster([], num_coordinators=2, cluster_size=3)
+    assert len(self.cluster.impalads) == 3
+
+    coordinator1 = self.cluster.impalads[0]
+    coordinator2 = self.cluster.impalads[1]
+    worker = self.cluster.impalads[2]
+
+    # Verify that Beeswax and HS2 client connections can't be established at a worker
+    # node. pytest.raises fails the test if the connection unexpectedly succeeds and,
+    # unlike a bare 'except:', does not swallow KeyboardInterrupt or SystemExit.
+    with pytest.raises(Exception):
+      worker.service.create_beeswax_client()
+    with pytest.raises(Exception):
+      worker.service.create_hs2_client()
+
+    # Verify that queries can successfully run on coordinator nodes. Both clients start
+    # as None so that cleanup only closes clients that were actually created and a
+    # failed connection is not masked by a NameError in the 'finally' block.
+    client1 = None
+    client2 = None
+    try:
+      client1 = coordinator1.service.create_beeswax_client()
+      client2 = coordinator2.service.create_beeswax_client()
+
+      # select queries
+      self.execute_query_expect_success(client1, "select 1")
+      self.execute_query_expect_success(client2, "select * from functional.alltypes")
+      # DDL query w/o SYNC_DDL
+      self.execute_query_expect_success(client1, "refresh functional.alltypes")
+      # DDL queries with SYNC_DDL
+      query_options = {"sync_ddl" : 1}
+      self.execute_query_expect_success(client2, "refresh functional.alltypesagg",
+          query_options)
+      # Drop any database left behind by an earlier, aborted run so that the
+      # 'create table' statements below cannot fail on pre-existing tables.
+      self.execute_query_expect_success(client1,
+          "drop database if exists %s cascade" % db_name, query_options)
+      self.execute_query_expect_success(client1,
+          "create database if not exists %s" % db_name, query_options)
+      # Create a table using one coordinator
+      self.execute_query_expect_success(client1,
+          "create table %s.foo1 (col int)" % db_name, query_options)
+      # Drop the table using the other coordinator
+      self.execute_query_expect_success(client2, "drop table %s.foo1" % db_name,
+          query_options)
+      # Swap roles and repeat
+      self.execute_query_expect_success(client2,
+          "create table %s.foo2 (col int)" % db_name, query_options)
+      self.execute_query_expect_success(client1, "drop table %s.foo2" % db_name,
+          query_options)
+      self.execute_query_expect_success(client1, "drop database %s cascade" % db_name)
+
+      # Ensure the worker hasn't received any table metadata. Checked only after the
+      # queries succeed (not in 'finally') so it cannot replace a query's failure.
+      num_tbls = worker.service.get_metric_value('catalog.num-tables')
+      assert num_tbls == 0
+    finally:
+      for client in (client1, client2):
+        if client is not None:
+          client.close()
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/296df3c8/www/root.tmpl
----------------------------------------------------------------------
diff --git a/www/root.tmpl b/www/root.tmpl
index 916b48b..40d448a 100644
--- a/www/root.tmpl
+++ b/www/root.tmpl
@@ -18,6 +18,10 @@ under the License.
-->
{{! Template for / }}
{{>www/common-header.tmpl}}
+ {{! Banner with the impalad server mode. NOTE(review): assumed to render only when the page data sets impala_server_mode - confirm against the template engine. }}
+ {{?impala_server_mode}}
+ <h2>Impala Server Mode: {{impala_server_mode}}</h2>
+ {{/impala_server_mode}}
+
<h2>Vers<span id="v">i</span>on</h2>
<pre id="version_pre">{{version}}</pre>