You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/03/27 18:47:18 UTC
[4/7] impala git commit: IMPALA-6614: ClientRequestState should use HS2 TOperationState
IMPALA-6614: ClientRequestState should use HS2 TOperationState
Currently ClientRequestState uses beeswax's QueryState enum, but HS2's TOperationState
is a superset of it. In order to remove dependencies on beeswax, and also to
set things up for a future change that uses TOperationState's explicit
CANCELED_STATE (see IMPALA-1262), migrate ClientRequestState to use TOperationState.
This change is intended to have no client-visible effect.
Change-Id: I36287eaf8f1dac23c306b470f95f379dfdc6bb5b
Reviewed-on: http://gerrit.cloudera.org:8080/9501
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/46c95b52
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/46c95b52
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/46c95b52
Branch: refs/heads/2.x
Commit: 46c95b52075454c99e19523e132a8a026822b9ff
Parents: f05cf90
Author: Dan Hecht <dh...@cloudera.com>
Authored: Mon Mar 5 16:08:28 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Tue Mar 27 03:35:00 2018 +0000
----------------------------------------------------------------------
be/src/service/client-request-state.cc | 55 ++++++++++++++++++----------
be/src/service/client-request-state.h | 54 ++++++++++++++++-----------
be/src/service/impala-beeswax-server.cc | 11 +++---
be/src/service/impala-hs2-server.cc | 19 ++--------
be/src/service/impala-http-handler.cc | 3 +-
be/src/service/impala-server.cc | 9 +++--
6 files changed, 84 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/46c95b52/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 12bf3b7..29c493e 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -101,7 +101,7 @@ ClientRequestState::ClientRequestState(
TimePrecision::Nanosecond));
summary_profile_->AddInfoString("End Time", "");
summary_profile_->AddInfoString("Query Type", "N/A");
- summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_));
+ summary_profile_->AddInfoString("Query State", PrintQueryState(BeeswaxQueryState()));
summary_profile_->AddInfoString("Query Status", "OK");
summary_profile_->AddInfoString("Impala Version", GetVersionString(/* compact */ true));
summary_profile_->AddInfoString("User", effective_user());
@@ -214,7 +214,7 @@ Status ClientRequestState::Exec(TExecRequest* exec_request) {
}
default:
stringstream errmsg;
- errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type;
+ errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type;
return Status(errmsg.str());
}
}
@@ -643,8 +643,11 @@ void ClientRequestState::Wait() {
discard_result(UpdateQueryStatus(status));
}
if (status.ok()) {
- UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
+ UpdateNonErrorOperationState(TOperationState::FINISHED_STATE);
}
+ // UpdateQueryStatus() or UpdateNonErrorOperationState() have updated operation_state_.
+ DCHECK(operation_state_ == TOperationState::FINISHED_STATE ||
+ operation_state_ == TOperationState::ERROR_STATE);
}
Status ClientRequestState::WaitInternal() {
@@ -719,17 +722,18 @@ Status ClientRequestState::RestartFetch() {
return Status::OK();
}
-void ClientRequestState::UpdateNonErrorQueryState(
- beeswax::QueryState::type query_state) {
+void ClientRequestState::UpdateNonErrorOperationState(
+ TOperationState::type operation_state) {
lock_guard<mutex> l(lock_);
- DCHECK(query_state != beeswax::QueryState::EXCEPTION);
- if (query_state_ < query_state) UpdateQueryState(query_state);
+ DCHECK(operation_state == TOperationState::RUNNING_STATE
+ || operation_state == TOperationState::FINISHED_STATE);
+ if (operation_state_ < operation_state) UpdateOperationState(operation_state);
}
Status ClientRequestState::UpdateQueryStatus(const Status& status) {
// Preserve the first non-ok status
if (!status.ok() && query_status_.ok()) {
- UpdateQueryState(beeswax::QueryState::EXCEPTION);
+ UpdateOperationState(TOperationState::ERROR_STATE);
query_status_ = status;
summary_profile_->AddInfoStringRedacted("Query Status", query_status_.GetDetail());
}
@@ -739,12 +743,13 @@ Status ClientRequestState::UpdateQueryStatus(const Status& status) {
Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
QueryResultSet* fetched_rows) {
- DCHECK(query_state_ != beeswax::QueryState::EXCEPTION);
+ // Wait() guarantees that we've transitioned at least to FINISHED_STATE (and any
+ // state beyond that should have a non-OK query_status_ set).
+ DCHECK(operation_state_ == TOperationState::FINISHED_STATE);
if (eos_) return Status::OK();
if (request_result_set_ != NULL) {
- UpdateQueryState(beeswax::QueryState::FINISHED);
int num_rows = 0;
const vector<TResultRow>& all_rows = (*(request_result_set_.get()));
// max_rows <= 0 means no limit
@@ -772,9 +777,6 @@ Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
if (num_rows_fetched_from_cache >= max_rows) return Status::OK();
}
- // results will be ready after this call
- UpdateQueryState(beeswax::QueryState::FINISHED);
-
// Maximum number of rows to be fetched from the coord.
int32_t max_coord_rows = max_rows;
if (max_rows > 0) {
@@ -875,13 +877,13 @@ Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
Coordinator* coord;
{
lock_guard<mutex> lock(lock_);
- // If the query is completed or cancelled, no need to update state.
- bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION;
+ // If the query has reached a terminal state, no need to update the state.
+ bool already_done = eos_ || operation_state_ == TOperationState::ERROR_STATE;
if (!already_done && cause != NULL) {
DCHECK(!cause->ok());
discard_result(UpdateQueryStatus(*cause));
query_events_->MarkEvent("Cancelled");
- DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION);
+ DCHECK_EQ(operation_state_, TOperationState::ERROR_STATE);
}
// Get a copy of the coordinator pointer while holding 'lock_'.
coord = coord_.get();
@@ -1100,10 +1102,23 @@ void ClientRequestState::ClearResultCache() {
result_cache_.reset(NULL);
}
-void ClientRequestState::UpdateQueryState(
- beeswax::QueryState::type query_state) {
- query_state_ = query_state;
- summary_profile_->AddInfoString("Query State", PrintQueryState(query_state_));
+void ClientRequestState::UpdateOperationState(
+ TOperationState::type operation_state) {
+ operation_state_ = operation_state;
+ summary_profile_->AddInfoString("Query State", PrintQueryState(BeeswaxQueryState()));
+}
+
+beeswax::QueryState::type ClientRequestState::BeeswaxQueryState() const {
+ switch (operation_state_) {
+ case TOperationState::INITIALIZED_STATE: return beeswax::QueryState::CREATED;
+ case TOperationState::RUNNING_STATE: return beeswax::QueryState::RUNNING;
+ case TOperationState::FINISHED_STATE: return beeswax::QueryState::FINISHED;
+ case TOperationState::ERROR_STATE: return beeswax::QueryState::EXCEPTION;
+ default: {
+ DCHECK(false) << "Add explicit translation for all used TOperationState values";
+ return beeswax::QueryState::EXCEPTION;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/impala/blob/46c95b52/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 0341ec5..0c05bc4 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -28,7 +28,7 @@
#include "util/condition-variable.h"
#include "util/runtime-profile.h"
#include "gen-cpp/Frontend_types.h"
-#include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/ImpalaHiveServer2Service.h"
#include <boost/thread.hpp>
#include <boost/unordered_set.hpp>
@@ -74,8 +74,8 @@ class ClientRequestState {
Status Exec(const TMetadataOpRequest& exec_request) WARN_UNUSED_RESULT;
/// Call this to ensure that rows are ready when calling FetchRows(). Updates the
- /// query_status_, and advances query_state_ to FINISHED or EXCEPTION. Must be preceded
- /// by call to Exec(). Waits for all child queries to complete. Takes lock_.
+ /// query_status_, and advances operation_state_ to FINISHED or EXCEPTION. Must be
+ /// preceded by call to Exec(). Waits for all child queries to complete. Takes lock_.
void Wait();
/// Calls Wait() asynchronously in a thread and returns immediately.
@@ -93,7 +93,7 @@ class ClientRequestState {
/// Caller needs to hold fetch_rows_lock_ and lock_.
/// Caller should verify that EOS has not be reached before calling.
/// Must be preceeded by call to Wait() (or WaitAsync()/BlockOnWait()).
- /// Also updates query_state_/status_ in case of error.
+ /// Also updates operation_state_/query_status_ in case of error.
Status FetchRows(const int32_t max_rows, QueryResultSet* fetched_rows)
WARN_UNUSED_RESULT;
@@ -106,11 +106,12 @@ class ClientRequestState {
/// The caller must hold fetch_rows_lock_ and lock_.
Status RestartFetch() WARN_UNUSED_RESULT;
- /// Update query state if the requested state isn't already obsolete. This is only for
- /// non-error states - if the query encounters an error the query status needs to be set
- /// with information about the error so UpdateQueryStatus must be used instead.
- /// Takes lock_.
- void UpdateNonErrorQueryState(beeswax::QueryState::type query_state);
+ /// Update operation state if the requested state isn't already obsolete. This is
+ /// only for non-error states - if the query encounters an error the query status
+ /// needs to be set with information about the error so UpdateQueryStatus() must be
+ /// used instead. Takes lock_.
+ void UpdateNonErrorOperationState(
+ apache::hive::service::cli::thrift::TOperationState::type operation_state);
/// Update the query status and the "Query Status" summary profile string.
/// If current status is already != ok, no update is made (we preserve the first error)
@@ -124,7 +125,7 @@ class ClientRequestState {
/// Cancels the child queries and the coordinator with the given cause.
/// If cause is NULL, assume this was deliberately cancelled by the user.
- /// Otherwise, sets state to EXCEPTION.
+ /// Otherwise, sets state to ERROR_STATE (TODO: use CANCELED_STATE).
/// Does nothing if the query has reached EOS or already cancelled.
///
/// Only returns an error if 'check_inflight' is true and the query is not yet
@@ -137,7 +138,7 @@ class ClientRequestState {
void Done();
/// Sets the API-specific (Beeswax, HS2) result cache and its size bound.
- /// The given cache is owned by this query exec state, even if an error is returned.
+ /// The given cache is owned by this client request state, even if an error is returned.
/// Returns a non-ok status if max_size exceeds the per-impalad allowed maximum.
Status SetResultCache(QueryResultSet* cache, int64_t max_size) WARN_UNUSED_RESULT;
@@ -179,7 +180,12 @@ class ClientRequestState {
}
boost::mutex* lock() { return &lock_; }
boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
- beeswax::QueryState::type query_state() const { return query_state_; }
+ apache::hive::service::cli::thrift::TOperationState::type operation_state() const {
+ return operation_state_;
+ }
+ // Translate operation_state() to a beeswax::QueryState. TODO: remove calls to this
+ // and replace with uses of operation_state() directly.
+ beeswax::QueryState::type BeeswaxQueryState() const;
const Status& query_status() const { return query_status_; }
void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; }
void set_user_profile_access(bool user_has_profile_access) {
@@ -335,15 +341,20 @@ class ClientRequestState {
bool is_cancelled_ = false; // if true, Cancel() was called.
bool eos_ = false; // if true, there are no more rows to return
- /// We enforce the invariant that query_status_ is not OK iff query_state_ is EXCEPTION,
- /// given that lock_ is held. query_state_ should only be updated using
- /// UpdateQueryState(), to ensure that the query profile is also updated.
- beeswax::QueryState::type query_state_ = beeswax::QueryState::CREATED;
+
+ /// We enforce the invariant that query_status_ is not OK iff operation_state_ is
+ /// ERROR_STATE, given that lock_ is held. operation_state_ should only be updated
+ /// using UpdateOperationState(), to ensure that the query profile is also updated.
+ apache::hive::service::cli::thrift::TOperationState::type operation_state_ =
+ apache::hive::service::cli::thrift::TOperationState::INITIALIZED_STATE;
+
Status query_status_;
TExecRequest exec_request_;
+
/// If true, effective_user() has access to the runtime profile and execution
/// summary.
bool user_has_profile_access_ = true;
+
TResultSetMetadata result_metadata_; // metadata for select query
RowBatch* current_batch_ = nullptr; // the current row batch; only applicable if coord is set
int current_batch_row_ = 0 ; // number of rows fetched within the current batch
@@ -395,10 +406,10 @@ class ClientRequestState {
/// Executes a LOAD DATA
Status ExecLoadDataRequest() WARN_UNUSED_RESULT;
- /// Core logic of Wait(). Does not update query_state_/status_.
+ /// Core logic of Wait(). Does not update operation_state_/query_status_.
Status WaitInternal() WARN_UNUSED_RESULT;
- /// Core logic of FetchRows(). Does not update query_state_/status_.
+ /// Core logic of FetchRows(). Does not update operation_state_/query_status_.
/// Caller needs to hold fetch_rows_lock_ and lock_.
Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows)
WARN_UNUSED_RESULT;
@@ -436,10 +447,11 @@ class ClientRequestState {
/// This function is a no-op if the cache has already been cleared.
void ClearResultCache();
- /// Update the query state and the "Query State" summary profile string.
+ /// Update the operation state and the "Query State" summary profile string.
/// Does not take lock_, but requires it: caller must ensure lock_
- /// is taken before calling UpdateQueryState.
- void UpdateQueryState(beeswax::QueryState::type query_state);
+ /// is taken before calling UpdateOperationState.
+ void UpdateOperationState(
+ apache::hive::service::cli::thrift::TOperationState::type operation_state);
/// Gets the query options, their values and levels and populates the result set
/// with them. It covers the subset of options for 'SET' and all of them for
http://git-wip-us.apache.org/repos/asf/impala/blob/46c95b52/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 2ac80b2..b827ff3 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -65,7 +65,7 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
RAISE_IF_ERROR(Execute(&query_ctx, session, &request_state),
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
- request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
+ request_state->UpdateNonErrorOperationState(TOperationState::RUNNING_STATE);
// start thread to wait for results to become available, which will allow
// us to advance query state to FINISHED or EXCEPTION
Status status = request_state->WaitAsync();
@@ -110,7 +110,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
RAISE_IF_ERROR(Execute(&query_ctx, session, &request_state),
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
- request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
+ request_state->UpdateNonErrorOperationState(TOperationState::RUNNING_STATE);
// Once the query is running do a final check for session closure and add it to the
// set of in-flight queries.
Status status = SetQueryInflight(session, request_state);
@@ -255,9 +255,10 @@ beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
// Take the lock to ensure that if the client sees a query_state == EXCEPTION, it is
// guaranteed to see the error query_status.
lock_guard<mutex> l(*request_state->lock());
- DCHECK_EQ(request_state->query_state() == beeswax::QueryState::EXCEPTION,
+ beeswax::QueryState::type query_state = request_state->BeeswaxQueryState();
+ DCHECK_EQ(query_state == beeswax::QueryState::EXCEPTION,
!request_state->query_status().ok());
- return request_state->query_state();
+ return query_state;
}
void ImpalaServer::echo(string& echo_string, const string& input_string) {
@@ -293,7 +294,7 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
// Take the lock to ensure that if the client sees a query_state == EXCEPTION, it is
// guaranteed to see the error query_status.
lock_guard<mutex> l(*request_state->lock());
- DCHECK_EQ(request_state->query_state() == beeswax::QueryState::EXCEPTION,
+ DCHECK_EQ(request_state->BeeswaxQueryState() == beeswax::QueryState::EXCEPTION,
!request_state->query_status().ok());
// If the query status is !ok, include the status error message at the top of the log.
if (!request_state->query_status().ok()) {
http://git-wip-us.apache.org/repos/asf/impala/blob/46c95b52/be/src/service/impala-hs2-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index e495dec..4381f81 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -86,18 +86,6 @@ namespace impala {
const string IMPALA_RESULT_CACHING_OPT = "impala.resultset.cache.size";
-// Helper function to translate between Beeswax and HiveServer2 type
-static TOperationState::type QueryStateToTOperationState(
- const beeswax::QueryState::type& query_state) {
- switch (query_state) {
- case beeswax::QueryState::CREATED: return TOperationState::INITIALIZED_STATE;
- case beeswax::QueryState::RUNNING: return TOperationState::RUNNING_STATE;
- case beeswax::QueryState::FINISHED: return TOperationState::FINISHED_STATE;
- case beeswax::QueryState::EXCEPTION: return TOperationState::ERROR_STATE;
- default: return TOperationState::UKNOWN_STATE;
- }
-}
-
void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
TMetadataOpRequest* request, TOperationHandle* handle, thrift::TStatus* status) {
TUniqueId session_id;
@@ -161,7 +149,7 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
return;
}
- request_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
+ request_state->UpdateNonErrorOperationState(TOperationState::FINISHED_STATE);
Status inflight_status = SetQueryInflight(session, request_state);
if (!inflight_status.ok()) {
@@ -466,7 +454,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
cache_num_rows);
if (!status.ok()) goto return_error;
}
- request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
+ request_state->UpdateNonErrorOperationState(TOperationState::RUNNING_STATE);
// Start thread to wait for results to become available.
status = request_state->WaitAsync();
if (!status.ok()) goto return_error;
@@ -651,8 +639,7 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
{
lock_guard<mutex> l(*request_state->lock());
- TOperationState::type operation_state = QueryStateToTOperationState(
- request_state->query_state());
+ TOperationState::type operation_state = request_state->operation_state();
return_val.__set_operationState(operation_state);
if (operation_state == TOperationState::ERROR_STATE) {
DCHECK(!request_state->query_status().ok());
http://git-wip-us.apache.org/repos/asf/impala/blob/46c95b52/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 0156023..43b03d1 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -42,6 +42,7 @@
#include "common/names.h"
+using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
using namespace beeswax;
using namespace impala;
@@ -757,7 +758,7 @@ void ImpalaHttpHandler::QuerySummaryHandler(bool include_json_plan, bool include
// state lock to be acquired, since it could potentially be an expensive
// call, if the table Catalog metadata loading is in progress. Instead
// update the caller that the plan information is unavailable.
- if (request_state->query_state() == beeswax::QueryState::CREATED) {
+ if (request_state->operation_state() == TOperationState::INITIALIZED_STATE) {
document->AddMember(
"plan_metadata_unavailable", "true", document->GetAllocator());
return;
http://git-wip-us.apache.org/repos/asf/impala/blob/46c95b52/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index dd43dd1..ea88d73 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -98,10 +98,11 @@ using boost::get_system_time;
using boost::system_time;
using boost::uuids::random_generator;
using boost::uuids::uuid;
+using namespace apache::hive::service::cli::thrift;
using namespace apache::thrift;
using namespace apache::thrift::transport;
-using namespace boost::posix_time;
using namespace beeswax;
+using namespace boost::posix_time;
using namespace rapidjson;
using namespace strings;
@@ -615,8 +616,8 @@ Status ImpalaServer::GetRuntimeProfileStr(const TUniqueId& query_id,
{
shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
if (request_state.get() != nullptr) {
- // For queries in CREATED state, the profile information isn't populated yet.
- if (request_state->query_state() == beeswax::QueryState::CREATED) {
+ // For queries in INITIALIZED_STATE, the profile information isn't populated yet.
+ if (request_state->operation_state() == TOperationState::INITIALIZED_STATE) {
return Status::Expected("Query plan is not ready.");
}
lock_guard<mutex> l(*request_state->lock());
@@ -1689,7 +1690,7 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque
total_fragments = coord->progress().total();
has_coord = true;
}
- query_state = request_state.query_state();
+ query_state = request_state.BeeswaxQueryState();
num_rows_fetched = request_state.num_rows_fetched();
query_status = request_state.query_status();