You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/09 22:30:14 UTC
impala git commit: IMPALA-6034: Add scanned bytes limits per query
Repository: impala
Updated Branches:
refs/heads/master 7f9a74ffc -> 3e17705ec
IMPALA-6034: Add scanned bytes limits per query
This adds support for aggregate resource limits at runtime, specified
via query options. If a query exceeds a limit it is terminated. The
checks are periodic so the query may go somewhat over the limits.
SCAN_BYTES_LIMIT is exposed as an advanced query option.
CPU_LIMIT_S is hidden as a development query option because it is flawed
- the CPU user/sys time is only updated upon thread completion, so in
many cases the limit will not take effect until well after the resources
have been used. IMPALA-7318 tracks enabling this.
The query profile is updated to include query-wide and per-backend metrics
for CPU and scanned bytes. Example from "select count(*) from
tpch_parquet.lineitem":
Per Node Peak Memory Usage: tarmstrong-box:22000(289.50 KB) tarmstrong-box:22001(249.50 KB) tarmstrong-box:22002(249.50 KB)
Per Node Bytes Read: tarmstrong-box:22000(100.00 KB) tarmstrong-box:22001(100.00 KB) tarmstrong-box:22002(100.00 KB)
Per Node User Time: tarmstrong-box:22000(40.000ms) tarmstrong-box:22001(32.000ms) tarmstrong-box:22002(24.000ms)
Per Node System Time: tarmstrong-box:22000(0.000ns) tarmstrong-box:22001(0.000ns) tarmstrong-box:22002(0.000ns)
- FiltersReceived: 0 (0)
- FinalizationTimer: 0.000ns
- NumBackends: 3 (3)
- NumFragmentInstances: 4 (4)
- NumFragments: 2 (2)
- TotalBytesRead: 300.00 KB (307200)
- TotalCpuTime: 96.000ms
Testing:
Added tests for various permutations of CPU_LIMIT_S and
SCAN_BYTES_LIMIT.
Based on a previous patch by Mostafa Mokhtar
<mm...@cloudera.com>
Change-Id: I3e85f80b70b3fce47e637e9322ed0316ee84f6a9
Reviewed-on: http://gerrit.cloudera.org:8080/11081
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3e17705e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3e17705e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3e17705e
Branch: refs/heads/master
Commit: 3e17705ecaba0b6ab9ae929e6c7c409e0b6aea1d
Parents: 7f9a74f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Mon Aug 6 12:35:38 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 9 22:18:01 2018 +0000
----------------------------------------------------------------------
be/src/runtime/coordinator-backend-state.cc | 56 ++++++++++---
be/src/runtime/coordinator-backend-state.h | 23 ++---
be/src/runtime/coordinator.cc | 42 ++++++++--
be/src/runtime/coordinator.h | 32 +++++++
be/src/service/impala-server.cc | 88 +++++++++++++++++---
be/src/service/impala-server.h | 11 ++-
be/src/service/query-options-test.cc | 39 +++++++--
be/src/service/query-options.cc | 21 ++++-
be/src/service/query-options.h | 5 +-
common/thrift/ImpalaInternalService.thrift | 6 ++
common/thrift/ImpalaService.thrift | 8 ++
.../QueryTest/query-resource-limits.test | 75 +++++++++++++++++
tests/query_test/test_cancellation.py | 6 ++
tests/query_test/test_resource_limits.py | 4 +
www/query_backends.tmpl | 10 ++-
15 files changed, 371 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/runtime/coordinator-backend-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.cc b/be/src/runtime/coordinator-backend-state.cc
index a99acdb..2d97b54 100644
--- a/be/src/runtime/coordinator-backend-state.cc
+++ b/be/src/runtime/coordinator-backend-state.cc
@@ -210,9 +210,35 @@ Status Coordinator::BackendState::GetStatus(bool* is_fragment_failure,
return status_;
}
-int64_t Coordinator::BackendState::GetPeakConsumption() {
+Coordinator::ResourceUtilization Coordinator::BackendState::ComputeResourceUtilization() {
lock_guard<mutex> l(lock_);
- return peak_consumption_;
+ return ComputeResourceUtilizationLocked();
+}
+
+Coordinator::ResourceUtilization
+Coordinator::BackendState::ComputeResourceUtilizationLocked() {
+ ResourceUtilization result;
+ for (const auto& entry : instance_stats_map_) {
+ RuntimeProfile* profile = entry.second->profile_;
+ ResourceUtilization instance_utilization;
+ // Read this instance's counters; they are merged into 'result' below.
+ RuntimeProfile::Counter* user_time = profile->GetCounter("TotalThreadsUserTime");
+ if (user_time != nullptr) instance_utilization.cpu_user_ns = user_time->value();
+
+ RuntimeProfile::Counter* system_time = profile->GetCounter("TotalThreadsSysTime");
+ if (system_time != nullptr) instance_utilization.cpu_sys_ns = system_time->value();
+
+ for (RuntimeProfile::Counter* c : entry.second->bytes_read_counters_) {
+ instance_utilization.bytes_read += c->value();
+ }
+
+ RuntimeProfile::Counter* peak_mem =
+ profile->GetCounter(FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER);
+ if (peak_mem != nullptr)
+ instance_utilization.peak_per_host_mem_consumption = peak_mem->value();
+ result.Merge(instance_utilization);
+ }
+ return result;
}
void Coordinator::BackendState::MergeErrorLog(ErrorLogMap* merged) {
@@ -258,11 +284,6 @@ bool Coordinator::BackendState::ApplyExecStatusReport(
if (instance_stats->done_) continue;
instance_stats->Update(instance_exec_status, exec_summary, scan_range_progress);
- if (instance_stats->peak_mem_counter_ != nullptr) {
- // protect against out-of-order status updates
- peak_consumption_ =
- max(peak_consumption_, instance_stats->peak_mem_counter_->value());
- }
// If a query is aborted due to an error encountered by a single fragment instance,
// all other fragment instances will report a cancelled status; make sure not to mask
@@ -445,15 +466,17 @@ void Coordinator::BackendState::InstanceStats::InitCounters() {
RuntimeProfile::Counter* c =
p->GetCounter(ScanNode::SCAN_RANGES_COMPLETE_COUNTER);
if (c != nullptr) scan_ranges_complete_counters_.push_back(c);
+
+ RuntimeProfile::Counter* bytes_read =
+ p->GetCounter(ScanNode::BYTES_READ_COUNTER);
+ if (bytes_read != nullptr) bytes_read_counters_.push_back(bytes_read);
}
- peak_mem_counter_ =
- profile_->GetCounter(FragmentInstanceState::PER_HOST_PEAK_MEM_COUNTER);
}
void Coordinator::BackendState::InstanceStats::Update(
- const TFragmentInstanceExecStatus& exec_status,
- ExecSummary* exec_summary, ProgressUpdater* scan_range_progress) {
+ const TFragmentInstanceExecStatus& exec_status, ExecSummary* exec_summary,
+ ProgressUpdater* scan_range_progress) {
last_report_time_ms_ = MonotonicMillis();
if (exec_status.done) stopwatch_.Stop();
profile_->Update(exec_status.profile);
@@ -579,10 +602,17 @@ void Coordinator::FragmentStats::AddExecStats() {
void Coordinator::BackendState::ToJson(Value* value, Document* document) {
lock_guard<mutex> l(lock_);
+ ResourceUtilization resource_utilization = ComputeResourceUtilizationLocked();
value->AddMember("num_instances", fragments_.size(), document->GetAllocator());
value->AddMember("done", IsDone(), document->GetAllocator());
- value->AddMember(
- "peak_mem_consumption", peak_consumption_, document->GetAllocator());
+ value->AddMember("peak_per_host_mem_consumption",
+ resource_utilization.peak_per_host_mem_consumption, document->GetAllocator());
+ value->AddMember("bytes_read", resource_utilization.bytes_read,
+ document->GetAllocator());
+ value->AddMember("cpu_user_s", resource_utilization.cpu_user_ns / 1e9,
+ document->GetAllocator());
+ value->AddMember("cpu_sys_s", resource_utilization.cpu_sys_ns / 1e9,
+ document->GetAllocator());
string host = TNetworkAddressToString(impalad_address());
Value val(host.c_str(), document->GetAllocator());
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/runtime/coordinator-backend-state.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator-backend-state.h b/be/src/runtime/coordinator-backend-state.h
index c51c16c..1154cd8 100644
--- a/be/src/runtime/coordinator-backend-state.h
+++ b/be/src/runtime/coordinator-backend-state.h
@@ -109,8 +109,9 @@ class Coordinator::BackendState {
Status GetStatus(bool* is_fragment_failure = nullptr,
TUniqueId* failed_instance_id = nullptr) WARN_UNUSED_RESULT;
- /// Return peak memory consumption.
- int64_t GetPeakConsumption();
+ /// Return peak memory consumption and aggregated resource usage across all fragment
+ /// instances for this backend.
+ ResourceUtilization ComputeResourceUtilization();
/// Merge the accumulated error log into 'merged'.
void MergeErrorLog(ErrorLogMap* merged);
@@ -147,8 +148,9 @@ class Coordinator::BackendState {
/// Updates 'this' with exec_status, the fragment instances' TExecStats in
/// exec_summary, and 'progress_updater' with the number of newly completed scan
/// ranges. Also updates the instance's avg profile.
- void Update(const TFragmentInstanceExecStatus& exec_status,
- ExecSummary* exec_summary, ProgressUpdater* scan_range_progress);
+ /// Caller must hold BackendState::lock_.
+ void Update(const TFragmentInstanceExecStatus& exec_status, ExecSummary* exec_summary,
+ ProgressUpdater* scan_range_progress);
int per_fragment_instance_idx() const {
return exec_params_.per_fragment_instance_idx;
@@ -191,14 +193,14 @@ class Coordinator::BackendState {
/// SCAN_RANGES_COMPLETE_COUNTERs in profile_
std::vector<RuntimeProfile::Counter*> scan_ranges_complete_counters_;
- /// PER_HOST_PEAK_MEM_COUNTER
- RuntimeProfile::Counter* peak_mem_counter_ = nullptr;
+ /// Collection of BYTES_READ_COUNTERs of all scan nodes in this fragment instance.
+ std::vector<RuntimeProfile::Counter*> bytes_read_counters_;
/// The current state of this fragment instance's execution. This gets serialized in
/// ToJson() and is displayed in the debug webpages.
TFInstanceExecState::type current_state_ = TFInstanceExecState::WAITING_FOR_EXEC;
- /// Extracts scan_ranges_complete_counters_ and peak_mem_counter_ from profile_.
+ /// Extracts scan_ranges_complete_counters_ and bytes_read_counters_ from profile_.
void InitCounters();
};
@@ -250,10 +252,6 @@ class Coordinator::BackendState {
/// successful.
bool rpc_sent_ = false;
- /// peak memory used for this query (value of that node's query memtracker's
- /// peak_consumption()
- int64_t peak_consumption_ = 0;
-
/// Set in ApplyExecStatusReport(). Uses MonotonicMillis().
int64_t last_report_time_ms_ = 0;
@@ -268,6 +266,9 @@ class Coordinator::BackendState {
/// Return true if execution at this backend is done. Caller must hold lock_.
bool IsDone() const;
+
+ /// Same as ComputeResourceUtilization() but caller must hold lock_.
+ ResourceUtilization ComputeResourceUtilizationLocked();
};
/// Per fragment execution statistics.
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/runtime/coordinator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 8a3213e..99f0be5 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -730,13 +730,35 @@ void Coordinator::ComputeQuerySummary() {
fragment_stats->AddExecStats();
}
- stringstream info;
+ stringstream mem_info, cpu_user_info, cpu_system_info, bytes_read_info;
+ ResourceUtilization total_utilization;
for (BackendState* backend_state: backend_states_) {
- info << TNetworkAddressToString(backend_state->impalad_address()) << "("
- << PrettyPrinter::Print(backend_state->GetPeakConsumption(), TUnit::BYTES)
- << ") ";
- }
- query_profile_->AddInfoString("Per Node Peak Memory Usage", info.str());
+ ResourceUtilization utilization = backend_state->ComputeResourceUtilization();
+ total_utilization.Merge(utilization);
+ string network_address = TNetworkAddressToString(
+ backend_state->impalad_address());
+ mem_info << network_address << "("
+ << PrettyPrinter::Print(utilization.peak_per_host_mem_consumption,
+ TUnit::BYTES) << ") ";
+ bytes_read_info << network_address << "("
+ << PrettyPrinter::Print(utilization.bytes_read, TUnit::BYTES) << ") ";
+ cpu_user_info << network_address << "("
+ << PrettyPrinter::Print(utilization.cpu_user_ns, TUnit::TIME_NS)
+ << ") ";
+ cpu_system_info << network_address << "("
+ << PrettyPrinter::Print(utilization.cpu_sys_ns, TUnit::TIME_NS)
+ << ") ";
+ }
+
+ COUNTER_SET(ADD_COUNTER(query_profile_, "TotalBytesRead", TUnit::BYTES),
+ total_utilization.bytes_read);
+ COUNTER_SET(ADD_COUNTER(query_profile_, "TotalCpuTime", TUnit::TIME_NS),
+ total_utilization.cpu_user_ns + total_utilization.cpu_sys_ns);
+
+ query_profile_->AddInfoString("Per Node Peak Memory Usage", mem_info.str());
+ query_profile_->AddInfoString("Per Node Bytes Read", bytes_read_info.str());
+ query_profile_->AddInfoString("Per Node User Time", cpu_user_info.str());
+ query_profile_->AddInfoString("Per Node System Time", cpu_system_info.str());
}
string Coordinator::GetErrorLog() {
@@ -775,6 +797,14 @@ void Coordinator::ReleaseAdmissionControlResources() {
query_events_->MarkEvent("Released admission control resources");
}
+Coordinator::ResourceUtilization Coordinator::ComputeQueryResourceUtilization() {
+ ResourceUtilization query_resource_utilization;
+ for (BackendState* backend_state: backend_states_) {
+ query_resource_utilization.Merge(backend_state->ComputeResourceUtilization());
+ }
+ return query_resource_utilization;
+}
+
void Coordinator::UpdateFilter(const TUpdateFilterParams& params) {
shared_lock<shared_mutex> lock(filter_lock_);
DCHECK_NE(filter_mode_, TRuntimeFilterMode::OFF)
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/runtime/coordinator.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/coordinator.h b/be/src/runtime/coordinator.h
index 0b0312c..4fb7e25 100644
--- a/be/src/runtime/coordinator.h
+++ b/be/src/runtime/coordinator.h
@@ -167,6 +167,38 @@ class Coordinator { // NOLINT: The member variables could be re-ordered to save
/// instances running on each backend in a member named 'backend_instances'.
void FInstanceStatsToJson(rapidjson::Document* document);
+ /// Struct to aggregate resource usage information at the finstance, backend and
+ /// query level.
+ struct ResourceUtilization {
+ /// Peak memory used for this query (value of the query memtracker's
+ /// peak_consumption()). At the finstance or backend level, this is the
+ /// peak value for that backend; at the query level, this is the max
+ /// peak value from any backend.
+ int64_t peak_per_host_mem_consumption = 0;
+
+ /// Total bytes read across all scan nodes.
+ int64_t bytes_read = 0;
+
+ /// Total user cpu consumed.
+ int64_t cpu_user_ns = 0;
+
+ /// Total system cpu consumed.
+ int64_t cpu_sys_ns = 0;
+
+ /// Merge utilization from 'other' into this.
+ void Merge(const ResourceUtilization& other) {
+ peak_per_host_mem_consumption =
+ std::max(peak_per_host_mem_consumption, other.peak_per_host_mem_consumption);
+ bytes_read += other.bytes_read;
+ cpu_user_ns += other.cpu_user_ns;
+ cpu_sys_ns += other.cpu_sys_ns;
+ }
+ };
+
+ /// Aggregate resource utilization for the query (i.e. across all backends based on the
+ /// latest status reports received from those backends).
+ ResourceUtilization ComputeQueryResourceUtilization();
+
private:
class BackendState;
struct FilterTarget;
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 34293f6..cc8465f 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -240,6 +240,9 @@ const char* ImpalaServer::SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION = "42000";
const char* ImpalaServer::SQLSTATE_GENERAL_ERROR = "HY000";
const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
+// Interval between checks for query expiration.
+const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;
+
// Work item for ImpalaServer::cancellation_thread_pool_.
class CancellationWork {
public:
@@ -1061,7 +1064,10 @@ Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state,
idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
}
int32_t exec_time_limit_s = request_state->query_options().exec_time_limit_s;
- if (idle_timeout_s > 0 || exec_time_limit_s > 0) {
+ int64_t cpu_limit_s = request_state->query_options().cpu_limit_s;
+ int64_t scan_bytes_limit = request_state->query_options().scan_bytes_limit;
+ if (idle_timeout_s > 0 || exec_time_limit_s > 0 ||
+ cpu_limit_s > 0 || scan_bytes_limit > 0) {
lock_guard<mutex> l2(query_expiration_lock_);
int64_t now = UnixMillis();
if (idle_timeout_s > 0) {
@@ -1076,6 +1082,18 @@ Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state,
queries_by_timestamp_.emplace(ExpirationEvent{
now + (1000L * exec_time_limit_s), query_id, ExpirationKind::EXEC_TIME_LIMIT});
}
+ if (cpu_limit_s > 0 || scan_bytes_limit > 0) {
+ if (cpu_limit_s > 0) {
+ VLOG_QUERY << "Query " << PrintId(query_id) << " has CPU limit of "
+ << PrettyPrinter::Print(cpu_limit_s, TUnit::TIME_S);
+ }
+ if (scan_bytes_limit > 0) {
+ VLOG_QUERY << "Query " << PrintId(query_id) << " has scan bytes limit of "
+ << PrettyPrinter::Print(scan_bytes_limit, TUnit::BYTES);
+ }
+ queries_by_timestamp_.emplace(ExpirationEvent{
+ now + EXPIRATION_CHECK_INTERVAL_MS, query_id, ExpirationKind::RESOURCE_LIMIT});
+ }
}
return Status::OK();
}
@@ -1952,7 +1970,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
[[noreturn]] void ImpalaServer::ExpireQueries() {
while (true) {
- // The following block accomplishes three things:
+ // The following block accomplishes four things:
//
// 1. Update the ordered list of queries by checking the 'idle_time' parameter in
// client_request_state. We are able to avoid doing this for *every* query in flight
@@ -1967,6 +1985,8 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
// 3. Compute the next time a query *might* expire, so that the sleep at the end of
// this loop has an accurate duration to wait. If the list of queries is empty, the
// default sleep duration is half the idle query timeout.
+ //
+ // 4. Cancel queries that have exceeded their CPU or scan bytes limit.
int64_t now;
{
lock_guard<mutex> l(query_expiration_lock_);
@@ -1976,24 +1996,38 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
// 'queries_by_timestamp_' is stored in ascending order of deadline so we can
// break out of the loop and sleep as soon as we see a deadline in the future.
if (expiration_event->deadline > now) break;
- shared_ptr<ClientRequestState> query_state =
+ shared_ptr<ClientRequestState> crs =
GetClientRequestState(expiration_event->query_id);
- if (query_state == nullptr || query_state->is_expired()) {
+ if (crs == nullptr || crs->is_expired()) {
// Query was deleted or expired already from a previous expiration event.
expiration_event = queries_by_timestamp_.erase(expiration_event);
continue;
}
+ // Check for CPU and scanned bytes limits
+ if (expiration_event->kind == ExpirationKind::RESOURCE_LIMIT) {
+ Status resource_status = CheckResourceLimits(crs.get());
+ if (resource_status.ok()) {
+ queries_by_timestamp_.emplace(
+ ExpirationEvent{now + EXPIRATION_CHECK_INTERVAL_MS,
+ expiration_event->query_id, ExpirationKind::RESOURCE_LIMIT});
+ } else {
+ ExpireQuery(crs.get(), resource_status);
+ }
+ expiration_event = queries_by_timestamp_.erase(expiration_event);
+ continue;
+ }
+
// If the query time limit expired, we must cancel the query.
if (expiration_event->kind == ExpirationKind::EXEC_TIME_LIMIT) {
- int32_t exec_time_limit_s = query_state->query_options().exec_time_limit_s;
+ int32_t exec_time_limit_s = crs->query_options().exec_time_limit_s;
VLOG_QUERY << "Expiring query " << PrintId(expiration_event->query_id)
<< " due to execution time limit of " << exec_time_limit_s << "s.";
const string& err_msg = Substitute(
"Query $0 expired due to execution time limit of $1",
PrintId(expiration_event->query_id),
PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S));
- ExpireQuery(query_state.get(), Status::Expected(err_msg));
+ ExpireQuery(crs.get(), Status::Expected(err_msg));
expiration_event = queries_by_timestamp_.erase(expiration_event);
continue;
}
@@ -2003,14 +2037,14 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
// Now check to see if the idle timeout has expired. We must check the actual
// expiration time in case the query has updated 'last_active_ms' since the last
// time we looked.
- int32_t idle_timeout_s = query_state->query_options().query_timeout_s;
+ int32_t idle_timeout_s = crs->query_options().query_timeout_s;
if (FLAGS_idle_query_timeout > 0 && idle_timeout_s > 0) {
idle_timeout_s = min(FLAGS_idle_query_timeout, idle_timeout_s);
} else {
// Use a non-zero timeout, if one exists
idle_timeout_s = max(FLAGS_idle_query_timeout, idle_timeout_s);
}
- int64_t expiration = query_state->last_active_ms() + (idle_timeout_s * 1000L);
+ int64_t expiration = crs->last_active_ms() + (idle_timeout_s * 1000L);
if (now < expiration) {
// If the real expiration date is in the future we may need to re-insert the
// query's expiration event at its correct location.
@@ -2026,17 +2060,17 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
queries_by_timestamp_.emplace(ExpirationEvent{
expiration, query_id, ExpirationKind::IDLE_TIMEOUT});
}
- } else if (!query_state->is_active()) {
+ } else if (!crs->is_active()) {
// Otherwise time to expire this query
VLOG_QUERY
<< "Expiring query due to client inactivity: "
<< PrintId(expiration_event->query_id) << ", last activity was at: "
- << ToStringFromUnixMillis(query_state->last_active_ms());
+ << ToStringFromUnixMillis(crs->last_active_ms());
const string& err_msg = Substitute(
"Query $0 expired due to client inactivity (timeout is $1)",
PrintId(expiration_event->query_id),
PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S));
- ExpireQuery(query_state.get(), Status::Expected(err_msg));
+ ExpireQuery(crs.get(), Status::Expected(err_msg));
expiration_event = queries_by_timestamp_.erase(expiration_event);
} else {
// Iterator is moved on in every other branch.
@@ -2048,8 +2082,38 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
// could expire is in 1s time. An existing query may expire sooner, but we are
// comfortable with a maximum error of 1s as a trade-off for not frequently waking
// this thread.
- SleepForMs(1000L);
+ SleepForMs(EXPIRATION_CHECK_INTERVAL_MS);
+ }
+}
+
+Status ImpalaServer::CheckResourceLimits(ClientRequestState* crs) {
+ Coordinator* coord = crs->GetCoordinator();
+ // Coordinator may be null if query has not started executing, check again later.
+ if (coord == nullptr) return Status::OK();
+ Coordinator::ResourceUtilization utilization = coord->ComputeQueryResourceUtilization();
+
+ // CPU time consumed by the query so far
+ int64_t cpu_time_ns = utilization.cpu_sys_ns + utilization.cpu_user_ns;
+ int64_t cpu_limit_s = crs->query_options().cpu_limit_s;
+ int64_t cpu_limit_ns = cpu_limit_s * 1000'000'000L;
+ if (cpu_limit_ns > 0 && cpu_time_ns > cpu_limit_ns) {
+ const string& err_msg = Substitute("Query $0 terminated due to CPU limit of $1",
+ PrintId(crs->query_id()), PrettyPrinter::Print(cpu_limit_s, TUnit::TIME_S));
+ VLOG_QUERY << err_msg;
+ return Status::Expected(err_msg);
}
+
+ int64_t scan_bytes = utilization.bytes_read;
+ int64_t scan_bytes_limit = crs->query_options().scan_bytes_limit;
+ if (scan_bytes_limit > 0 && scan_bytes > scan_bytes_limit) {
+ const string& err_msg = Substitute(
+ "Query $0 terminated due to scan bytes limit of $1", PrintId(crs->query_id()),
+ PrettyPrinter::Print(scan_bytes_limit, TUnit::BYTES));
+ VLOG_QUERY << err_msg;
+ return Status::Expected(err_msg);
+ }
+ // Query is within the resource limits, check again later.
+ return Status::OK();
}
void ImpalaServer::ExpireQuery(ClientRequestState* crs, const Status& status) {
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 8b7af09..14c33d1 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -831,6 +831,12 @@ class ImpalaServer : public ImpalaServiceIf,
/// FLAGS_idle_query_timeout seconds.
[[noreturn]] void ExpireQueries();
+ /// Called from ExpireQueries() to check query resource limits for 'crs'. If the query
+ /// exceeded a resource limit, returns a non-OK status with information about what
+ /// limit was exceeded. Returns OK if the query will continue running and expiration
+ /// check should be rescheduled for a later time.
+ Status CheckResourceLimits(ClientRequestState* crs);
+
/// Expire 'crs' and cancel it with status 'status'.
void ExpireQuery(ClientRequestState* crs, const Status& status);
@@ -1065,7 +1071,10 @@ class ImpalaServer : public ImpalaServiceIf,
IDLE_TIMEOUT,
// A hard time limit on query execution. The query is cancelled if this event occurs
// before the query finishes.
- EXEC_TIME_LIMIT
+ EXEC_TIME_LIMIT,
+ // A periodic check of the hard limits on CPU time and scanned bytes. The query is
+ // cancelled if a check finds that it has exceeded either limit.
+ RESOURCE_LIMIT,
};
// Describes a query expiration event where the query identified by 'query_id' is
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/service/query-options-test.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index b9bda60..114d830 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -135,16 +135,17 @@ void TestByteCaseSet(TQueryOptions& options,
TEST(QueryOptions, SetByteOptions) {
TQueryOptions options;
// key and its valid range: [(key, (min, max))]
- vector<pair<OptionDef<int64_t>, Range<int64_t>>> case_set_i64 {
- {MAKE_OPTIONDEF(mem_limit), {-1, I64_MAX}},
+ vector<pair<OptionDef<int64_t>, Range<int64_t>>> case_set_i64{
+ {MAKE_OPTIONDEF(mem_limit), {-1, I64_MAX}},
{MAKE_OPTIONDEF(max_scan_range_length), {-1, I64_MAX}},
- {MAKE_OPTIONDEF(buffer_pool_limit), {-1, I64_MAX}},
- {MAKE_OPTIONDEF(max_row_size), {1, ROW_SIZE_LIMIT}},
- {MAKE_OPTIONDEF(parquet_file_size), {-1, I32_MAX}},
+ {MAKE_OPTIONDEF(buffer_pool_limit), {-1, I64_MAX}},
+ {MAKE_OPTIONDEF(max_row_size), {1, ROW_SIZE_LIMIT}},
+ {MAKE_OPTIONDEF(parquet_file_size), {-1, I32_MAX}},
{MAKE_OPTIONDEF(compute_stats_min_sample_size), {-1, I64_MAX}},
{MAKE_OPTIONDEF(max_mem_estimate_for_admission), {-1, I64_MAX}},
+ {MAKE_OPTIONDEF(scan_bytes_limit), {-1, I64_MAX}},
};
- vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32 {
+ vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
{MAKE_OPTIONDEF(runtime_filter_min_size),
{RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
@@ -154,8 +155,7 @@ TEST(QueryOptions, SetByteOptions) {
{8 * 1024, RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}},
{MAKE_OPTIONDEF(runtime_bloom_filter_size),
{RuntimeFilterBank::MIN_BLOOM_FILTER_SIZE,
- RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}}
- };
+ RuntimeFilterBank::MAX_BLOOM_FILTER_SIZE}}};
TestByteCaseSet(options, case_set_i64);
TestByteCaseSet(options, case_set_i32);
}
@@ -245,6 +245,29 @@ TEST(QueryOptions, SetIntOptions) {
}
}
+// Test 64-bit integer options. Some of them have lower/upper bounds.
+TEST(QueryOptions, SetBigIntOptions) {
+ TQueryOptions options;
+ // List of pairs of Key and its valid range
+ pair<OptionDef<int64_t>, Range<int64_t>> case_set[] {
+ {MAKE_OPTIONDEF(cpu_limit_s), {0, I64_MAX}},
+ };
+ for (const auto& test_case : case_set) {
+ const OptionDef<int64_t>& option_def = test_case.first;
+ const Range<int64_t>& range = test_case.second;
+ auto TestOk = MakeTestOkFn(options, option_def);
+ auto TestError = MakeTestErrFn(options, option_def);
+ TestError("1M");
+ TestError("0B");
+ TestError("1%");
+ TestOk(to_string(range.lower_bound).c_str(), range.lower_bound);
+ TestOk(to_string(range.upper_bound).c_str(), range.upper_bound);
+ TestError(to_string(int64_t(range.lower_bound) - 1).c_str());
+ // 2^63 is I64_MAX + 1.
+ TestError("9223372036854775808");
+ }
+}
+
// Test options with non regular validation rule
TEST(QueryOptions, SetSpecialOptions) {
// REPLICA_PREFERENCE has unsettable enum values: cache_rack(1) & disk_rack(3)
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/service/query-options.cc
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index e401553..1424896 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -674,7 +674,8 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_kudu_read_mode(TKuduReadMode::READ_AT_SNAPSHOT);
} else {
return Status(Substitute("Invalid kudu_read_mode '$0'. Valid values are "
- "DEFAULT, READ_LATEST, and READ_AT_SNAPSHOT.", value));
+ "DEFAULT, READ_LATEST, and READ_AT_SNAPSHOT.",
+ value));
}
break;
}
@@ -695,6 +696,24 @@ Status impala::SetQueryOption(const string& key, const string& value,
query_options->__set_timezone(timezone);
break;
}
+ case TImpalaQueryOptions::SCAN_BYTES_LIMIT: {
+ int64_t bytes_limit;
+ RETURN_IF_ERROR(ParseMemValue(value, "query scan bytes limit", &bytes_limit));
+ query_options->__set_scan_bytes_limit(bytes_limit);
+ break;
+ }
+ case TImpalaQueryOptions::CPU_LIMIT_S: {
+ StringParser::ParseResult result;
+ const int64_t cpu_limit_s =
+ StringParser::StringToInt<int64_t>(value.c_str(), value.length(), &result);
+ if (result != StringParser::PARSE_SUCCESS || cpu_limit_s < 0) {
+ return Status(
+ Substitute("Invalid CPU limit: '$0'. "
+ "Only non-negative numbers are allowed.", value));
+ }
+ query_options->__set_cpu_limit_s(cpu_limit_s);
+ break;
+ }
default:
if (IsRemovedQueryOption(key)) {
LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'";
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/be/src/service/query-options.h
----------------------------------------------------------------------
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 08749b8..46cdf05 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -41,7 +41,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
// the DCHECK.
#define QUERY_OPTS_TABLE\
DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
- TImpalaQueryOptions::TIMEZONE + 1);\
+ TImpalaQueryOptions::CPU_LIMIT_S + 1);\
REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -140,6 +140,9 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
QUERY_OPT_FN(allow_erasure_coded_files, ALLOW_ERASURE_CODED_FILES,\
TQueryOptionLevel::DEVELOPMENT)\
QUERY_OPT_FN(timezone, TIMEZONE, TQueryOptionLevel::REGULAR)\
+ QUERY_OPT_FN(scan_bytes_limit, SCAN_BYTES_LIMIT,\
+ TQueryOptionLevel::ADVANCED)\
+ QUERY_OPT_FN(cpu_limit_s, CPU_LIMIT_S, TQueryOptionLevel::DEVELOPMENT)\
;
/// Enforce practical limits on some query options to avoid undesired query state.
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/common/thrift/ImpalaInternalService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 80da2a9..5367a6e 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -298,6 +298,12 @@ struct TQueryOptions {
// See comment in ImpalaService.thrift.
70: optional string timezone = ""
+
+ // See comment in ImpalaService.thrift.
+ 71: optional i64 scan_bytes_limit = 0;
+
+ // See comment in ImpalaService.thrift.
+ 72: optional i64 cpu_limit_s = 0;
}
// Impala currently has two types of sessions: Beeswax and HiveServer2
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/common/thrift/ImpalaService.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 5d8260e..05a1431 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -331,6 +331,14 @@ enum TImpalaQueryOptions {
// The timezone used in UTC<->localtime conversions. The default is the OS's timezone
// at the coordinator, which can be overridden by environment variable $TZ.
TIMEZONE,
+
+ // Limit on the bytes scanned by a query, after which it is terminated with an error.
+ SCAN_BYTES_LIMIT,
+
+ // CPU time limit in seconds, after which a query will be terminated with an error.
+ // Note that until IMPALA-7318 is fixed, CPU usage can be very stale and this may not
+ // terminate queries soon enough.
+ CPU_LIMIT_S,
}
// The summary of a DML statement.
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
new file mode 100644
index 0000000..e3cd401
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/query-resource-limits.test
@@ -0,0 +1,75 @@
+====
+---- QUERY
+# Query should succeed.
+set CPU_LIMIT_S=10000;
+set SCAN_BYTES_LIMIT="10G";
+select id from functional.alltypessmall order by 1 limit 1
+---- TYPES
+INT
+---- RESULTS
+0
+====
+---- QUERY
+# Query should fail due to exceeding scan bytes limit.
+set SCAN_BYTES_LIMIT="100M";
+select count(*) from tpch.lineitem l1,tpch.lineitem l2, tpch.lineitem l3 where
+ l1.l_suppkey = l2.l_linenumber and l1.l_orderkey = l2.l_orderkey and
+ l1.l_orderkey = l3.l_orderkey group by l1.l_comment, l2.l_comment
+ having count(*) = 99
+---- CATCH
+row_regex:.*terminated due to scan bytes limit of 100.00 M.*
+====
+---- QUERY
+# Query should fail due to CPU time limit.
+set CPU_LIMIT_S=1;
+select count(*) from tpch.lineitem l1,tpch.lineitem l2, tpch.lineitem l3 where
+ l1.l_suppkey = l2.l_linenumber and l1.l_orderkey = l2.l_orderkey and
+ l1.l_orderkey = l3.l_orderkey group by l1.l_comment, l2.l_comment
+ having count(*) = 99
+---- CATCH
+row_regex:.*terminated due to CPU limit of 1s000ms.*
+====
+---- QUERY
+# Query should fail due to CPU time limit even when a large scan bytes limit is also set.
+set CPU_LIMIT_S=1;
+set SCAN_BYTES_LIMIT="100G";
+select count(*) from tpch.lineitem l1,tpch.lineitem l2, tpch.lineitem l3 where
+ l1.l_suppkey = l2.l_linenumber and l1.l_orderkey = l2.l_orderkey and
+ l1.l_orderkey = l3.l_orderkey group by l1.l_comment, l2.l_comment
+ having count(*) = 99
+---- CATCH
+row_regex:.*terminated due to CPU limit of 1s000ms.*
+====
+---- QUERY
+# Query should fail due to exceeding the execution time limit.
+set EXEC_TIME_LIMIT_S=2;
+set CPU_LIMIT_S=10000;
+set SCAN_BYTES_LIMIT="100G";
+select count(*) from tpch.lineitem l1,tpch.lineitem l2, tpch.lineitem l3 where
+ l1.l_suppkey = l2.l_linenumber and l1.l_orderkey = l2.l_orderkey and
+ l1.l_orderkey = l3.l_orderkey group by l1.l_comment, l2.l_comment
+ having count(*) = 99
+---- CATCH
+row_regex:.*expired due to execution time limit of 2s000ms.*
+====
+---- QUERY
+# Bytes limit not enforced for Kudu yet.
+set SCAN_BYTES_LIMIT="1k";
+select min(l_orderkey) from tpch_kudu.lineitem
+---- TYPES
+BIGINT
+---- RESULTS
+1
+====
+---- QUERY
+# Bytes limit enforced for HBase.
+# Add a sleep to slow down query and avoid race with bytes check.
+set SCAN_BYTES_LIMIT="1k";
+select count(*)
+from (
+ select distinct *
+ from functional_hbase.alltypesagg
+ where sleep(100) = id) v
+---- CATCH
+row_regex:.*terminated due to scan bytes limit of 1.00 K.*
+====
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/tests/query_test/test_cancellation.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_cancellation.py b/tests/query_test/test_cancellation.py
index a63a628..ecf3126 100644
--- a/tests/query_test/test_cancellation.py
+++ b/tests/query_test/test_cancellation.py
@@ -53,6 +53,9 @@ WAIT_ACTIONS = [None, '0:GETNEXT:WAIT']
# Verify that failed CancelFInstances() RPCs don't lead to hung queries
FAIL_RPC_ACTIONS = [None, 'COORD_CANCEL_QUERY_FINSTANCES_RPC:FAIL']
+# Test cancelling when there is a resource limit.
+CPU_LIMIT_S = [0, 100000]
+
# Verify close rpc running concurrently with fetch rpc. The two cases verify:
# False: close and fetch rpc run concurrently.
# True: cancel rpc is enough to ensure that the fetch rpc is unblocked.
@@ -85,6 +88,8 @@ class TestCancellation(ImpalaTestSuite):
ImpalaTestDimension('join_before_close', *JOIN_BEFORE_CLOSE))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('buffer_pool_limit', 0))
+ cls.ImpalaTestMatrix.add_dimension(
+ ImpalaTestDimension('cpu_limit_s', *CPU_LIMIT_S))
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('query_type') != 'CTAS' or (\
@@ -138,6 +143,7 @@ class TestCancellation(ImpalaTestSuite):
vector.get_value('exec_option')['buffer_pool_limit'] =\
vector.get_value('buffer_pool_limit')
+ vector.get_value('exec_option')['cpu_limit_s'] = vector.get_value('cpu_limit_s')
# Execute the query multiple times, cancelling it each time.
for i in xrange(NUM_CANCELATION_ITERATIONS):
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/tests/query_test/test_resource_limits.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_resource_limits.py b/tests/query_test/test_resource_limits.py
index 58e2ce6..be0895b 100644
--- a/tests/query_test/test_resource_limits.py
+++ b/tests/query_test/test_resource_limits.py
@@ -38,3 +38,7 @@ class TestResourceLimits(ImpalaTestSuite):
# Remove option from vector to allow test file to override it per query.
del vector.get_value('exec_option')['num_nodes']
self.run_test_case('QueryTest/thread-limits', vector)
+
+ @SkipIfLocal.multiple_impalad
+ def test_resource_limits(self, vector):
+ self.run_test_case('QueryTest/query-resource-limits', vector)
http://git-wip-us.apache.org/repos/asf/impala/blob/3e17705e/www/query_backends.tmpl
----------------------------------------------------------------------
diff --git a/www/query_backends.tmpl b/www/query_backends.tmpl
index c1833c6..fedae37 100644
--- a/www/query_backends.tmpl
+++ b/www/query_backends.tmpl
@@ -36,7 +36,10 @@ under the License.
<th>Num. instances</th>
<th>Num. remaining instances</th>
<th>Done</th>
- <th>Peak mem. consumption</th>
+ <th>Peak per-host mem. consumption</th>
+ <th>Scan bytes read</th>
+ <th>User CPU(s)</th>
+ <th>Kernel CPU(s)</th>
<th>Time since last report (ms)</th>
</tr>
</thead>
@@ -64,7 +67,10 @@ $(document).ready(function() {
{data: 'num_instances'},
{data: 'num_remaining_instances'},
{data: 'done'},
- {data: 'peak_mem_consumption', render: renderSize},
+ {data: 'peak_per_host_mem_consumption', render: renderSize},
+ {data: 'bytes_read', render: renderSize},
+ {data: 'cpu_user_s'},
+ {data: 'cpu_sys_s'},
{data: 'time_since_last_heard_from'}],
"order": [[ 0, "desc" ]],
"pageLength": 100