You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/26 01:45:14 UTC
impala git commit: IMPALA-6577: avoid slow Status constructor on
expiration thread
Repository: impala
Updated Branches:
refs/heads/master fd66890bf -> 46f74df0a
IMPALA-6577: avoid slow Status constructor on expiration thread
As described in the JIRA, the query expiration thread gets bogged down
generating stack traces, leading to slow expiration. I fixed that
particular location to use Status::Expected().
I also fixed other similar problems in ImpalaServer while I was here.
I added logging in places that had none, so that we do not lose useful
log output.
Testing:
Ran private core build on CentOS 6 (where the bug was reproducing at a
high rate).
Looped the test on my system for several hours.
Change-Id: I7946dc3464b11580f768acbd5909d7e5fcae3360
Reviewed-on: http://gerrit.cloudera.org:8080/9443
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/46f74df0
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/46f74df0
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/46f74df0
Branch: refs/heads/master
Commit: 46f74df0ad055737a8a088d9aed16617a6d9052e
Parents: fd66890
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Feb 23 18:37:13 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Mon Feb 26 00:32:10 2018 +0000
----------------------------------------------------------------------
be/src/service/impala-beeswax-server.cc | 8 +++-
be/src/service/impala-hs2-server.cc | 12 +++---
be/src/service/impala-server.cc | 56 ++++++++++++++++++++--------
3 files changed, 53 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/46f74df0/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index eb011a3..2ac80b2 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -485,7 +485,9 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
const bool start_over, const int32_t fetch_size, beeswax::Results* query_results) {
shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
if (UNLIKELY(request_state == nullptr)) {
- return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
+ string err_msg = Substitute("Invalid query handle: $0", PrintId(query_id));
+ VLOG(1) << err_msg;
+ return Status::Expected(err_msg);
}
// Make sure ClientRequestState::Wait() has completed before fetching rows. Wait()
@@ -546,7 +548,9 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
TInsertResult* insert_result) {
shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
if (UNLIKELY(request_state == nullptr)) {
- return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
+ string err_msg = Substitute("Invalid query handle: $0", PrintId(query_id));
+ VLOG(1) << err_msg;
+ return Status::Expected(err_msg);
}
Status query_status;
http://git-wip-us.apache.org/repos/asf/impala/blob/46f74df0/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 2f2ec40..e495dec 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -182,7 +182,9 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size
bool fetch_first, TFetchResultsResp* fetch_results) {
shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
if (UNLIKELY(request_state == nullptr)) {
- return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
+ string err_msg = Substitute("Invalid query handle: $0", PrintId(query_id));
+ VLOG(1) << err_msg;
+ return Status::Expected(err_msg);
}
// FetchResults doesn't have an associated session handle, so we presume that this
@@ -430,9 +432,9 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
SQLSTATE_GENERAL_ERROR);
if (session == NULL) {
- HS2_RETURN_IF_ERROR(
- return_val, Status(Substitute("Invalid session id: $0",
- PrintId(session_id))), SQLSTATE_GENERAL_ERROR);
+ string err_msg = Substitute("Invalid session id: $0", PrintId(session_id));
+ VLOG(1) << err_msg;
+ HS2_RETURN_IF_ERROR(return_val, Status::Expected(err_msg), SQLSTATE_GENERAL_ERROR);
}
// Optionally enable result caching to allow restarting fetches.
@@ -446,7 +448,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
iter->second.c_str(), iter->second.size(), &parse_result);
if (parse_result != StringParser::PARSE_SUCCESS) {
HS2_RETURN_IF_ERROR(
- return_val, Status(Substitute("Invalid value '$0' for '$1' option.",
+ return_val, Status::Expected(Substitute("Invalid value '$0' for '$1' option.",
iter->second, IMPALA_RESULT_CACHING_OPT)), SQLSTATE_GENERAL_ERROR);
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/46f74df0/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 4452205..66b8d0e 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -931,7 +931,10 @@ Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
// expire while checked out, so it must not be expired.
DCHECK(session_state->ref_count > 0 && !session_state->expired);
// The session may have been closed after it was checked out.
- if (session_state->closed) return Status("Session has been closed, ignoring query.");
+ if (session_state->closed) {
+ VLOG(1) << "RegisterQuery(): session has been closed, ignoring query.";
+ return Status::Expected("Session has been closed, ignoring query.");
+ }
const TUniqueId& query_id = request_state->query_id();
{
ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
@@ -962,7 +965,10 @@ Status ImpalaServer::SetQueryInflight(shared_ptr<SessionState> session_state,
DCHECK_GT(session_state->ref_count, 0);
DCHECK(!session_state->expired);
// The session may have been closed after it was checked out.
- if (session_state->closed) return Status("Session closed");
+ if (session_state->closed) {
+ VLOG(1) << "Session closed: cannot set " << PrintId(query_id) << " in-flight";
+ return Status::Expected("Session closed");
+ }
// Add query to the set that will be unregistered if sesssion is closed.
session_state->inflight_queries.insert(query_id);
++session_state->total_queries;
@@ -1009,7 +1015,8 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
auto entry = map_ref->find(query_id);
if (entry == map_ref->end()) {
- return Status("Invalid or unknown query handle");
+ VLOG(1) << "Invalid or unknown query handle " << PrintId(query_id);
+ return Status::Expected("Invalid or unknown query handle");
} else {
request_state = entry->second;
}
@@ -1085,7 +1092,9 @@ Status ImpalaServer::CancelInternal(const TUniqueId& query_id, bool check_inflig
const Status* cause) {
VLOG_QUERY << "Cancel(): query_id=" << PrintId(query_id);
shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
- if (request_state == nullptr) return Status("Invalid or unknown query handle");
+ if (request_state == nullptr) {
+ return Status::Expected("Invalid or unknown query handle");
+ }
RETURN_IF_ERROR(request_state->Cancel(check_inflight, cause));
return Status::OK();
}
@@ -1101,7 +1110,9 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
if (ignore_if_absent) {
return Status::OK();
} else {
- return Status(Substitute("Invalid session id: $0", PrintId(session_id)));
+ string err_msg = Substitute("Invalid session id: $0", PrintId(session_id));
+ VLOG(1) << "CloseSessionInternal(): " << err_msg;
+ return Status::Expected(err_msg);
}
}
session_state = entry->second;
@@ -1139,7 +1150,9 @@ Status ImpalaServer::GetSessionState(const TUniqueId& session_id,
SessionStateMap::iterator i = session_state_map_.find(session_id);
if (i == session_state_map_.end()) {
*session_state = std::shared_ptr<SessionState>();
- return Status(Substitute("Invalid session id: $0", PrintId(session_id)));
+ string err_msg = Substitute("Invalid session id: $0", PrintId(session_id));
+ VLOG(1) << "GetSessionState(): " << err_msg;
+ return Status::Expected(err_msg);
} else {
if (mark_active) {
lock_guard<mutex> session_lock(i->second->lock);
@@ -1148,9 +1161,12 @@ Status ImpalaServer::GetSessionState(const TUniqueId& session_id,
ss << "Client session expired due to more than " << i->second->session_timeout
<< "s of inactivity (last activity was at: "
<< ToStringFromUnixMillis(i->second->last_accessed_ms) << ").";
- return Status(ss.str());
+ return Status::Expected(ss.str());
+ }
+ if (i->second->closed) {
+ VLOG(1) << "GetSessionState(): session " << PrintId(session_id) << " is closed.";
+ return Status::Expected("Session is closed");
}
- if (i->second->closed) return Status("Session is closed");
++i->second->ref_count;
}
*session_state = i->second;
@@ -1296,9 +1312,13 @@ void ImpalaServer::CancelFromThreadPool(uint32_t thread_id,
Status ImpalaServer::AuthorizeProxyUser(const string& user, const string& do_as_user) {
if (user.empty()) {
- return Status("Unable to delegate using empty proxy username.");
+ const string err_msg("Unable to delegate using empty proxy username.");
+ VLOG(1) << err_msg;
+ return Status::Expected(err_msg);
} else if (do_as_user.empty()) {
- return Status("Unable to delegate using empty doAs username.");
+ const string err_msg("Unable to delegate using empty doAs username.");
+ VLOG(1) << err_msg;
+ return Status::Expected(err_msg);
}
stringstream error_msg;
@@ -1306,7 +1326,8 @@ Status ImpalaServer::AuthorizeProxyUser(const string& user, const string& do_as_
<< do_as_user << "'.";
if (authorized_proxy_user_config_.size() == 0) {
error_msg << " User delegation is disabled.";
- return Status(error_msg.str());
+ VLOG(1) << error_msg;
+ return Status::Expected(error_msg.str());
}
// Get the short version of the user name (the user name up to the first '/' or '@')
@@ -1326,7 +1347,8 @@ Status ImpalaServer::AuthorizeProxyUser(const string& user, const string& do_as_
if (user == "*" || user == do_as_user) return Status::OK();
}
}
- return Status(error_msg.str());
+ VLOG(1) << error_msg;
+ return Status::Expected(error_msg.str());
}
void ImpalaServer::CatalogUpdateVersionInfo::UpdateCatalogVersionMetrics()
@@ -1600,8 +1622,10 @@ void ImpalaServer::MembershipCallback(
cause_msg << cancellation_entry->second[i];
if (i + 1 != cancellation_entry->second.size()) cause_msg << ", ";
}
- cancellation_thread_pool_->Offer(
- CancellationWork(cancellation_entry->first, Status(cause_msg.str()), false));
+ string cause_str = cause_msg.str();
+ LOG(INFO) << "Query " << PrintId(cancellation_entry->first) << ": " << cause_str;
+ cancellation_thread_pool_->Offer(CancellationWork(cancellation_entry->first,
+ Status::Expected(cause_msg.str()), false));
}
}
}
@@ -1877,7 +1901,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
"Query $0 expired due to execution time limit of $1",
PrintId(expiration_event->query_id),
PrettyPrinter::Print(exec_time_limit_s, TUnit::TIME_S));
- ExpireQuery(query_state.get(), Status(err_msg));
+ ExpireQuery(query_state.get(), Status::Expected(err_msg));
expiration_event = queries_by_timestamp_.erase(expiration_event);
continue;
}
@@ -1920,7 +1944,7 @@ void ImpalaServer::UnregisterSessionTimeout(int32_t session_timeout) {
"Query $0 expired due to client inactivity (timeout is $1)",
PrintId(expiration_event->query_id),
PrettyPrinter::Print(idle_timeout_s, TUnit::TIME_S));
- ExpireQuery(query_state.get(), Status(err_msg));
+ ExpireQuery(query_state.get(), Status::Expected(err_msg));
expiration_event = queries_by_timestamp_.erase(expiration_event);
} else {
// Iterator is moved on in every other branch.