You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by he...@apache.org on 2016/09/21 04:00:42 UTC
[5/7] incubator-impala git commit: IMPALA-4160: Remove Llama support.
IMPALA-4160: Remove Llama support.
Alas, poor Llama! I knew him, Impala: a system
of infinite jest, of most excellent fancy: we have
borne him on our back a thousand times; and now, how
abhorred in my imagination it is!
Done:
* Removed QueryResourceMgr, ResourceBroker, CGroupsMgr
* Removed untested 'offline' mode and node manager (NM) failure
detection from ImpalaServer
* Removed all Llama-related Thrift files
* Removed RM-related arguments to MemTracker constructors
* Deprecated all RM-related flags, printing a warning if enable_rm is
set
* Removed expansion logic from MemTracker
* Removed VCore logic from QuerySchedule
* Removed all reservation-related logic from Scheduler
* Removed RM metric descriptions
* Various misc. small class changes
Not done:
* Remove RM flags (--enable_rm etc.)
* Remove RM query options
* Changes to RequestPoolService (see IMPALA-4159)
* Remove estimates of VCores / memory from plan
Change-Id: Icfb14209e31f6608bb7b8a33789e00411a6447ef
Reviewed-on: http://gerrit.cloudera.org:8080/4445
Tested-by: Internal Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/19de09ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/19de09ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/19de09ab
Branch: refs/heads/master
Commit: 19de09ab7db4498fa3dd6e0775e32581139dd336
Parents: 3be61f9
Author: Henry Robinson <he...@cloudera.com>
Authored: Thu Sep 15 18:09:46 2016 -0700
Committer: Henry Robinson <he...@cloudera.com>
Committed: Tue Sep 20 23:50:43 2016 +0000
----------------------------------------------------------------------
be/CMakeLists.txt | 4 -
be/generated-sources/gen-cpp/CMakeLists.txt | 8 -
be/src/bufferpool/reservation-tracker-test.cc | 6 +-
be/src/exec/blocking-join-node.cc | 8 -
be/src/exec/data-sink.cc | 2 +-
be/src/exec/exec-node.cc | 5 +-
be/src/exec/hash-join-node.cc | 1 -
be/src/exec/hdfs-scan-node-base.cc | 1 -
be/src/exec/hdfs-scan-node.cc | 37 -
be/src/exec/kudu-scan-node-test.cc | 2 +-
be/src/exec/kudu-scan-node.cc | 5 -
be/src/exec/kudu-table-sink-test.cc | 3 +-
be/src/exprs/expr-test.cc | 4 +-
be/src/resourcebroker/CMakeLists.txt | 31 -
be/src/resourcebroker/resource-broker.cc | 850 -------------------
be/src/resourcebroker/resource-broker.h | 424 ---------
be/src/runtime/buffered-block-mgr-test.cc | 4 +-
be/src/runtime/buffered-block-mgr.cc | 4 +-
be/src/runtime/collection-value-builder-test.cc | 2 +-
be/src/runtime/coordinator.cc | 23 +-
be/src/runtime/data-stream-recvr.cc | 2 +-
be/src/runtime/data-stream-test.cc | 8 +-
be/src/runtime/disk-io-mgr-test.cc | 4 +-
be/src/runtime/disk-io-mgr.cc | 4 +-
be/src/runtime/exec-env.cc | 196 +----
be/src/runtime/exec-env.h | 19 -
be/src/runtime/mem-pool-test.cc | 4 +-
be/src/runtime/mem-tracker-test.cc | 6 +-
be/src/runtime/mem-tracker.cc | 77 +-
be/src/runtime/mem-tracker.h | 81 +-
be/src/runtime/plan-fragment-executor.cc | 90 +-
be/src/runtime/runtime-filter-bank.cc | 5 +-
be/src/runtime/runtime-state.cc | 25 +-
be/src/runtime/runtime-state.h | 16 +-
be/src/runtime/test-env.cc | 2 +-
be/src/scheduling/CMakeLists.txt | 1 -
be/src/scheduling/query-resource-mgr.cc | 271 ------
be/src/scheduling/query-resource-mgr.h | 227 -----
be/src/scheduling/query-schedule.cc | 137 +--
be/src/scheduling/query-schedule.h | 38 +-
be/src/scheduling/request-pool-service.cc | 9 +-
be/src/scheduling/scheduler.h | 16 -
be/src/scheduling/simple-scheduler-test.cc | 4 +-
be/src/scheduling/simple-scheduler.cc | 171 +---
be/src/scheduling/simple-scheduler.h | 41 +-
be/src/service/impala-server.cc | 92 +-
be/src/service/impala-server.h | 24 -
be/src/service/impalad-main.cc | 8 +
be/src/service/query-exec-state.cc | 20 -
be/src/util/CMakeLists.txt | 2 -
be/src/util/cgroups-mgr.cc | 238 ------
be/src/util/cgroups-mgr.h | 175 ----
be/src/util/debug-util.h | 1 -
be/src/util/llama-util.cc | 152 ----
be/src/util/llama-util.h | 75 --
be/src/util/thread-pool.h | 4 -
be/src/util/thread.cc | 14 -
be/src/util/thread.h | 21 -
be/src/util/uid-util.h | 5 +-
bin/bootstrap_toolchain.py | 2 +-
bin/create-test-configuration.sh | 2 +-
bin/generate_minidump_collection_testdata.py | 1 -
bin/start-impala-cluster.py | 23 +-
common/thrift/CMakeLists.txt | 2 -
common/thrift/Frontend.thrift | 1 +
common/thrift/ImpalaInternalService.thrift | 7 -
common/thrift/Llama.thrift | 276 ------
common/thrift/ResourceBrokerService.thrift | 119 ---
common/thrift/metrics.json | 210 -----
.../com/cloudera/impala/planner/Planner.java | 1 +
testdata/cluster/admin | 12 +-
.../cdh5/etc/hadoop/conf/yarn-site.xml.tmpl | 13 +-
.../cdh5/etc/init.d/llama-application | 38 -
.../etc/llama/conf/llama-log4j.properties.tmpl | 29 -
.../cdh5/etc/llama/conf/llama-site.xml.tmpl | 86 --
.../common/etc/hadoop/conf/core-site.xml.tmpl | 10 -
76 files changed, 165 insertions(+), 4376 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 6df394f..2546bd0 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -167,7 +167,6 @@ if (DOXYGEN_FOUND)
${CMAKE_SOURCE_DIR}/be/src/common/
${CMAKE_SOURCE_DIR}/be/src/exec/
${CMAKE_SOURCE_DIR}/be/src/exprs/
- ${CMAKE_SOURCE_DIR}/be/src/resourcebroker/
${CMAKE_SOURCE_DIR}/be/src/runtime/
${CMAKE_SOURCE_DIR}/be/src/scheduling/
${CMAKE_SOURCE_DIR}/be/src/service/
@@ -267,7 +266,6 @@ set (IMPALA_LINK_LIBS
Exprs
GlobalFlags
ImpalaThrift
- ResourceBroker
Rpc
Runtime
Scheduling
@@ -295,7 +293,6 @@ if (BUILD_SHARED_LIBS)
Statestore
Scheduling
Catalog
- ResourceBroker
ImpalaThrift
GlobalFlags
Common
@@ -423,7 +420,6 @@ add_subdirectory(src/catalog)
add_subdirectory(src/codegen)
add_subdirectory(src/common)
add_subdirectory(src/exec)
-add_subdirectory(src/resourcebroker)
add_subdirectory(src/exprs)
add_subdirectory(src/runtime)
add_subdirectory(src/scheduling)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/generated-sources/gen-cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/generated-sources/gen-cpp/CMakeLists.txt b/be/generated-sources/gen-cpp/CMakeLists.txt
index ac2907e..35f7d65 100644
--- a/be/generated-sources/gen-cpp/CMakeLists.txt
+++ b/be/generated-sources/gen-cpp/CMakeLists.txt
@@ -37,10 +37,6 @@ set(SRC_FILES
ImpalaService_constants.cpp
ImpalaService_types.cpp
ImpalaHiveServer2Service.cpp
- Llama_constants.cpp
- Llama_types.cpp
- LlamaAMService.cpp
- LlamaNotificationService.cpp
beeswax_constants.cpp
beeswax_types.cpp
BeeswaxService.cpp
@@ -79,10 +75,6 @@ set(SRC_FILES
NetworkTestService.cpp
PlanNodes_constants.cpp
PlanNodes_types.cpp
- ResourceBrokerNotificationService.cpp
- ResourceBrokerService_constants.cpp
- ResourceBrokerService_types.cpp
- ResourceBrokerService.cpp
Results_constants.cpp
Results_types.cpp
Partitions_constants.cpp
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker-test.cc b/be/src/bufferpool/reservation-tracker-test.cc
index e43efb8..8dd1e41 100644
--- a/be/src/bufferpool/reservation-tracker-test.cc
+++ b/be/src/bufferpool/reservation-tracker-test.cc
@@ -235,8 +235,8 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) {
// of different code paths.
root_.InitRootTracker(NewProfile(), MIN_BUFFER_LEN * 100);
MemTracker root_mem_tracker;
- MemTracker child_mem_tracker1(-1, -1, "Child 1", &root_mem_tracker);
- MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, -1, "Child 2", &root_mem_tracker);
+ MemTracker child_mem_tracker1(-1, "Child 1", &root_mem_tracker);
+ MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, "Child 2", &root_mem_tracker);
ReservationTracker child_reservations1, child_reservations2;
child_reservations1.InitChildTracker(
NewProfile(), &root_, &child_mem_tracker1, 500 * MIN_BUFFER_LEN);
@@ -317,7 +317,7 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
reservations[0].InitRootTracker(NewProfile(), 500);
for (int i = 1; i < HIERARCHY_DEPTH; ++i) {
mem_trackers[i].reset(new MemTracker(
- mem_limits[i], -1, Substitute("Child $0", i), mem_trackers[i - 1].get()));
+ mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get()));
reservations[i].InitChildTracker(
NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 2c17d13..309bde4 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -25,7 +25,6 @@
#include "runtime/row-batch.h"
#include "runtime/runtime-state.h"
#include "runtime/tuple-row.h"
-#include "util/cgroups-mgr.h"
#include "util/debug-util.h"
#include "util/runtime-profile-counters.h"
#include "util/time.h"
@@ -196,13 +195,6 @@ Status BlockingJoinNode::ConstructBuildAndOpenProbe(RuntimeState* state,
Thread build_thread(
node_name_, "build thread", bind(&BlockingJoinNode::ProcessBuildInputAsync, this,
state, build_sink, &build_side_status));
- if (!state->cgroup().empty()) {
- Status status = state->exec_env()->cgroups_mgr()->AssignThreadToCgroup(
- build_thread, state->cgroup());
- // If AssignThreadToCgroup() failed, we still need to wait for the build-side
- // thread to complete before returning, so just log that error.
- if (!status.ok()) state->LogError(status.msg());
- }
// Open the left child so that it may perform any initialisation in parallel.
// Don't exit even if we see an error, we still need to wait for the build thread
// to finish.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index a6c8fcd..8c8b2dc 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -153,7 +153,7 @@ Status DataSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
DCHECK(mem_tracker != NULL);
profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), GetName()));
mem_tracker_ = mem_tracker;
- expr_mem_tracker_.reset(new MemTracker(-1, -1, "Exprs", mem_tracker, false));
+ expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker, false));
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index 59ac997..2dce0d3 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -147,10 +147,9 @@ Status ExecNode::Prepare(RuntimeState* state) {
DCHECK(runtime_profile_.get() != NULL);
rows_returned_counter_ =
ADD_COUNTER(runtime_profile_, "RowsReturned", TUnit::UNIT);
- mem_tracker_.reset(new MemTracker(
- runtime_profile_.get(), -1, -1, runtime_profile_->name(),
+ mem_tracker_.reset(new MemTracker(runtime_profile_.get(), -1, runtime_profile_->name(),
state->instance_mem_tracker()));
- expr_mem_tracker_.reset(new MemTracker(-1, -1, "Exprs", mem_tracker_.get(), false));
+ expr_mem_tracker_.reset(new MemTracker(-1, "Exprs", mem_tracker_.get(), false));
rows_returned_rate_ = runtime_profile()->AddDerivedCounter(
ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 946ab48..52ba1d1 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -39,7 +39,6 @@
#include "common/names.h"
-DECLARE_string(cgroup_hierarchy_path);
DEFINE_bool(enable_probe_side_filtering, true, "Deprecated.");
using namespace impala;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index de1dad0..c03817b 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -43,7 +43,6 @@
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/string-buffer.h"
-#include "scheduling/query-resource-mgr.h"
#include "util/bit-util.h"
#include "util/container-util.h"
#include "util/debug-util.h"
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 83f6452..03f81ed 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -27,7 +27,6 @@
#include "runtime/runtime-state.h"
#include "runtime/mem-tracker.h"
#include "runtime/row-batch.h"
-#include "scheduling/query-resource-mgr.h"
#include "util/debug-util.h"
#include "util/disk-info.h"
#include "util/runtime-profile-counters.h"
@@ -154,12 +153,6 @@ Status HdfsScanNode::Prepare(RuntimeState* state) {
SCOPED_TIMER(runtime_profile_->total_time_counter());
RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
- // Assign scanner thread group to cgroup, if any.
- if (!state->cgroup().empty()) {
- scanner_threads_.SetCgroupsMgr(state->exec_env()->cgroups_mgr());
- scanner_threads_.SetCgroup(state->cgroup());
- }
-
// Compute the minimum bytes required to start a new thread. This is based on the
// file format.
// The higher the estimate, the less likely it is the query will fail but more likely
@@ -212,12 +205,6 @@ Status HdfsScanNode::Open(RuntimeState* state) {
thread_avail_cb_id_ = runtime_state_->resource_pool()->AddThreadAvailableCb(
bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this, _1));
- if (runtime_state_->query_resource_mgr() != NULL) {
- rm_callback_id_ = runtime_state_->query_resource_mgr()->AddVcoreAvailableCb(
- bind<void>(mem_fn(&HdfsScanNode::ThreadTokenAvailableCb), this,
- runtime_state_->resource_pool()));
- }
-
return Status::OK();
}
@@ -228,9 +215,6 @@ void HdfsScanNode::Close(RuntimeState* state) {
if (thread_avail_cb_id_ != -1) {
state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
}
- if (state->query_resource_mgr() != NULL && rm_callback_id_ != -1) {
- state->query_resource_mgr()->RemoveVcoreAvailableCb(rm_callback_id_);
- }
scanner_threads_.JoinAll();
@@ -326,8 +310,6 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
// 6. Don't start up a thread if there isn't enough memory left to run it.
// 7. Don't start up more than maximum number of scanner threads configured.
// 8. Don't start up if there are no thread tokens.
- // 9. Don't start up if we are running too many threads for our vcore allocation
- // (unless the thread is reserved, in which case it has to run).
// Case 4. We have not issued the initial ranges so don't start a scanner thread.
// Issuing ranges will call this function and we'll start the scanner threads then.
@@ -360,25 +342,12 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool)
break;
}
- // Case 9.
- if (!is_reserved) {
- if (runtime_state_->query_resource_mgr() != NULL &&
- runtime_state_->query_resource_mgr()->IsVcoreOverSubscribed()) {
- pool->ReleaseThreadToken(false);
- break;
- }
- }
-
COUNTER_ADD(&active_scanner_thread_counter_, 1);
COUNTER_ADD(num_scanner_threads_started_counter_, 1);
stringstream ss;
ss << "scanner-thread(" << num_scanner_threads_started_counter_->value() << ")";
scanner_threads_.AddThread(
new Thread("hdfs-scan-node", ss.str(), &HdfsScanNode::ScannerThread, this));
-
- if (runtime_state_->query_resource_mgr() != NULL) {
- runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
- }
}
}
@@ -411,9 +380,6 @@ void HdfsScanNode::ScannerThread() {
// Unlock before releasing the thread token to avoid deadlock in
// ThreadTokenAvailableCb().
l.unlock();
- if (runtime_state_->query_resource_mgr() != NULL) {
- runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1);
- }
runtime_state_->resource_pool()->ReleaseThreadToken(false);
if (filter_status.ok()) {
for (auto& ctx: filter_ctxs) {
@@ -495,9 +461,6 @@ void HdfsScanNode::ScannerThread() {
}
COUNTER_ADD(&active_scanner_thread_counter_, -1);
- if (runtime_state_->query_resource_mgr() != NULL) {
- runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(-1);
- }
runtime_state_->resource_pool()->ReleaseThreadToken(false);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/kudu-scan-node-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node-test.cc b/be/src/exec/kudu-scan-node-test.cc
index 8324628..a0eabef 100644
--- a/be/src/exec/kudu-scan-node-test.cc
+++ b/be/src/exec/kudu-scan-node-test.cc
@@ -85,7 +85,7 @@ class KuduScanNodeTest : public testing::Test {
exec_env_->thread_mgr()->UnregisterPool(runtime_state_->resource_pool());
}
- runtime_state_.reset(new RuntimeState(TExecPlanFragmentParams(), "", exec_env_.get()));
+ runtime_state_.reset(new RuntimeState(TExecPlanFragmentParams(), exec_env_.get()));
runtime_state_->InitMemTrackers(TUniqueId(), NULL, -1);
TKuduScanNode kudu_scan_node_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index e171afa..827631a 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -35,7 +35,6 @@
#include "runtime/row-batch.h"
#include "runtime/string-value.h"
#include "runtime/tuple-row.h"
-#include "scheduling/query-resource-mgr.h"
#include "util/disk-info.h"
#include "util/jni-util.h"
#include "util/periodic-counter-updater.h"
@@ -248,10 +247,6 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourceMgr::ResourcePool* pool) {
VLOG_RPC << "Thread started: " << name;
scanner_threads_.AddThread(new Thread("kudu-scan-node", name,
&KuduScanNode::RunScannerThread, this, name, token));
-
- if (runtime_state_->query_resource_mgr() != NULL) {
- runtime_state_->query_resource_mgr()->NotifyThreadUsageChange(1);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exec/kudu-table-sink-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink-test.cc b/be/src/exec/kudu-table-sink-test.cc
index 96aec55..a1cbb68 100644
--- a/be/src/exec/kudu-table-sink-test.cc
+++ b/be/src/exec/kudu-table-sink-test.cc
@@ -49,8 +49,7 @@ static const int THIRD_SLOT_ID = 3;
class KuduTableSinkTest : public testing::Test {
public:
- KuduTableSinkTest()
- : runtime_state_(TExecPlanFragmentParams(), "", &exec_env_) {}
+ KuduTableSinkTest() : runtime_state_(TExecPlanFragmentParams(), &exec_env_) {}
virtual void SetUp() {
// Create a Kudu client and the table (this will abort the test here
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/exprs/expr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exprs/expr-test.cc b/be/src/exprs/expr-test.cc
index 8a19603..8064de0 100644
--- a/be/src/exprs/expr-test.cc
+++ b/be/src/exprs/expr-test.cc
@@ -1025,7 +1025,7 @@ template <typename T> void TestSingleLiteralConstruction(
const ColumnType& type, const T& value, const string& string_val) {
ObjectPool pool;
RowDescriptor desc;
- RuntimeState state(TExecPlanFragmentParams(), "", NULL);
+ RuntimeState state(TExecPlanFragmentParams(), NULL);
MemTracker tracker;
Expr* expr = pool.Add(new Literal(type, value));
@@ -1041,7 +1041,7 @@ TEST_F(ExprTest, NullLiteral) {
for (int type = TYPE_BOOLEAN; type != TYPE_DATE; ++type) {
NullLiteral expr(static_cast<PrimitiveType>(type));
ExprContext ctx(&expr);
- RuntimeState state(TExecPlanFragmentParams(), "", NULL);
+ RuntimeState state(TExecPlanFragmentParams(), NULL);
MemTracker tracker;
EXPECT_OK(ctx.Prepare(&state, RowDescriptor(), &tracker));
EXPECT_OK(ctx.Open(&state));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/resourcebroker/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/resourcebroker/CMakeLists.txt b/be/src/resourcebroker/CMakeLists.txt
deleted file mode 100644
index 776152c..0000000
--- a/be/src/resourcebroker/CMakeLists.txt
+++ /dev/null
@@ -1,31 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-# where to put generated libraries
-set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/resourcebroker")
-
-# where to put generated binaries
-set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/resourcebroker")
-
-add_library(ResourceBroker
- resource-broker.cc
-)
-add_dependencies(ResourceBroker thrift-deps)
-
-# TODO: Add resource broker BE test
-# ADD_BE_TEST(resource-broker-test)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/resourcebroker/resource-broker.cc
----------------------------------------------------------------------
diff --git a/be/src/resourcebroker/resource-broker.cc b/be/src/resourcebroker/resource-broker.cc
deleted file mode 100644
index 4690c59..0000000
--- a/be/src/resourcebroker/resource-broker.cc
+++ /dev/null
@@ -1,850 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "resourcebroker/resource-broker.h"
-
-#include <boost/algorithm/string/join.hpp>
-#include <boost/uuid/uuid.hpp>
-#include <boost/uuid/uuid_generators.hpp>
-#include <boost/uuid/uuid_io.hpp>
-#include <boost/lexical_cast.hpp>
-#include <gutil/strings/substitute.h>
-#include <thrift/Thrift.h>
-
-#include "common/status.h"
-#include "rpc/thrift-util.h"
-#include "rpc/thrift-server.h"
-#include "scheduling/query-resource-mgr.h"
-#include "scheduling/scheduler.h"
-#include "util/debug-util.h"
-#include "util/uid-util.h"
-#include "util/network-util.h"
-#include "util/llama-util.h"
-#include "util/time.h"
-#include "gen-cpp/ResourceBrokerService.h"
-#include "gen-cpp/Llama_types.h"
-
-#include "common/names.h"
-
-using boost::algorithm::join;
-using boost::algorithm::to_lower;
-using boost::uuids::random_generator;
-using namespace ::apache::thrift::server;
-using namespace ::apache::thrift;
-using namespace impala;
-using namespace strings;
-
-DECLARE_int64(llama_registration_timeout_secs);
-DECLARE_int64(llama_registration_wait_secs);
-DECLARE_int64(llama_max_request_attempts);
-
-DECLARE_int32(resource_broker_cnxn_attempts);
-DECLARE_int32(resource_broker_cnxn_retry_interval_ms);
-DECLARE_int32(resource_broker_send_timeout);
-DECLARE_int32(resource_broker_recv_timeout);
-
-static const string LLAMA_KERBEROS_SERVICE_NAME = "llama";
-
-namespace impala {
-
-// String to search for in Llama error messages to detect that Llama has restarted,
-// and hence the resource broker must re-register.
-const string LLAMA_RESTART_SEARCH_STRING = "unknown handle";
-
-class LlamaNotificationThriftIf : public llama::LlamaNotificationServiceIf {
- public:
- LlamaNotificationThriftIf(ResourceBroker* resource_broker)
- : resource_broker_(resource_broker) {}
-
- virtual void AMNotification(llama::TLlamaAMNotificationResponse& response,
- const llama::TLlamaAMNotificationRequest& request) {
- resource_broker_->AMNotification(request, response);
- }
-
- virtual void NMNotification(llama::TLlamaNMNotificationResponse& response,
- const llama::TLlamaNMNotificationRequest& request) {
- LOG(WARNING) << "Ignoring node-manager notification. Handling not yet implemented.";
- response.status.__set_status_code(llama::TStatusCode::OK);
- }
-
- virtual ~LlamaNotificationThriftIf() {}
-
- private:
- ResourceBroker* resource_broker_;
-};
-
-ResourceBroker::ResourceBroker(const vector<TNetworkAddress>& llama_addresses,
- const TNetworkAddress& llama_callback_address, MetricGroup* metrics) :
- llama_addresses_(llama_addresses),
- active_llama_addr_idx_(-1),
- llama_callback_address_(llama_callback_address),
- metrics_(metrics),
- scheduler_(NULL),
- llama_callback_thrift_iface_(new LlamaNotificationThriftIf(this)),
- llama_client_cache_(new ClientCache<llama::LlamaAMServiceClient>(
- FLAGS_resource_broker_cnxn_attempts,
- FLAGS_resource_broker_cnxn_retry_interval_ms,
- FLAGS_resource_broker_send_timeout,
- FLAGS_resource_broker_recv_timeout,
- LLAMA_KERBEROS_SERVICE_NAME)) {
- DCHECK(metrics != NULL);
- llama_client_cache_->InitMetrics(metrics, "resource-broker");
- active_llama_metric_ = metrics->AddProperty<string>(
- "resource-broker.active-llama", "none");
- active_llama_handle_metric_ = metrics->AddProperty<string>(
- "resource-broker.active-llama-handle", "none");
-
- reservation_rpc_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
- "resource-broker.reservation-request-rpc-time");
- reservation_response_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
- "resource-broker.reservation-request-response-time");
- reservation_requests_total_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.reservation-requests-total", 0);
- reservation_requests_fulfilled_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.reservation-requests-fulfilled", 0);
- reservation_requests_failed_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.reservation-requests-failed", 0);
- reservation_requests_rejected_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.reservation-requests-rejected", 0);
- reservation_requests_timedout_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.reservation-requests-timedout", 0);
-
- expansion_rpc_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
- "resource-broker.expansion-request-rpc-time");
- expansion_response_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
- "resource-broker.expansion-request-response-time");
- expansion_requests_total_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.expansion-requests-total", 0);
- expansion_requests_fulfilled_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.expansion-requests-fulfilled", 0);
- expansion_requests_failed_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.expansion-requests-failed", 0);
- expansion_requests_rejected_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.expansion-requests-rejected", 0);
- expansion_requests_timedout_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.expansion-requests-timedout", 0);
-
- requests_released_metric_ = metrics->AddCounter<int64_t>(
- "resource-broker.requests-released", 0);
- allocated_memory_metric_ = metrics->AddGauge<uint64_t>(
- "resource-broker.memory-resources-in-use", 0L);
- allocated_vcpus_metric_ = metrics->AddGauge<uint64_t>(
- "resource-broker.vcpu-resources-in-use", 0);
-}
-
-Status ResourceBroker::Init() {
- // The scheduler must have been set before calling Init().
- DCHECK(scheduler_ != NULL);
- DCHECK(llama_callback_thrift_iface_ != NULL);
- if (llama_addresses_.size() == 0) {
- return Status("No Llama addresses configured (see --llama_addresses)");
- }
-
- boost::shared_ptr<TProcessor> llama_callback_proc(
- new llama::LlamaNotificationServiceProcessor(llama_callback_thrift_iface_));
- llama_callback_server_.reset(new ThriftServer("llama-callback", llama_callback_proc,
- llama_callback_address_.port, NULL, metrics_, 5));
- RETURN_IF_ERROR(llama_callback_server_->Start());
-
- // Generate client id for registration with Llama, and register with LLama.
- random_generator uuid_generator;
- llama_client_id_ = uuid_generator();
- RETURN_IF_ERROR(RegisterWithLlama());
- RETURN_IF_ERROR(RefreshLlamaNodes());
- return Status::OK();
-}
-
-Status ResourceBroker::RegisterWithLlama() {
- // Remember the current llama_handle_ to detect if another thread has already
- // completed the registration successfully.
- llama::TUniqueId current_llama_handle = llama_handle_;
-
- // Start time that this thread attempted registration. Used to limit the time that a
- // query will wait for re-registration with the Llama to succeed.
- int64_t start = MonotonicSeconds();
- lock_guard<mutex> l(llama_registration_lock_);
- if (llama_handle_ != current_llama_handle) return Status::OK();
-
- active_llama_metric_->set_value("none");
- active_llama_handle_metric_->set_value("none");
-
- int llama_addr_idx = (active_llama_addr_idx_ + 1) % llama_addresses_.size();
- int64_t now = MonotonicSeconds();
- while (FLAGS_llama_registration_timeout_secs == -1 ||
- (now - start) < FLAGS_llama_registration_timeout_secs) {
- // Connect to the Llama at llama_address.
- const TNetworkAddress& llama_address = llama_addresses_[llama_addr_idx];
- // Client status will be ok if a Thrift connection could be successfully established
- // for the returned client at some point in the past. Hence, there is no guarantee
- // that the connection is still valid now and we must check for broken pipes, etc.
- Status client_status;
- ClientConnection<llama::LlamaAMServiceClient> llama_client(llama_client_cache_.get(),
- llama_address, &client_status);
- if (client_status.ok()) {
- // Register this resource broker with Llama.
- llama::TLlamaAMRegisterRequest request;
- request.__set_version(llama::TLlamaServiceVersion::V1);
- llama::TUniqueId llama_uuid;
- UUIDToTUniqueId(llama_client_id_, &llama_uuid);
- request.__set_client_id(llama_uuid);
-
- llama::TNetworkAddress callback_address;
- callback_address << llama_callback_address_;
- request.__set_notification_callback_service(callback_address);
- llama::TLlamaAMRegisterResponse response;
- LOG(INFO) << "Registering Resource Broker with Llama at " << llama_address;
- Status rpc_status =
- llama_client.DoRpc(&llama::LlamaAMServiceClient::Register, request, &response);
- if (rpc_status.ok()) {
- // TODO: Is there a period where an inactive Llama may respond to RPCs?
- // If so, then we need to keep cycling through Llamas here and not
- // return an error.
- RETURN_IF_ERROR(LlamaStatusToImpalaStatus(
- response.status, "Failed to register Resource Broker with Llama."));
- LOG(INFO) << "Received Llama client handle " << response.am_handle
- << ((response.am_handle == llama_handle_) ? " (same as old)" : "");
- llama_handle_ = response.am_handle;
- break;
- }
- }
- // Cycle through the list of Llama addresses for Llama failover.
- llama_addr_idx = (llama_addr_idx + 1) % llama_addresses_.size();
- LOG(INFO) << "Failed to connect to Llama at " << llama_address << "." << endl
- << "Error: " << client_status.GetDetail() << endl
- << "Retrying to connect to Llama at "
- << llama_addresses_[llama_addr_idx] << " in "
- << FLAGS_llama_registration_wait_secs << "s.";
- // Sleep to give Llama time to recover/failover before the next attempt.
- SleepForMs(FLAGS_llama_registration_wait_secs * 1000);
- now = MonotonicSeconds();
- }
- DCHECK(FLAGS_llama_registration_timeout_secs != -1);
- if ((now - start) >= FLAGS_llama_registration_timeout_secs) {
- return Status("Failed to (re-)register Resource Broker with Llama.");
- }
-
- if (llama_addr_idx != active_llama_addr_idx_) {
- // TODO: We've switched to a different Llama (failover). Cancel all queries
- // coordinated by this Impalad to free up physical resources that are not
- // accounted for anymore by Yarn.
- }
-
- // If we reached this point, (re-)registration was successful.
- active_llama_addr_idx_ = llama_addr_idx;
- active_llama_metric_->set_value(lexical_cast<string>(llama_addresses_[llama_addr_idx]));
- active_llama_handle_metric_->set_value(lexical_cast<string>(llama_handle_));
- return Status::OK();
-}
-
-bool ResourceBroker::LlamaHasRestarted(const llama::TStatus& status) const {
- if (status.status_code == llama::TStatusCode::OK || !status.__isset.error_msgs) {
- return false;
- }
- // Check whether one of the error messages contains LLAMA_RESTART_SEARCH_STRING.
- for (int i = 0; i < status.error_msgs.size(); ++i) {
- string error_msg = status.error_msgs[i];
- to_lower(error_msg);
- if (error_msg.find(LLAMA_RESTART_SEARCH_STRING) != string::npos) {
- LOG(INFO) << "Assuming Llama restart from error message: " << status.error_msgs[i];
- return true;
- }
- }
- return false;
-}
-
-void ResourceBroker::Close() {
- // Close connections to all Llama addresses, not just the active one.
- for (const TNetworkAddress& llama_address: llama_addresses_) {
- llama_client_cache_->CloseConnections(llama_address);
- }
- llama_callback_server_->Join();
-}
-
-void ResourceBroker::CreateLlamaReservationRequest(
- const TResourceBrokerReservationRequest& src,
- llama::TLlamaAMReservationRequest& dest) {
- dest.version = llama::TLlamaServiceVersion::V1;
- dest.am_handle = llama_handle_;
- dest.gang = src.gang;
- // Queue is optional, so must be explicitly set for all versions of Thrift to work
- // together.
- dest.__set_queue(src.queue);
- dest.user = src.user;
- dest.resources = src.resources;
- random_generator uuid_generator;
- llama::TUniqueId request_id;
- UUIDToTUniqueId(uuid_generator(), &request_id);
- dest.__set_reservation_id(request_id);
-}
-
-template <class F, typename LlamaReqType, typename LlamaRespType>
-Status ResourceBroker::LlamaRpc(const F& f, LlamaReqType* request,
- LlamaRespType* response, StatsMetric<double>* rpc_time_metric) {
- int attempts = 0;
- MonotonicStopWatch sw;
- // Indicates whether to re-register with Llama before the next RPC attempt,
- // e.g. because Llama has restarted or become unavailable.
- bool register_with_llama = false;
- while (attempts < FLAGS_llama_max_request_attempts) {
- if (register_with_llama) {
- RETURN_IF_ERROR(ReRegisterWithLlama(*request, response));
- // Set the new Llama handle received from re-registering.
- request->__set_am_handle(llama_handle_);
- VLOG_RPC << "Retrying Llama RPC after re-registration: " << *request;
- register_with_llama = false;
- }
- ++attempts;
- Status rpc_status;
- ClientConnection<llama::LlamaAMServiceClient> llama_client(llama_client_cache_.get(),
- llama_addresses_[active_llama_addr_idx_], &rpc_status);
- if (!rpc_status.ok()) {
- register_with_llama = true;
- continue;
- }
-
- sw.Start();
- Status status = llama_client.DoRpc(f, *request, response);
- if (!status.ok()) {
- VLOG_RPC << "Error making Llama RPC: " << status.GetDetail();
- register_with_llama = status.code() == TErrorCode::RPC_CLIENT_CONNECT_FAILURE;
- continue;
- }
- if (rpc_time_metric != NULL) {
- rpc_time_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
- }
-
- // Check whether Llama has been restarted. If so, re-register with it.
- // Break out of the loop here upon success of the RPC.
- if (!LlamaHasRestarted(response->status)) break;
- register_with_llama = true;
- }
- if (attempts >= FLAGS_llama_max_request_attempts) {
- return Status(Substitute(
- "Request aborted after $0 attempts due to connectivity issues with Llama.",
- FLAGS_llama_max_request_attempts));
- }
- return Status::OK();
-}
-
-template <typename LlamaReqType, typename LlamaRespType>
-Status ResourceBroker::ReRegisterWithLlama(const LlamaReqType& request,
- LlamaRespType* response) {
- RETURN_IF_ERROR(RegisterWithLlama());
- return RefreshLlamaNodes();
-}
-
-template <>
-Status ResourceBroker::ReRegisterWithLlama(const llama::TLlamaAMGetNodesRequest& request,
- llama::TLlamaAMGetNodesResponse* response) {
- return RegisterWithLlama();
-}
-
-void ResourceBroker::PendingRequest::GetResources(ResourceMap* resources) {
- resources->clear();
- for (const llama::TAllocatedResource& resource: allocated_resources_) {
- TNetworkAddress host = MakeNetworkAddress(resource.location);
- (*resources)[host] = resource;
- VLOG_QUERY << "Getting allocated resource for reservation id "
- << reservation_id_ << " and location " << host;
- }
-}
-
-void ResourceBroker::PendingRequest::SetResources(
- const vector<llama::TAllocatedResource>& resources) {
- // TODO: Llama returns a dump of all resources that we need to manually group by
- // reservation id. Can Llama do the grouping for us?
- for (const llama::TAllocatedResource& resource: resources) {
- // Ignore resources that don't belong to the given reservation id.
- if (resource.reservation_id == request_id()) {
- allocated_resources_.push_back(resource);
- }
- }
-}
-
-bool ResourceBroker::WaitForNotification(int64_t timeout, ResourceMap* resources,
- bool* timed_out, PendingRequest* pending_request) {
- bool request_granted;
- if (timeout <= 0) {
- *timed_out = false;
- request_granted = pending_request->promise()->Get();
- } else {
- request_granted = pending_request->promise()->Get(timeout, timed_out);
- }
-
- // Remove the promise from the pending-requests map.
- const llama::TUniqueId& res_id = pending_request->reservation_id();
- {
- lock_guard<mutex> l(pending_requests_lock_);
- pending_requests_.erase(pending_request->request_id());
- if (pending_request->is_expansion()) {
- PendingExpansionIdsMap::iterator it = pending_expansion_ids_.find(res_id);
- if (it == pending_expansion_ids_.end()) {
- // If the AMNotification was received as the reservation was being cleaned up,
- // it's possible that the pending/allocated request structures were updated
- // before this thread was able to acquire the lock.
- VLOG_RPC << "Didn't find reservation=" << res_id << " in pending requests";
- return false;
- }
- it->second.erase(pending_request->request_id());
- }
- }
-
- if (request_granted && !*timed_out) {
- pending_request->GetResources(resources);
- int64_t total_memory_mb = 0L;
- int32_t total_vcpus = 0;
- for (const ResourceMap::value_type& resource: *resources) {
- total_memory_mb += resource.second.memory_mb;
- total_vcpus += resource.second.v_cpu_cores;
- }
- {
- lock_guard<mutex> l(allocated_requests_lock_);
- AllocatedRequestMap::iterator it = allocated_requests_.find(res_id);
- if (it == allocated_requests_.end()) {
- // The reservation may have already been cleaned up. See above.
- VLOG_RPC << "Didn't find reservation=" << res_id << " in allocated requests";
- return false;
- }
- it->second.push_back(AllocatedRequest(res_id, total_memory_mb, total_vcpus,
- pending_request->is_expansion()));
- allocated_memory_metric_->Increment(total_memory_mb * 1024L * 1024L);
- allocated_vcpus_metric_->Increment(total_vcpus);
- }
- }
-
- return request_granted;
-}
-
-Status ResourceBroker::Expand(const TUniqueId& reservation_id,
- const llama::TResource& resource, int64_t timeout_ms, llama::TUniqueId* expansion_id,
- llama::TAllocatedResource* allocated_resource) {
- llama::TLlamaAMReservationExpansionRequest ll_request;
- llama::TLlamaAMReservationExpansionResponse ll_response;
-
- ll_request.version = llama::TLlamaServiceVersion::V1;
- ll_request.am_handle = llama_handle_;
- ll_request.expansion_of << reservation_id;
- random_generator uuid_generator;
- llama::TUniqueId request_id;
- UUIDToTUniqueId(uuid_generator(), &request_id);
- ll_request.__set_expansion_id(request_id);
- ll_request.resource = resource;
- VLOG_RPC << "Sending expansion request for reservation_id=" << reservation_id
- << " expansion_id=" << request_id
- << " resource=" << resource;
-
- PendingRequest* pending_request;
- {
- lock_guard<mutex> l(pending_requests_lock_);
- PendingExpansionIdsMap::iterator it =
- pending_expansion_ids_.find(ll_request.expansion_of);
- // If pending_expansion_ids_ doesn't contain the reservation id then the
- // QueryResourceMgr has already been unregistered and the reservation has been
- // released.
- if (it == pending_expansion_ids_.end()) {
- return Status(Substitute("Resource expansion request (expansion id=$0, "
- "reservation id=$1) made after reservation released.",
- PrintId(ll_request.expansion_id), PrintId(reservation_id)));
- }
- it->second.insert(request_id);
- pending_request = new PendingRequest(ll_request.expansion_of, request_id, true);
- pending_requests_.insert(make_pair(request_id, pending_request));
- }
-
- MonotonicStopWatch sw;
- sw.Start();
- Status status = LlamaRpc(&llama::LlamaAMServiceClient::Expand, &ll_request,
- &ll_response, expansion_rpc_time_metric_);
- // Check the status of the response.
- if (!status.ok()) {
- expansion_requests_failed_metric_->Increment(1);
- return status;
- }
-
- Status request_status = LlamaStatusToImpalaStatus(ll_response.status);
- if (!request_status.ok()) {
- expansion_requests_failed_metric_->Increment(1);
- return request_status;
- }
-
- ResourceMap allocated_resources;
- bool timed_out = false;
- bool request_granted = WaitForNotification(timeout_ms,
- &allocated_resources, &timed_out, pending_request);
-
- if (timed_out) {
- expansion_requests_timedout_metric_->Increment(1);
- Status release_status = ReleaseRequest(request_id);
- if (!release_status.ok()) {
- VLOG_QUERY << "Error releasing timed out expansion request, expansion_id="
- << request_id << " status: " << release_status.GetDetail();
- }
- return Status(Substitute("Resource expansion request (expansion id=$0, "
- "reservation id=$1) exceeded timeout of $2.",
- PrintId(ll_request.expansion_id),
- PrintId(reservation_id),
- PrettyPrinter::Print(timeout_ms * 1000L * 1000L, TUnit::TIME_NS)));
- }
- expansion_response_time_metric_->Update(
- sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-
- if (!request_granted) {
- if (pending_request->is_cancelled()) {
- return Status(Substitute("Resource expansion request (expansion id=$0, "
- "reservation id=$1) was cancelled.", PrintId(ll_request.expansion_id),
- PrintId(reservation_id)));
- }
- expansion_requests_rejected_metric_->Increment(1);
- return Status(Substitute("Resource expansion request (expansion id=$0, "
- "reservation id=$1) was rejected.", PrintId(ll_request.expansion_id),
- PrintId(reservation_id)));
- }
-
- DCHECK_EQ(allocated_resources.size(), 1);
- *allocated_resource = allocated_resources.begin()->second;
- *expansion_id = request_id;
-
- VLOG_QUERY << "Fulfilled expansion for id=" << ll_response.expansion_id
- << " resource=" << *allocated_resource;
- expansion_requests_fulfilled_metric_->Increment(1);
- return Status::OK();
-}
-
-Status ResourceBroker::Reserve(const TResourceBrokerReservationRequest& request,
- TResourceBrokerReservationResponse* response) {
- VLOG_QUERY << "Sending reservation request: " << request;
- reservation_requests_total_metric_->Increment(1);
-
- llama::TLlamaAMReservationRequest ll_request;
- llama::TLlamaAMReservationResponse ll_response;
- CreateLlamaReservationRequest(request, ll_request);
- const llama::TUniqueId& res_id = ll_request.reservation_id;
-
- PendingRequest* pending_request;
- {
- pending_request = new PendingRequest(res_id, res_id, false);
- lock_guard<mutex> l(pending_requests_lock_);
- pending_requests_.insert(make_pair(pending_request->request_id(), pending_request));
- }
- {
- lock_guard<mutex> l(allocated_requests_lock_);
- DCHECK(allocated_requests_.find(res_id) == allocated_requests_.end());
- allocated_requests_[res_id] = vector<AllocatedRequest>();
- }
-
- MonotonicStopWatch sw;
- sw.Start();
- Status status = LlamaRpc(&llama::LlamaAMServiceClient::Reserve, &ll_request,
- &ll_response, reservation_rpc_time_metric_);
- // Check the status of the response.
- if (!status.ok()) {
- reservation_requests_failed_metric_->Increment(1);
- return status;
- }
- Status request_status = LlamaStatusToImpalaStatus(ll_response.status);
- if (!request_status.ok()) {
- reservation_requests_failed_metric_->Increment(1);
- return request_status;
- }
- VLOG_RPC << "Received reservation response, waiting for notification on: " << res_id;
-
- bool timed_out = false;
- bool request_granted = WaitForNotification(request.request_timeout,
- &response->allocated_resources, &timed_out, pending_request);
-
- if (request_granted || timed_out) {
- // Set the reservation_id to make sure it eventually gets released - even if when
- // timed out, since the response may arrive later.
- response->__set_reservation_id(CastTUniqueId<llama::TUniqueId, TUniqueId>(res_id));
- }
-
- if (timed_out) {
- reservation_requests_timedout_metric_->Increment(1);
- return Status(Substitute(
- "Resource reservation request (id=$0) exceeded timeout of $1.",
- PrintId(res_id),
- PrettyPrinter::Print(request.request_timeout * 1000L * 1000L,
- TUnit::TIME_NS)));
- }
- reservation_response_time_metric_->Update(
- sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-
- if (!request_granted) {
- reservation_requests_rejected_metric_->Increment(1);
- return Status(Substitute("Resource reservation request (id=$0) was rejected.",
- PrintId(res_id)));
- }
-
- response->__set_reservation_id(CastTUniqueId<llama::TUniqueId, TUniqueId>(res_id));
- VLOG_QUERY << "Fulfilled reservation with id: " << res_id;
- reservation_requests_fulfilled_metric_->Increment(1);
- return Status::OK();
-}
-
-void ResourceBroker::ClearRequests(const TUniqueId& reservation_id) {
- int64_t total_memory_bytes = 0L;
- int32_t total_vcpus = 0L;
- llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id);
- {
- lock_guard<mutex> l(pending_requests_lock_);
- PendingExpansionIdsMap::iterator it = pending_expansion_ids_.find(llama_id);
- if (it != pending_expansion_ids_.end()) {
- for (const llama::TUniqueId& id: it->second) {
- PendingRequestMap::iterator request_it = pending_requests_.find(id);
- DCHECK(request_it != pending_requests_.end());
- if (request_it == pending_requests_.end()) continue;
- // It is possible that the AMNotification thread set the promise and the thread
- // waiting on the promise hasn't had a chance to acquire the
- // pending_requests_lock_ yet to remove it from pending_requests_. We don't need
- // to do anything because it will be released with the reservation anyway.
- if (request_it->second->promise()->IsSet()) continue;
- request_it->second->SetCancelled();
- request_it->second->promise()->Set(false);
- }
- it->second.clear();
- pending_expansion_ids_.erase(it);
- }
- }
- {
- lock_guard<mutex> l(allocated_requests_lock_);
- AllocatedRequestMap::iterator it = allocated_requests_.find(llama_id);
- if (it == allocated_requests_.end()) return;
- for (AllocatedRequest& allocated_req: it->second) {
- DCHECK(allocated_req.reservation_id() == llama_id);
- total_memory_bytes += (allocated_req.memory_mb() * 1024L * 1024L);
- total_vcpus += allocated_req.vcpus();
- }
- it->second.clear();
- allocated_requests_.erase(it);
- allocated_memory_metric_->Increment(-total_memory_bytes);
- allocated_vcpus_metric_->Increment(-total_vcpus);
- }
-
- VLOG_QUERY << "Releasing "
- << PrettyPrinter::Print(total_memory_bytes, TUnit::BYTES)
- << " and " << total_vcpus << " cores for " << llama_id;
-}
-
-Status ResourceBroker::ReleaseRequest(const llama::TUniqueId& request_id) {
- llama::TLlamaAMReleaseRequest llama_request;
- llama::TLlamaAMReleaseResponse llama_response;
- llama_request.version = llama::TLlamaServiceVersion::V1;
- llama_request.am_handle = llama_handle_;
- llama_request.reservation_id = request_id;
-
- RETURN_IF_ERROR(LlamaRpc(&llama::LlamaAMServiceClient::Release,
- &llama_request, &llama_response,reservation_rpc_time_metric_));
- RETURN_IF_ERROR(LlamaStatusToImpalaStatus(llama_response.status));
- return Status::OK();
-}
-
-Status ResourceBroker::ReleaseReservation(const impala::TUniqueId& reservation_id) {
- VLOG_QUERY << "Releasing all resources for reservation: " << reservation_id;
- llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id);
-
- ClearRequests(reservation_id);
- RETURN_IF_ERROR(ReleaseRequest(llama_id));
- requests_released_metric_->Increment(1);
- return Status::OK();
-}
-
-void ResourceBroker::AMNotification(const llama::TLlamaAMNotificationRequest& request,
- llama::TLlamaAMNotificationResponse& response) {
- {
- // This Impalad may have restarted, so it is possible Llama is sending notifications
- // while this Impalad is registering with Llama.
- lock_guard<mutex> l(llama_registration_lock_);
- if (request.am_handle != llama_handle_) {
- VLOG_QUERY << "Ignoring Llama AM notification with mismatched AM handle. "
- << "Known handle: " << llama_handle_ << ". Received handle: "
- << request.am_handle;
- // Ignore all notifications with mismatched handles.
- return;
- }
- }
- // Nothing to be done for heartbeats.
- if (request.heartbeat) return;
- VLOG_QUERY << "Received non-heartbeat AM notification";
-
- lock_guard<mutex> l(pending_requests_lock_);
-
- // Process granted allocations.
- for (const llama::TUniqueId& res_id: request.allocated_reservation_ids) {
- PendingRequestMap::iterator it = pending_requests_.find(res_id);
- if (it == pending_requests_.end()) {
- VLOG_RPC << "Allocation for " << res_id << " arrived after timeout or cleanup";
- continue;
- }
- if (it->second->promise()->IsSet()) {
- // The promise should not have been set unless it was already cancelled.
- DCHECK(it->second->is_cancelled());
- continue;
- }
- LOG(INFO) << "Received allocated resource for reservation id: " << res_id;
- it->second->SetResources(request.allocated_resources);
- it->second->promise()->Set(true);
- }
-
- // Process rejected allocations.
- for (const llama::TUniqueId& res_id: request.rejected_reservation_ids) {
- PendingRequestMap::iterator it = pending_requests_.find(res_id);
- if (it == pending_requests_.end()) {
- VLOG_RPC << "Rejection for " << res_id << " arrived after timeout";
- continue;
- }
- if (it->second->promise()->IsSet()) {
- DCHECK(it->second->is_cancelled());
- continue;
- }
- it->second->promise()->Set(false);
- }
-
- // TODO: We maybe want a thread pool for handling preemptions to avoid
- // blocking this function on query cancellations.
- // Process preempted reservations.
- for (const llama::TUniqueId& res_id: request.preempted_reservation_ids) {
- TUniqueId impala_res_id;
- impala_res_id << res_id;
- scheduler_->HandlePreemptedReservation(impala_res_id);
- }
-
- // Process preempted client resources.
- for (const llama::TUniqueId& res_id: request.preempted_client_resource_ids) {
- TUniqueId impala_res_id;
- impala_res_id << res_id;
- scheduler_->HandlePreemptedResource(impala_res_id);
- }
-
- // Process lost client resources.
- for (const llama::TUniqueId& res_id: request.lost_client_resource_ids) {
- TUniqueId impala_res_id;
- impala_res_id << res_id;
- scheduler_->HandlePreemptedResource(impala_res_id);
- }
-
- response.status.__set_status_code(llama::TStatusCode::OK);
-}
-
-void ResourceBroker::NMNotification(const llama::TLlamaNMNotificationRequest& request,
- llama::TLlamaNMNotificationResponse& response) {
-}
-
-Status ResourceBroker::RefreshLlamaNodes() {
- llama::TLlamaAMGetNodesRequest llama_request;
- llama_request.__set_am_handle(llama_handle_);
- llama_request.__set_version(llama::TLlamaServiceVersion::V1);
- llama::TLlamaAMGetNodesResponse llama_response;
-
- RETURN_IF_ERROR(LlamaRpc(&llama::LlamaAMServiceClient::GetNodes, &llama_request,
- &llama_response, NULL));
- RETURN_IF_ERROR(LlamaStatusToImpalaStatus(llama_response.status));
- llama_nodes_ = llama_response.nodes;
- LOG(INFO) << "Llama Nodes [" << join(llama_nodes_, ", ") << "]";
- return Status::OK();
-}
-
-bool ResourceBroker::GetQueryResourceMgr(const TUniqueId& query_id,
- const TUniqueId& reservation_id, const TNetworkAddress& local_resource_address,
- QueryResourceMgr** mgr) {
- lock_guard<mutex> l(query_resource_mgrs_lock_);
- pair<int32_t, QueryResourceMgr*>* entry = &query_resource_mgrs_[query_id];
- if (entry->second == NULL) {
- entry->second =
- new QueryResourceMgr(reservation_id, local_resource_address, query_id);
- DCHECK_EQ(entry->first, 0);
- // Also create the per-query entries in the allocated_resources_ and
- // pending_expansion_ids_ map.
- llama::TUniqueId llama_id = CastTUniqueId<TUniqueId, llama::TUniqueId>(reservation_id);
- {
- lock_guard<mutex> pending_lock(pending_requests_lock_);
- DCHECK(pending_expansion_ids_.find(llama_id) == pending_expansion_ids_.end());
- pending_expansion_ids_[llama_id] = boost::unordered_set<llama::TUniqueId>();
- }
- {
- lock_guard<mutex> allocated_lock(allocated_requests_lock_);
- if (allocated_requests_.find(llama_id) == allocated_requests_.end()) {
- allocated_requests_[llama_id] = vector<AllocatedRequest>();
- }
- }
- }
- *mgr = entry->second;
- // Return true if this is the first reference to this resource mgr.
- return ++entry->first == 1L;
-}
-
-void ResourceBroker::UnregisterQueryResourceMgr(const TUniqueId& query_id) {
- lock_guard<mutex> l(query_resource_mgrs_lock_);
- QueryResourceMgrsMap::iterator it = query_resource_mgrs_.find(query_id);
- DCHECK(it != query_resource_mgrs_.end())
- << "UnregisterQueryResourceMgr() without corresponding GetQueryResourceMgr()";
- if (--it->second.first == 0) {
- it->second.second->Shutdown();
- ClearRequests(it->second.second->reservation_id());
- delete it->second.second;
- query_resource_mgrs_.erase(it);
- }
-}
-
-ostream& operator<<(ostream& os,
- const map<TNetworkAddress, llama::TAllocatedResource>& resources) {
- typedef map<TNetworkAddress, llama::TAllocatedResource> ResourceMap;
- int count = 0;
- for (const ResourceMap::value_type& resource: resources) {
- os << "(" << resource.first << "," << resource.second << ")";
- if (++count != resources.size()) os << ",";
- }
- return os;
-}
-
-ostream& operator<<(ostream& os, const TResourceBrokerReservationRequest& request) {
- os << "Reservation Request("
- << "queue=" << request.queue << " "
- << "user=" << request.user << " "
- << "gang=" << request.gang << " "
- << "request_timeout=" << request.request_timeout << " "
- << "resources=[";
- for (int i = 0; i < request.resources.size(); ++i) {
- os << request.resources[i];
- if (i + 1 != request.resources.size()) os << ",";
- }
- os << "])";
- return os;
-}
-
-ostream& operator<<(ostream& os, const TResourceBrokerReservationResponse& reservation) {
- os << "Granted Reservation("
- << "reservation id=" << reservation.reservation_id << " "
- << "resources=[" << reservation.allocated_resources << "])";
- return os;
-}
-
-ostream& operator<<(ostream& os, const TResourceBrokerExpansionRequest& request) {
- os << "Expansion Request("
- << "reservation id=" << request.reservation_id << " "
- << "resource=" << request.resource << " "
- << "request_timeout=" << request.request_timeout << ")";
- return os;
-}
-
-ostream& operator<<(ostream& os, const TResourceBrokerExpansionResponse& expansion) {
- os << "Expansion Response("
- << "reservation id=" << expansion.reservation_id << " "
- << "resources=[" << expansion.allocated_resources << "])";
- return os;
-}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/resourcebroker/resource-broker.h
----------------------------------------------------------------------
diff --git a/be/src/resourcebroker/resource-broker.h b/be/src/resourcebroker/resource-broker.h
deleted file mode 100644
index b9e0bd7..0000000
--- a/be/src/resourcebroker/resource-broker.h
+++ /dev/null
@@ -1,424 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef RESOURCE_BROKER_H_
-#define RESOURCE_BROKER_H_
-
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-#include <boost/scoped_ptr.hpp>
-#include <boost/uuid/uuid.hpp>
-
-#include "runtime/client-cache.h"
-#include "util/collection-metrics.h"
-#include "util/promise.h"
-#include "util/stopwatch.h"
-#include "gen-cpp/LlamaAMService.h"
-#include "gen-cpp/LlamaNotificationService.h"
-#include "gen-cpp/ResourceBrokerService_types.h"
-
-namespace impala {
-
-class QueryResourceMgr;
-class Status;
-class MetricGroup;
-class Scheduler;
-class ResourceBrokerNotificationServiceClient;
-class RuntimeProfile;
-
-/// Mediates resource-reservation requests between Impala and Yarn via the Llama service.
-/// The resource broker requests resources via the Llama's thrift interface and exposes
-/// a thrift server for the Llama to notify it of granted/denied/preempted resource
-/// reservations. The reserve/release API of the resource broker is blocking.
-/// The resource broker is configured with a list of Llama addresses that
-/// are cycled through for failover.
-/// TODO: Implement NM notification service.
-class ResourceBroker {
- public:
- ResourceBroker(const std::vector<TNetworkAddress>& llama_addresses,
- const TNetworkAddress& llama_callback_address, MetricGroup* metrics);
-
- /// Register this resource broker with LLama and starts the Llama callback service.
- /// Returns a non-OK status if the callback service failed to start (e.g., port in use)
- /// or if registration with the Llama failed (e.g., connection to Llama failed).
- Status Init();
-
- /// Closes the llama_client_cache_ and joins the llama_callback_server_.
- void Close();
-
- /// Requests resources from Llama. Blocks until the request has been granted or denied.
- /// TODO: Remove thrift interface
- Status Reserve(const TResourceBrokerReservationRequest& request,
- TResourceBrokerReservationResponse* response);
-
- /// Requests more resources specified by 'resource' from Llama for an existing
- /// reservation specified by the 'reservation_id'. Blocks until the request has been
- /// granted or rejected, or no response is received within the timeout specified, in
- /// which case a call to cancel the outstanding expansion is made and the call returns
- /// with an error status. If timeout_ms <= 0, the call will not timeout. If the
- /// expansion is successful, an OK status is returned and the 'expansion_id' and
- /// 'allocated_resource' are set. An error status is returned if a timeout or an error
- /// occurs.
- Status Expand(const TUniqueId& reservation_id, const llama::TResource& resource,
- int64_t timeout_ms, llama::TUniqueId* expansion_id,
- llama::TAllocatedResource* allocated_resource);
-
- /// Removes the record of all resource requests associated with this
- /// 'reservation_id', updating the per-node accounting of resources and cancels any
- /// threads waiting on pending expansions. Does not communicate this to Llama, so the
- /// coordinator should always call ReleaseReservation() to make sure that Llama knows
- /// the resources should be released.
- void ClearRequests(const TUniqueId& reservation_id);
-
- /// Releases resources acquired from Llama for this reservation and all associated
- /// expansion requests across _all_ nodes. Should therefore only be called once per
- /// query by the coordinator.
- Status ReleaseReservation(const TUniqueId& reservation_id);
-
- /// Handles asynchronous Llama Application Master (AM) notifications including
- /// granted/denied/preempted reservations and resources.
- void AMNotification(const llama::TLlamaAMNotificationRequest& request,
- llama::TLlamaAMNotificationResponse& response);
-
- /// Handles asynchronous notifications from the Llama Node Manager (NM)
- /// auxiliary service, in particular, incoming Yarn container allocations
- /// that are going to claim resources.
- /// TODO: Implement once NM service is fully functional.
- void NMNotification(const llama::TLlamaNMNotificationRequest& request,
- llama::TLlamaNMNotificationResponse& response);
-
- const std::vector<std::string>& llama_nodes() { return llama_nodes_; }
-
- /// Retrieves the nodes known to Llama and stores them in llama_nodes_.
- Status RefreshLlamaNodes();
-
- void set_scheduler(Scheduler* scheduler) { scheduler_ = scheduler; };
-
- /// Retrieves or creates a new QueryResourceMgr for the given query ID. Returns true if
- /// this is the first 'checkout' of this QueryResourceMgr, false otherwise. The other
- /// parameters are passed to the QueryResourceMgr constructor.
- bool GetQueryResourceMgr(const TUniqueId& query_id, const TUniqueId& reservation_id,
- const TNetworkAddress& local_resource_address, QueryResourceMgr** res_mgr);
-
- /// Decrements the reference count for a particular QueryResourceMgr. If this is the last
- /// reference (i.e. the ref count goes to 0), the QueryResourceMgr is deleted. It's an
- /// error to call this with a query_id that does not have a registered QueryResourceMgr.
- void UnregisterQueryResourceMgr(const TUniqueId& query_id);
-
- private:
- typedef std::map<TNetworkAddress, llama::TAllocatedResource> ResourceMap;
-
- bool has_standby_llama() { return llama_addresses_.size() > 1; }
-
- /// Registers this resource broker with the Llama. Cycles through the list of
- /// Llama addresses to find the active Llama which is accepting requests (if any).
- /// Returns a non-OK status if registration with any of the Llama's did not succeed
- /// within FLAGS_llama_registration_timeout_s seconds.
- /// Registration with the Llama is idempotent with respect to the llama_client_id_
- /// (see comment on llama_client_id_ for details).
- Status RegisterWithLlama();
-
- /// Issues the Llama RPC f where F is a thrift call taking LlamaReqType and returning
- /// LlamaRespType. If failures occur, this function handles re-registering with Llama
- /// if necessary and re-trying multiple times. If rpc_time_metric is non-NULL, the
- /// metric is updated upon success of the RPC. Returns a non-OK status if the RPC
- /// failed due to connectivity issues with the Llama. Returns OK if the RPC succeeded.
- template <class F, typename LlamaReqType, typename LlamaRespType>
- Status LlamaRpc(const F& f, LlamaReqType* request, LlamaRespType* response,
- StatsMetric<double>* rpc_time_metric);
-
- /// Re-registers with Llama to recover from the Llama being unreachable. Handles both
- /// Llama restart and failover. This function is a template to allow specialization on
- /// the Llama request/response type.
- template <typename LlamaReqType, typename LlamaRespType>
- Status ReRegisterWithLlama(const LlamaReqType& request, LlamaRespType* response);
-
- /// Detects Llama restarts from the given return status of a Llama RPC.
- bool LlamaHasRestarted(const llama::TStatus& status) const;
-
- /// Sends a Llama release RPC for the reservation or expansion with the specified
- /// request_id.
- Status ReleaseRequest(const llama::TUniqueId& request_id);
-
- /// Creates a Llama reservation request from a resource broker reservation request.
- void CreateLlamaReservationRequest(const TResourceBrokerReservationRequest& src,
- llama::TLlamaAMReservationRequest& dest);
-
- class PendingRequest;
- /// Wait for a reservation or expansion request to be fulfilled by the Llama via an
- /// async call into LlamaNotificationThriftIf::AMNotification(). If timeout_ms > 0, the
- /// call will not wait longer than timeout_ms before returning false and *timed_out set
- /// to true. If the request is fulfilled, resources and reservation_id are populated.
- bool WaitForNotification(int64_t timeout_ms, ResourceMap* resources, bool* timed_out,
- PendingRequest* reservation);
-
- /// Llama availability group.
- std::vector<TNetworkAddress> llama_addresses_;
-
- /// Indexes into llama_addresses_ indicating the currently active Llama.
- /// Protected by llama_registration_lock_.
- int active_llama_addr_idx_;
-
- /// Address of thrift server started in this resource broker to handle
- /// Llama notifications.
- TNetworkAddress llama_callback_address_;
-
- MetricGroup* metrics_;
-
- Scheduler* scheduler_;
-
- /// Address of the active Llama. A Llama is considered active once we have successfully
- /// registered with it. Set to "none" while registering with the Llama.
- StringProperty* active_llama_metric_;
-
- /// Llama handle received from the active Llama upon registration.
- /// Set to "none" while not registered with Llama.
- StringProperty* active_llama_handle_metric_;
-
- /// Accumulated statistics on the time taken to RPC a reservation request and receive
- /// an acknowledgement from Llama.
- StatsMetric<double>* reservation_rpc_time_metric_;
-
- /// Accumulated statistics on the time taken to complete a reservation request
- /// (granted or denied). The time includes the request RPC to Llama and the time
- /// the requesting thread waits on the pending_requests_'s promise.
- /// The metric does not include requests that timed out.
- StatsMetric<double>* reservation_response_time_metric_;
-
- /// Total number of reservation requests.
- IntCounter* reservation_requests_total_metric_;
-
- /// Number of fulfilled reservation requests.
- IntCounter* reservation_requests_fulfilled_metric_;
-
- /// Reservation requests that failed due to a malformed request or an internal
- /// error in Llama.
- IntCounter* reservation_requests_failed_metric_;
-
- /// Number of well-formed reservation requests rejected by the central scheduler.
- IntCounter* reservation_requests_rejected_metric_;
-
- /// Number of well-formed reservation requests that did not get fulfilled within
- /// the timeout period.
- IntCounter* reservation_requests_timedout_metric_;
-
- /// Accumulated statistics on the time taken to RPC an expansion request and receive an
- /// acknowledgement from Llama.
- StatsMetric<double>* expansion_rpc_time_metric_;
-
- /// Accumulated statistics on the time taken to complete an expansion request
- /// (granted or denied). The time includes the request RPC to Llama and the time
- /// the requesting thread waits on the pending_requests_'s promise.
- /// The metric does not include requests that timed out.
- StatsMetric<double>* expansion_response_time_metric_;
-
- /// Total number of expansion requests.
- IntCounter* expansion_requests_total_metric_;
-
- /// Number of fulfilled expansion requests.
- IntCounter* expansion_requests_fulfilled_metric_;
-
- /// Expansion requests that failed due to a malformed request or an internal
- /// error in Llama.
- IntCounter* expansion_requests_failed_metric_;
-
- /// Number of well-formed expansion requests rejected by the central scheduler.
- IntCounter* expansion_requests_rejected_metric_;
-
- /// Number of well-formed expansion requests that did not get fulfilled within
- /// the timeout period.
- IntCounter* expansion_requests_timedout_metric_;
-
- /// Total amount of memory currently allocated by Llama to this node
- UIntGauge* allocated_memory_metric_;
-
- /// Total number of vcpu cores currently allocated by Llama to this node
- UIntGauge* allocated_vcpus_metric_;
-
- /// Total number of fulfilled reservation requests that have been released.
- IntCounter* requests_released_metric_;
-
- /// Client id used to register with Llama. Set in Init(). Used to communicate to Llama
- /// whether this Impalad has restarted. Registration with Llama is idempotent if the
- /// same llama_client_id_ is passed, i.e., the same Llama handle is returned and
- /// resource allocations are preserved. From Llama's perspective an unknown
- /// llama_client_id_ indicates a new registration and all resources allocated by this
- /// Impalad under a different llama_client_id_ are consider lost and will be released.
- boost::uuids::uuid llama_client_id_;
-
- /// Thrift API implementation which proxies Llama notifications onto this ResourceBroker.
- boost::shared_ptr<llama::LlamaNotificationServiceIf> llama_callback_thrift_iface_;
- boost::scoped_ptr<ThriftServer> llama_callback_server_;
-
- /// Cache of Llama client connections.
- boost::scoped_ptr<ClientCache<llama::LlamaAMServiceClient>> llama_client_cache_;
-
- /// Lock to ensure that only a single registration with Llama is sent, e.g.,
- /// when multiple concurrent requests realize that Llama has restarted.
- boost::mutex llama_registration_lock_;
-
- /// Handle received from Llama during registration. Set in RegisterWithLlama().
- llama::TUniqueId llama_handle_;
-
- /// List of nodes registered with Llama. Set in RefreshLlamaNodes().
- std::vector<std::string> llama_nodes_;
-
- /// A PendingRequest tracks a single reservation or expansion request that is in flight
- /// to Llama. A new PendingRequest is created in either Expand() or Reserve(), and its
- /// promise() is blocked on there until a response is received for that request from
- /// Llama via AMNotification(), or until a timeout occurs.
- //
- /// Every request has a unique request_id which is assigned by the resource broker. Each
- /// request is also associated with exactly one reservation, via reservation_id(). This
- /// allows us to track which resources belong to which reservation, and to make sure that
- /// all are correctly accounted for when the reservation is released. Each reservation ID
- /// will belong to exactly one reservation request, and 0 or more expansion requests.
- class PendingRequest {
- public:
- PendingRequest(const llama::TUniqueId& reservation_id,
- const llama::TUniqueId& request_id, bool is_expansion)
- : reservation_id_(reservation_id), request_id_(request_id),
- is_expansion_(is_expansion), is_cancelled_(false) {
- DCHECK(is_expansion || reservation_id == request_id);
- }
-
- /// Promise is set to true if the reservation or expansion request was granted, false
- /// if it was rejected by Yarn. When promise()->Get() returns true,
- /// allocated_resources_ will be populated and it will be safe to call GetResources().
- Promise<bool>* promise() { return &promise_; }
-
- /// Called by WaitForNotification() to populate a map of resources once the
- /// corresponding request has returned successfully (and promise() therefore has
- /// returned true).
- void GetResources(ResourceMap* resources);
-
- /// Populates allocated_resources_ from all members of resources that match the given
- /// reservation id. Called in AMNotification().
- void SetResources(const std::vector<llama::TAllocatedResource>& resources);
-
- const llama::TUniqueId& request_id() const { return request_id_; }
- const llama::TUniqueId& reservation_id() const { return reservation_id_; }
-
- bool is_expansion() const { return is_expansion_; }
- bool is_cancelled() const { return is_cancelled_; }
-
- /// Sets the cancelled flag to true. Is only called before the promise is set and
- /// while the pending_requests_lock_ is held to avoid races.
- void SetCancelled() { is_cancelled_ = true; }
-
- private:
- /// Promise object that WaitForNotification() waits on and AMNotification() signals.
- Promise<bool> promise_;
-
- /// Filled in by AMNotification(), so that WaitForNotification() can read the set of
- /// allocated_resources without AMNotification() having to wait (hence the copy is
- /// deliberate, since the original copy may go out of scope).
- std::vector<llama::TAllocatedResource> allocated_resources_;
-
- /// The ID for the reservation associated with this request. There is always exactly
- /// one reservation associated with every request.
- llama::TUniqueId reservation_id_;
-
- /// The unique ID for this request. If this is a reservation request, request_id_ ==
- /// reservation_id_, otherwise this is generated during Expand().
- llama::TUniqueId request_id_;
-
- /// True if this is an expansion request, false if it is a reservation request
- bool is_expansion_;
-
- /// Set if the request was cancelled.
- bool is_cancelled_;
- };
-
- /// Protects pending_requests_ and pending_expansion_ids_
- boost::mutex pending_requests_lock_;
-
- /// Map from unique request ID provided to Llama (for both reservation and expansion
- /// requests) to PendingRequest object used to coordinate when a response is received
- /// from Llama.
- typedef boost::unordered_map<llama::TUniqueId, PendingRequest*> PendingRequestMap;
- PendingRequestMap pending_requests_;
-
- /// Map from reservation IDs to pending expansion IDs. All pending request IDs have a
- /// PendingRequest in pending_requests_.
- typedef boost::unordered_map<llama::TUniqueId, boost::unordered_set<llama::TUniqueId>>
- PendingExpansionIdsMap;
- PendingExpansionIdsMap pending_expansion_ids_;
-
- /// An AllocatedRequest tracks resources allocated in response to one reservation or
- /// expansion request.
- class AllocatedRequest {
- public:
- AllocatedRequest(const llama::TUniqueId& reservation_id,
- uint64_t memory_mb, uint32_t vcpus, bool is_expansion)
- : reservation_id_(reservation_id), memory_mb_(memory_mb), vcpus_(vcpus),
- is_expansion_(is_expansion) { }
-
- const llama::TUniqueId reservation_id() const { return reservation_id_; }
- uint64_t memory_mb() const { return memory_mb_; }
- uint32_t vcpus() const { return vcpus_; }
- bool is_expansion() const { return is_expansion_; }
-
- private:
- /// The reservation ID for this request. Expansions all share the same reservation ID.
- llama::TUniqueId reservation_id_;
-
- /// The total memory allocated to this request
- uint64_t memory_mb_;
-
- /// The number of VCPUs allocated to this request
- uint32_t vcpus_;
-
- /// True if this is an expansion request, false if it is a reservation request
- bool is_expansion_;
- };
-
- /// Protectes allocated_requests_
- boost::mutex allocated_requests_lock_;
-
- /// Map from reservation ID to all satisfied requests - reservation and expansion -
- /// associated with that reservation. Used only for bookkeeping so that Impala can report
- /// on the current resource usage.
- typedef boost::unordered_map<llama::TUniqueId, std::vector<AllocatedRequest>>
- AllocatedRequestMap;
- AllocatedRequestMap allocated_requests_;
-
- /// Protects query_resource_mgrs_
- boost::mutex query_resource_mgrs_lock_;
- typedef boost::unordered_map<TUniqueId, std::pair<int32_t, QueryResourceMgr*>>
- QueryResourceMgrsMap;
-
- /// Map from query ID to a (ref_count, QueryResourceMgr*) pair, i.e. one QueryResourceMgr
- /// per query. The refererence count is always non-zero - once it hits zero the entry in
- /// the map is removed and the QueryResourceMgr is deleted.
- QueryResourceMgrsMap query_resource_mgrs_;
-};
-
-std::ostream& operator<<(std::ostream& os,
- const TResourceBrokerReservationRequest& request);
-
-std::ostream& operator<<(std::ostream& os,
- const TResourceBrokerReservationResponse& reservation);
-
-std::ostream& operator<<(std::ostream& os,
- const TResourceBrokerExpansionRequest& request);
-
-std::ostream& operator<<(std::ostream& os,
- const TResourceBrokerExpansionResponse& expansion);
-}
-
-#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/buffered-block-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc
index 5b5ee8a..169baf2 100644
--- a/be/src/runtime/buffered-block-mgr-test.cc
+++ b/be/src/runtime/buffered-block-mgr-test.cc
@@ -553,8 +553,8 @@ class BufferedBlockMgrTest : public ::testing::Test {
const int num_threads = 8;
thread_group workers;
// Create a shared RuntimeState with no BufferedBlockMgr.
- RuntimeState* shared_state = new RuntimeState(TExecPlanFragmentParams(), "",
- test_env_->exec_env());
+ RuntimeState* shared_state =
+ new RuntimeState(TExecPlanFragmentParams(), test_env_->exec_env());
for (int i = 0; i < num_threads; ++i) {
thread* t = new thread(bind(
&BufferedBlockMgrTest::CreateDestroyThread, this, shared_state));
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/buffered-block-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc
index 90c1041..d582c31 100644
--- a/be/src/runtime/buffered-block-mgr.cc
+++ b/be/src/runtime/buffered-block-mgr.cc
@@ -1317,8 +1317,8 @@ void BufferedBlockMgr::Init(DiskIoMgr* io_mgr, RuntimeProfile* parent_profile,
integrity_check_timer_ = ADD_TIMER(profile_.get(), "TotalIntegrityCheckTime");
// Create a new mem_tracker and allocate buffers.
- mem_tracker_.reset(new MemTracker(
- profile(), mem_limit, -1, "Block Manager", parent_tracker));
+ mem_tracker_.reset(
+ new MemTracker(profile(), mem_limit, "Block Manager", parent_tracker));
initialized_ = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/collection-value-builder-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/collection-value-builder-test.cc b/be/src/runtime/collection-value-builder-test.cc
index c7843b7..b8f4b65 100644
--- a/be/src/runtime/collection-value-builder-test.cc
+++ b/be/src/runtime/collection-value-builder-test.cc
@@ -40,7 +40,7 @@ TEST(CollectionValueBuilderTest, MaxBufferSize) {
CollectionValue coll_value;
int64_t initial_capacity = (INT_MAX / 8) + 1;
int64_t mem_limit = initial_capacity * 4 * 4;
- MemTracker tracker(mem_limit, mem_limit);
+ MemTracker tracker(mem_limit);
MemPool pool(&tracker);
CollectionValueBuilder coll_value_builder(
&coll_value, tuple_desc, &pool, NULL, initial_capacity);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4728866..e2dd1a4 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -62,7 +62,6 @@
#include "util/error-util.h"
#include "util/hdfs-bulk-ops.h"
#include "util/hdfs-util.h"
-#include "util/llama-util.h"
#include "util/network-util.h"
#include "util/pretty-printer.h"
#include "util/summary-util.h"
@@ -486,7 +485,7 @@ Status Coordinator::Exec(QuerySchedule& schedule,
runtime_state()->obj_pool(), request.fragments[0].output_exprs,
output_expr_ctxs));
MemTracker* output_expr_tracker = runtime_state()->obj_pool()->Add(new MemTracker(
- -1, -1, "Output exprs", runtime_state()->instance_mem_tracker(), false));
+ -1, "Output exprs", runtime_state()->instance_mem_tracker(), false));
RETURN_IF_ERROR(Expr::Prepare(
*output_expr_ctxs, runtime_state(), row_desc(), output_expr_tracker));
} else {
@@ -503,12 +502,12 @@ Status Coordinator::Exec(QuerySchedule& schedule,
MemTracker* pool_tracker = MemTracker::GetRequestPoolMemTracker(
schedule.request_pool(), exec_env_->process_mem_tracker());
query_mem_tracker_ =
- MemTracker::GetQueryMemTracker(query_id_, query_limit, -1, pool_tracker, NULL);
+ MemTracker::GetQueryMemTracker(query_id_, query_limit, pool_tracker);
executor_.reset(NULL);
}
- filter_mem_tracker_.reset(new MemTracker(-1, -1, "Runtime Filter (Coordinator)",
- query_mem_tracker(), false));
+ filter_mem_tracker_.reset(
+ new MemTracker(-1, "Runtime Filter (Coordinator)", query_mem_tracker(), false));
// Initialize the execution profile structures.
InitExecProfile(request);
@@ -1900,20 +1899,6 @@ void Coordinator::SetExecPlanFragmentParams(QuerySchedule& schedule,
SetExecPlanDescriptorTable(fragment, rpc_params);
TNetworkAddress exec_host = params.hosts[fragment_instance_idx];
- if (schedule.HasReservation()) {
- // The reservation has already have been validated at this point.
- TNetworkAddress resource_hostport;
- schedule.GetResourceHostport(exec_host, &resource_hostport);
- map<TNetworkAddress, llama::TAllocatedResource>::const_iterator it =
- schedule.reservation()->allocated_resources.find(resource_hostport);
- // Only set reserved resource if we actually have one for this plan
- // fragment. Otherwise, don't set it (usually this the coordinator fragment), and it
- // won't participate in dynamic RM controls.
- if (it != schedule.reservation()->allocated_resources.end()) {
- fragment_instance_ctx.__set_reserved_resource(it->second);
- fragment_instance_ctx.__set_local_resource_address(resource_hostport);
- }
- }
FragmentScanRangeAssignment::const_iterator it =
params.scan_range_assignment.find(exec_host);
// Scan ranges may not always be set, so use an empty structure if so.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 9bd51af..50103ba 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -295,7 +295,7 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_t
is_merging_(is_merging),
num_buffered_bytes_(0),
profile_(profile) {
- mem_tracker_.reset(new MemTracker(-1, -1, "DataStreamRecvr", parent_tracker));
+ mem_tracker_.reset(new MemTracker(-1, "DataStreamRecvr", parent_tracker));
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
sender_queues_.reserve(num_queues);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19de09ab/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index dae4724..f5eb783 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -112,9 +112,7 @@ class ImpalaTestBackend : public ImpalaInternalServiceIf {
class DataStreamTest : public testing::Test {
protected:
- DataStreamTest()
- : runtime_state_(TExecPlanFragmentParams(), "", &exec_env_),
- next_val_(0) {
+ DataStreamTest() : runtime_state_(TExecPlanFragmentParams(), &exec_env_), next_val_(0) {
// Initialize Mem trackers for use by the data stream receiver.
exec_env_.InitForFeTests();
runtime_state_.InitMemTrackers(TUniqueId(), NULL, -1);
@@ -482,7 +480,7 @@ class DataStreamTest : public testing::Test {
void Sender(int sender_num, int channel_buffer_size,
TPartitionType::type partition_type) {
- RuntimeState state(TExecPlanFragmentParams(), "", &exec_env_);
+ RuntimeState state(TExecPlanFragmentParams(), &exec_env_);
state.set_desc_tbl(desc_tbl_);
state.InitMemTrackers(TUniqueId(), NULL, -1);
VLOG_QUERY << "create sender " << sender_num;
@@ -596,7 +594,7 @@ TEST_F(DataStreamTest, BasicTest) {
// TODO: Make lifecycle requirements more explicit.
TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
scoped_ptr<RuntimeState> runtime_state(
- new RuntimeState(TExecPlanFragmentParams(), "", &exec_env_));
+ new RuntimeState(TExecPlanFragmentParams(), &exec_env_));
runtime_state->InitMemTrackers(TUniqueId(), NULL, -1);
scoped_ptr<RuntimeProfile> profile(new RuntimeProfile(&obj_pool_, "TestReceiver"));