You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:56:00 UTC
[06/13] incubator-impala git commit: IMPALA-2550: Switch to per-query exec rpc
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index c886bf4..cfb0bf3 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -30,7 +30,7 @@ add_library(Service
impala-hs2-server.cc
impala-beeswax-server.cc
impala-internal-service.cc
- query-exec-state.cc
+ client-request-state.cc
query-options.cc
query-result-set.cc
child-query.cc
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/child-query.cc
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index 5b68de4..f58aacc 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -17,7 +17,7 @@
#include "service/child-query.h"
#include "service/impala-server.inline.h"
-#include "service/query-exec-state.h"
+#include "service/client-request-state.h"
#include "service/query-options.h"
#include "util/debug-util.h"
@@ -34,7 +34,7 @@ const string ChildQuery::PARENT_QUERY_OPT = "impala.parent_query_id";
// any HS2 "RPC" into the impala server. It is important not to hold any locks (in
// particular the parent query's lock_) while invoking HS2 functions to avoid deadlock.
Status ChildQuery::ExecAndFetch() {
- const TUniqueId& session_id = parent_exec_state_->session_id();
+ const TUniqueId& session_id = parent_request_state_->session_id();
VLOG_QUERY << "Executing child query: " << query_ << " in session "
<< PrintId(session_id);
@@ -45,8 +45,9 @@ Status ChildQuery::ExecAndFetch() {
ImpalaServer::TUniqueIdToTHandleIdentifier(session_id, session_id,
&exec_stmt_req.sessionHandle.sessionId);
exec_stmt_req.__set_statement(query_);
- SetQueryOptions(parent_exec_state_->exec_request().query_options, &exec_stmt_req);
- exec_stmt_req.confOverlay[PARENT_QUERY_OPT] = PrintId(parent_exec_state_->query_id());
+ SetQueryOptions(parent_request_state_->exec_request().query_options, &exec_stmt_req);
+ exec_stmt_req.confOverlay[PARENT_QUERY_OPT] =
+ PrintId(parent_request_state_->query_id());
// Starting executing of the child query and setting is_running are not made atomic
// because holding a lock while calling into the parent_server_ may result in deadlock.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/child-query.h
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.h b/be/src/service/child-query.h
index 1c7a20e..36f6197 100644
--- a/be/src/service/child-query.h
+++ b/be/src/service/child-query.h
@@ -39,25 +39,25 @@ class ImpalaServer;
//
/// Parent queries are expected to call ExecAndWait() of a child query in a
/// separate thread, and then join that thread to wait for child-query completion.
-/// The parent QueryExecState is independent of the child query's QueryExecState,
+/// The parent ClientRequestState is independent of the child query's ClientRequestState,
/// with the exception that the child query selectively checks the parent's status
/// for failure/cancellation detection. Child queries should never call into their
-/// parent's QueryExecState to avoid deadlock.
+/// parent's ClientRequestState to avoid deadlock.
//
/// TODO: Compute stats is the only stmt that requires child queries. Once the
/// CatalogService performs background stats gathering the concept of child queries
/// will likely become obsolete. Remove this class and all child-query related code.
class ChildQuery {
public:
- ChildQuery(const std::string& query, ImpalaServer::QueryExecState* parent_exec_state,
+ ChildQuery(const std::string& query, ClientRequestState* parent_request_state,
ImpalaServer* parent_server)
: query_(query),
- parent_exec_state_(parent_exec_state),
+ parent_request_state_(parent_request_state),
parent_server_(parent_server),
is_running_(false),
is_cancelled_(false) {
DCHECK(!query_.empty());
- DCHECK(parent_exec_state_ != NULL);
+ DCHECK(parent_request_state_ != NULL);
DCHECK(parent_server_ != NULL);
}
@@ -65,7 +65,7 @@ class ChildQuery {
/// (boost::mutex's operator= and copy c'tor are private)
ChildQuery(const ChildQuery& other)
: query_(other.query_),
- parent_exec_state_(other.parent_exec_state_),
+ parent_request_state_(other.parent_request_state_),
parent_server_(other.parent_server_),
is_running_(other.is_running_),
is_cancelled_(other.is_cancelled_) {}
@@ -74,7 +74,7 @@ class ChildQuery {
/// (boost::mutex's operator= and copy c'tor are private)
ChildQuery& operator=(const ChildQuery& other) {
query_ = other.query_;
- parent_exec_state_ = other.parent_exec_state_;
+ parent_request_state_ = other.parent_request_state_;
parent_server_ = other.parent_server_;
is_running_ = other.is_running_;
is_cancelled_ = other.is_cancelled_;
@@ -85,11 +85,11 @@ class ChildQuery {
Status ExecAndFetch();
/// Cancels and closes the given child query if it is running. Sets is_cancelled_.
- /// Child queries can be cancelled by the parent query through QueryExecState::Cancel().
+ /// Child queries can be cancelled by the parent query through ClientRequestState::Cancel().
/// Child queries should never cancel their parent to avoid deadlock (but the parent
/// query may decide to cancel itself based on a non-OK status from a child query).
- /// Note that child queries have a different QueryExecState than their parent query,
- /// so cancellation of a child query does not call into the parent's QueryExecState.
+ /// Note that child queries have a different ClientRequestState than their parent query,
+ /// so cancellation of a child query does not call into the parent's ClientRequestState.
void Cancel();
const apache::hive::service::cli::thrift::TTableSchema& result_schema() {
@@ -119,7 +119,7 @@ class ChildQuery {
/// Execution state of parent query. Used to synchronize and propagate parent
/// cancellations/failures to this child query. Not owned.
- ImpalaServer::QueryExecState* parent_exec_state_;
+ ClientRequestState* parent_request_state_;
/// Parent Impala server used for executing this child query. Not owned.
ImpalaServer* parent_server_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
new file mode 100644
index 0000000..2f447c7
--- /dev/null
+++ b/be/src/service/client-request-state.cc
@@ -0,0 +1,1085 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/client-request-state.h"
+
+#include <limits>
+#include <gutil/strings/substitute.h>
+
+#include "exprs/expr-context.h"
+#include "exprs/expr.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/exec-env.h"
+#include "scheduling/admission-controller.h"
+#include "scheduling/scheduler.h"
+#include "service/frontend.h"
+#include "service/impala-server.h"
+#include "service/query-options.h"
+#include "service/query-result-set.h"
+#include "util/debug-util.h"
+#include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
+#include "util/time.h"
+
+#include "gen-cpp/CatalogService.h"
+#include "gen-cpp/CatalogService_types.h"
+
+#include <thrift/Thrift.h>
+
+#include "common/names.h"
+
+using boost::algorithm::join;
+using namespace apache::hive::service::cli::thrift;
+using namespace apache::thrift;
+using namespace beeswax;
+using namespace strings;
+
+DECLARE_int32(catalog_service_port);
+DECLARE_string(catalog_service_host);
+DECLARE_int64(max_result_cache_size);
+
+namespace impala {
+
+namespace {
+
+// Info-string keys written into the query's summary profile. CM reads these entries
+// for monitoring, so keep the key text stable.
+const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
+const string PER_HOST_MEMORY_RESERVATION_KEY = "Per-Host Memory Reservation";
+const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
+const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats";
+const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids";
+
+}  // anonymous namespace
+
+// Creates the state for one client request. Sets up the "Query", "ImpalaServer" and
+// "Summary" runtime profiles (Summary is attached to Query here; ImpalaServer is
+// attached later by Exec()), starts the "Query Timeline" event sequence, and fills the
+// summary profile with session, user and statement information. "Query Type" stays
+// "N/A" and "End Time" stays empty until Exec() / Done() overwrite them.
+// NOTE(review): last_active_time_ms_ starts at the max int64 value, presumably so the
+// request is not treated as idle before MarkInactive() is first called -- confirm.
+ClientRequestState::ClientRequestState(
+ const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
+ ImpalaServer* server, shared_ptr<ImpalaServer::SessionState> session)
+ : query_ctx_(query_ctx),
+ last_active_time_ms_(numeric_limits<int64_t>::max()),
+ ref_count_(0L),
+ child_query_executor_(new ChildQueryExecutor),
+ exec_env_(exec_env),
+ is_block_on_wait_joining_(false),
+ session_(session),
+ schedule_(NULL),
+ coord_(NULL),
+ result_cache_max_size_(-1),
+ profile_(&profile_pool_, "Query"), // renamed to include the query id in the c'tor body
+ server_profile_(&profile_pool_, "ImpalaServer"),
+ summary_profile_(&profile_pool_, "Summary"),
+ is_cancelled_(false),
+ eos_(false),
+ query_state_(beeswax::QueryState::CREATED),
+ current_batch_(NULL),
+ current_batch_row_(0),
+ num_rows_fetched_(0),
+ fetched_rows_(false),
+ frontend_(frontend),
+ parent_server_(server),
+ start_time_(TimestampValue::LocalTime()) {
+#ifndef NDEBUG
+ profile_.AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
+ "DEBUG build of Impala. Use RELEASE builds to measure query performance.");
+#endif
+ row_materialization_timer_ = ADD_TIMER(&server_profile_, "RowMaterializationTimer");
+ client_wait_timer_ = ADD_TIMER(&server_profile_, "ClientFetchWaitTimer");
+ query_events_ = summary_profile_.AddEventSequence("Query Timeline");
+ query_events_->Start();
+ profile_.AddChild(&summary_profile_);
+
+ // Replace the placeholder "Query" name given in the initializer list.
+ profile_.set_name("Query (id=" + PrintId(query_id()) + ")");
+ summary_profile_.AddInfoString("Session ID", PrintId(session_id()));
+ summary_profile_.AddInfoString("Session Type", PrintTSessionType(session_type()));
+ if (session_type() == TSessionType::HIVESERVER2) {
+ // The profile shows the protocol as 1-based ("V1", ...), hence the +1.
+ summary_profile_.AddInfoString("HiveServer2 Protocol Version",
+ Substitute("V$0", 1 + session->hs2_version));
+ }
+ summary_profile_.AddInfoString("Start Time", start_time().DebugString());
+ // Placeholders: filled in by Done() and Exec() respectively.
+ summary_profile_.AddInfoString("End Time", "");
+ summary_profile_.AddInfoString("Query Type", "N/A");
+ summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+ summary_profile_.AddInfoString("Query Status", "OK");
+ summary_profile_.AddInfoString("Impala Version", GetVersionString(/* compact */ true));
+ summary_profile_.AddInfoString("User", effective_user());
+ summary_profile_.AddInfoString("Connected User", connected_user());
+ summary_profile_.AddInfoString("Delegated User", do_as_user());
+ summary_profile_.AddInfoString("Network Address",
+ lexical_cast<string>(session_->network_address));
+ summary_profile_.AddInfoString("Default Db", default_db());
+ summary_profile_.AddInfoString("Sql Statement", query_ctx_.client_request.stmt);
+ summary_profile_.AddInfoString("Coordinator",
+ TNetworkAddressToString(exec_env->backend_address()));
+}
+
+// In debug builds, verifies that BlockOnWait() already joined and released the wait
+// thread before this object is destroyed.
+ClientRequestState::~ClientRequestState() {
+  const bool wait_thread_released = wait_thread_.get() == nullptr;
+  DCHECK(wait_thread_released) << "BlockOnWait() needs to be called!";
+}
+
+// Installs 'cache' as this request's result cache and takes ownership of it. If
+// 'max_size' exceeds --max_result_cache_size an error is returned; the cache is
+// still owned by this object, but result_cache_max_size_ is left unchanged.
+Status ClientRequestState::SetResultCache(QueryResultSet* cache, int64_t max_size) {
+  lock_guard<mutex> guard(lock_);
+  DCHECK(result_cache_ == NULL);
+  // Take ownership before validating so 'cache' is owned here on every path.
+  result_cache_.reset(cache);
+  const bool within_limit = max_size <= FLAGS_max_result_cache_size;
+  if (within_limit) {
+    result_cache_max_size_ = max_size;
+    return Status::OK();
+  }
+  return Status(Substitute(
+      "Requested result-cache size of $0 exceeds Impala's maximum of $1.",
+      max_size, FLAGS_max_result_cache_size));
+}
+
+// Copies 'exec_request' into exec_request_, records its type and options in the
+// summary profile, and dispatches on the statement type:
+//  - QUERY/DML: scheduled and started via ExecQueryOrDmlRequest().
+//  - EXPLAIN:   the precomputed explain text becomes the result set.
+//  - DDL:       handled by ExecDdlRequest().
+//  - LOAD:      loads the data through the frontend, then refreshes the table.
+//  - SET:       sets one session query option, or lists all of them.
+// Returns an error status for unknown statement types.
+Status ClientRequestState::Exec(TExecRequest* exec_request) {
+  MarkActive();
+  // Keep a private copy; everything below reads from exec_request_.
+  exec_request_ = *exec_request;
+
+  profile_.AddChild(&server_profile_);
+  summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
+  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+  summary_profile_.AddInfoString("Query Options (non default)",
+      DebugQueryOptions(query_ctx_.client_request.query_options));
+
+  switch (exec_request_.stmt_type) {
+    case TStmtType::QUERY:
+    case TStmtType::DML:
+      DCHECK(exec_request_.__isset.query_exec_request);
+      return ExecQueryOrDmlRequest(exec_request_.query_exec_request);
+
+    case TStmtType::EXPLAIN:
+      // The frontend already produced the explain text; expose it as the result rows.
+      request_result_set_.reset(
+          new vector<TResultRow>(exec_request_.explain_result.results));
+      return Status::OK();
+
+    case TStmtType::DDL:
+      DCHECK(exec_request_.__isset.catalog_op_request);
+      return ExecDdlRequest();
+
+    case TStmtType::LOAD: {
+      DCHECK(exec_request_.__isset.load_data_request);
+      TLoadDataResp load_resp;
+      RETURN_IF_ERROR(frontend_->LoadData(exec_request_.load_data_request, &load_resp));
+      request_result_set_.reset(new vector<TResultRow>);
+      request_result_set_->push_back(load_resp.load_summary);
+
+      // Refresh the target table's metadata after the load.
+      TCatalogOpRequest refresh_req;
+      refresh_req.__set_op_type(TCatalogOpType::RESET_METADATA);
+      refresh_req.__set_reset_metadata_params(TResetMetadataRequest());
+      TResetMetadataRequest& refresh_params = refresh_req.reset_metadata_params;
+      refresh_params.__set_header(TCatalogServiceRequestHeader());
+      refresh_params.__set_is_refresh(true);
+      refresh_params.__set_table_name(exec_request_.load_data_request.table_name);
+      catalog_op_executor_.reset(
+          new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+      RETURN_IF_ERROR(catalog_op_executor_->Exec(refresh_req));
+      RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
+          *catalog_op_executor_->update_catalog_result(),
+          exec_request_.query_options.sync_ddl));
+      return Status::OK();
+    }
+
+    case TStmtType::SET: {
+      DCHECK(exec_request_.__isset.set_query_option_request);
+      const auto& set_req = exec_request_.set_query_option_request;
+      lock_guard<mutex> session_guard(session_->lock);
+      if (!set_req.__isset.key) {
+        // Plain "SET": return every session query option as a key/value table.
+        map<string, string> options;
+        TQueryOptionsToMap(session_->default_query_options, &options);
+        vector<string> keys;
+        vector<string> values;
+        for (const auto& entry : options) {
+          keys.push_back(entry.first);
+          values.push_back(entry.second);
+        }
+        SetResultSet(keys, values);
+        return Status::OK();
+      }
+      // "SET key=value": update the session's default query options.
+      DCHECK(set_req.__isset.value);
+      RETURN_IF_ERROR(SetQueryOption(set_req.key, set_req.value,
+          &session_->default_query_options, &session_->set_query_options_mask));
+      SetResultSet({}, {});
+      return Status::OK();
+    }
+
+    default: {
+      stringstream errmsg;
+      errmsg << "Unknown exec request stmt type: " << exec_request_.stmt_type;
+      return Status(errmsg.str());
+    }
+  }
+}
+
+// Executes a catalog operation that is not sent to the catalog server as a DDL.
+// USE updates the session's current database. The SHOW/DESCRIBE variants are answered
+// by the frontend; their output is stored in request_result_set_, plus
+// result_metadata_ when the frontend returns a schema. The admin variants of
+// SHOW ROLES / SHOW GRANT ROLE first run a Sentry admin check through a
+// CatalogOpExecutor. Returns an error status for any other op type.
+Status ClientRequestState::ExecLocalCatalogOp(
+    const TCatalogOpRequest& catalog_op) {
+  switch (catalog_op.op_type) {
+    case TCatalogOpType::USE: {
+      // Read the db from 'catalog_op' like every other case. ExecDdlRequest() passes
+      // exec_request_.catalog_op_request, so the value is unchanged.
+      lock_guard<mutex> l(session_->lock);
+      session_->database = catalog_op.use_db_params.db;
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_TABLES: {
+      const TShowTablesParams* params = &catalog_op.show_tables_params;
+      // A NULL pattern means match all tables. However, Thrift string types can't
+      // be NULL in C++, so we have to test if it's set rather than just blindly
+      // using the value.
+      const string* table_name =
+          params->__isset.show_pattern ? &(params->show_pattern) : NULL;
+      TGetTablesResult table_names;
+      RETURN_IF_ERROR(frontend_->GetTableNames(params->db, table_name,
+          &query_ctx_.session, &table_names));
+      SetResultSet(table_names.tables);
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_DBS: {
+      const TShowDbsParams* params = &catalog_op.show_dbs_params;
+      TGetDbsResult dbs;
+      // Pass NULL when no pattern was given (see SHOW_TABLES above).
+      const string* db_pattern =
+          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
+      RETURN_IF_ERROR(
+          frontend_->GetDbs(db_pattern, &query_ctx_.session, &dbs));
+      vector<string> names, comments;
+      names.reserve(dbs.dbs.size());
+      comments.reserve(dbs.dbs.size());
+      for (const TDatabase& db: dbs.dbs) {
+        names.push_back(db.db_name);
+        comments.push_back(db.metastore_db.description);
+      }
+      SetResultSet(names, comments);
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_DATA_SRCS: {
+      const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params;
+      TGetDataSrcsResult result;
+      // Pass NULL when no pattern was given.
+      const string* pattern =
+          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
+      RETURN_IF_ERROR(
+          frontend_->GetDataSrcMetadata(pattern, &result));
+      SetResultSet(result.data_src_names, result.locations, result.class_names,
+          result.api_versions);
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_STATS: {
+      const TShowStatsParams& params = catalog_op.show_stats_params;
+      TResultSet response;
+      RETURN_IF_ERROR(frontend_->GetStats(params, &response));
+      // Set the result set and its schema from the response.
+      request_result_set_.reset(new vector<TResultRow>(response.rows));
+      result_metadata_ = response.schema;
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_FUNCTIONS: {
+      const TShowFunctionsParams* params = &catalog_op.show_fns_params;
+      TGetFunctionsResult functions;
+      // Pass NULL when no pattern was given.
+      const string* fn_pattern =
+          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
+      RETURN_IF_ERROR(frontend_->GetFunctions(
+          params->category, params->db, fn_pattern, &query_ctx_.session, &functions));
+      SetResultSet(functions.fn_ret_types, functions.fn_signatures,
+          functions.fn_binary_types, functions.fn_persistence);
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_ROLES: {
+      const TShowRolesParams& params = catalog_op.show_roles_params;
+      if (params.is_admin_op) {
+        // Verify the user has privileges to perform this operation by checking against
+        // the Sentry Service (via the Catalog Server).
+        catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
+            &server_profile_));
+
+        TSentryAdminCheckRequest req;
+        req.__set_header(TCatalogServiceRequestHeader());
+        req.header.__set_requesting_user(effective_user());
+        RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req));
+      }
+
+      // If we have made it here, the user has privileges to execute this operation.
+      // Return the results.
+      TShowRolesResult result;
+      RETURN_IF_ERROR(frontend_->ShowRoles(params, &result));
+      SetResultSet(result.role_names);
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_GRANT_ROLE: {
+      const TShowGrantRoleParams& params = catalog_op.show_grant_role_params;
+      if (params.is_admin_op) {
+        // Verify the user has privileges to perform this operation by checking against
+        // the Sentry Service (via the Catalog Server).
+        catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
+            &server_profile_));
+
+        TSentryAdminCheckRequest req;
+        req.__set_header(TCatalogServiceRequestHeader());
+        req.header.__set_requesting_user(effective_user());
+        RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req));
+      }
+
+      TResultSet response;
+      RETURN_IF_ERROR(frontend_->GetRolePrivileges(params, &response));
+      // Set the result set and its schema from the response.
+      request_result_set_.reset(new vector<TResultRow>(response.rows));
+      result_metadata_ = response.schema;
+      return Status::OK();
+    }
+    case TCatalogOpType::DESCRIBE_DB: {
+      TDescribeResult response;
+      RETURN_IF_ERROR(frontend_->DescribeDb(catalog_op.describe_db_params,
+          &response));
+      // Set the result set
+      request_result_set_.reset(new vector<TResultRow>(response.results));
+      return Status::OK();
+    }
+    case TCatalogOpType::DESCRIBE_TABLE: {
+      TDescribeResult response;
+      RETURN_IF_ERROR(frontend_->DescribeTable(catalog_op.describe_table_params,
+          &response));
+      // Set the result set
+      request_result_set_.reset(new vector<TResultRow>(response.results));
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_CREATE_TABLE: {
+      string response;
+      RETURN_IF_ERROR(frontend_->ShowCreateTable(catalog_op.show_create_table_params,
+          &response));
+      SetResultSet(vector<string>(1, response));
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_CREATE_FUNCTION: {
+      string response;
+      RETURN_IF_ERROR(frontend_->ShowCreateFunction(
+          catalog_op.show_create_function_params, &response));
+      SetResultSet(vector<string>(1, response));
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_FILES: {
+      TResultSet response;
+      RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, &response));
+      // Set the result set and its schema from the response.
+      request_result_set_.reset(new vector<TResultRow>(response.rows));
+      result_metadata_ = response.schema;
+      return Status::OK();
+    }
+    default: {
+      stringstream ss;
+      ss << "Unexpected TCatalogOpType: " << catalog_op.op_type;
+      return Status(ss.str());
+    }
+  }
+}
+
+// Returns 'tables' as a comma-separated list of "db.table" names, the format used for
+// the table-list info strings in the summary profile.
+static string JoinTableNamesForProfile(const vector<TTableName>& tables) {
+  stringstream ss;
+  for (size_t i = 0; i < tables.size(); ++i) {
+    if (i != 0) ss << ",";
+    ss << tables[i].db_name << "." << tables[i].table_name;
+  }
+  return ss.str();
+}
+
+// Records the plan, resource estimates and table-stats warnings in the summary
+// profile. It then schedules the query, passes it through admission control (when an
+// admission controller exists) and starts execution on a new Coordinator.
+// Returns CANCELLED if Cancel() ran before scheduling. Otherwise returns the error
+// produced by UpdateQueryStatus() for the first failing step.
+Status ClientRequestState::ExecQueryOrDmlRequest(
+    const TQueryExecRequest& query_exec_request) {
+  // We always need at least one plan fragment.
+  DCHECK(!query_exec_request.plan_exec_info.empty());
+
+  if (query_exec_request.__isset.query_plan) {
+    stringstream plan_ss;
+    // Add some delimiters to make it clearer where the plan begins and the profile
+    // ends.
+    plan_ss << "\n----------------\n"
+            << query_exec_request.query_plan
+            << "----------------";
+    summary_profile_.AddInfoString("Plan", plan_ss.str());
+  }
+  // Add info strings consumed by CM: estimated mem and tables missing stats.
+  if (query_exec_request.__isset.per_host_mem_estimate) {
+    stringstream ss;
+    ss << query_exec_request.per_host_mem_estimate;
+    summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
+  }
+  if (query_exec_request.__isset.per_host_min_reservation) {
+    stringstream ss;
+    ss << query_exec_request.per_host_min_reservation;
+    summary_profile_.AddInfoString(PER_HOST_MEMORY_RESERVATION_KEY, ss.str());
+  }
+
+  const TQueryCtx& req_ctx = query_exec_request.query_ctx;
+  // Missing/corrupt stats are only reported for queries without a parent query.
+  const bool has_parent_query = req_ctx.__isset.parent_query_id;
+  if (!has_parent_query && req_ctx.__isset.tables_missing_stats &&
+      !req_ctx.tables_missing_stats.empty()) {
+    summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY,
+        JoinTableNamesForProfile(req_ctx.tables_missing_stats));
+  }
+  if (!has_parent_query && req_ctx.__isset.tables_with_corrupt_stats &&
+      !req_ctx.tables_with_corrupt_stats.empty()) {
+    summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY,
+        JoinTableNamesForProfile(req_ctx.tables_with_corrupt_stats));
+  }
+  if (req_ctx.__isset.tables_missing_diskids &&
+      !req_ctx.tables_missing_diskids.empty()) {
+    summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY,
+        JoinTableNamesForProfile(req_ctx.tables_missing_diskids));
+  }
+
+  {
+    lock_guard<mutex> l(lock_);
+    // Don't start executing the query if Cancel() was called concurrently with Exec().
+    if (is_cancelled_) return Status::CANCELLED;
+    // TODO: make schedule local to coordinator and move schedule_->Release() into
+    // Coordinator::TearDown()
+    schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
+        exec_request_.query_options, &summary_profile_, query_events_));
+  }
+  Status status = exec_env_->scheduler()->Schedule(schedule_.get());
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_IF_ERROR(UpdateQueryStatus(status));
+  }
+
+  if (exec_env_->admission_controller() != nullptr) {
+    status = exec_env_->admission_controller()->AdmitQuery(schedule_.get());
+    {
+      lock_guard<mutex> l(lock_);
+      RETURN_IF_ERROR(UpdateQueryStatus(status));
+    }
+  }
+
+  coord_.reset(new Coordinator(*schedule_, query_events_));
+  status = coord_->Exec();
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_IF_ERROR(UpdateQueryStatus(status));
+  }
+
+  profile_.AddChild(coord_->query_profile());
+  return Status::OK();
+}
+
+// Runs a DDL or another catalog operation.
+//  - Catalog ops other than DDL and RESET_METADATA are answered locally by
+//    ExecLocalCatalogOp().
+//  - COMPUTE STATS only launches its child queries asynchronously here.
+//  - Everything else is sent through a CatalogOpExecutor. For CREATE TABLE AS SELECT,
+//    the INSERT part then runs like a normal DML request.
+Status ClientRequestState::ExecDdlRequest() {
+  const bool is_ddl = catalog_op_type() == TCatalogOpType::DDL;
+  summary_profile_.AddInfoString("DDL Type",
+      is_ddl ? PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type()));
+
+  if (!is_ddl && catalog_op_type() != TCatalogOpType::RESET_METADATA) {
+    const Status local_status = ExecLocalCatalogOp(exec_request_.catalog_op_request);
+    lock_guard<mutex> l(lock_);
+    return UpdateQueryStatus(local_status);
+  }
+
+  if (ddl_type() == TDdlType::COMPUTE_STATS) {
+    // COMPUTE STATS runs up to two child queries (table and column stats)
+    // asynchronously; WaitInternal() collects them and applies the results.
+    const TComputeStatsParams& params =
+        exec_request_.catalog_op_request.ddl_params.compute_stats_params;
+    vector<ChildQuery> child_queries;
+    if (params.__isset.tbl_stats_query) {
+      child_queries.emplace_back(params.tbl_stats_query, this, parent_server_);
+    }
+    if (params.__isset.col_stats_query) {
+      child_queries.emplace_back(params.col_stats_query, this, parent_server_);
+    }
+    if (!child_queries.empty()) child_query_executor_->ExecAsync(move(child_queries));
+    return Status::OK();
+  }
+
+  catalog_op_executor_.reset(
+      new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+  const Status exec_status =
+      catalog_op_executor_->Exec(exec_request_.catalog_op_request);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_IF_ERROR(UpdateQueryStatus(exec_status));
+  }
+
+  const bool is_ctas = is_ddl && ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT;
+  if (is_ctas && !catalog_op_executor_->ddl_exec_response()->new_table_created) {
+    // CTAS with IF NOT EXISTS on a table that already existed: skip the INSERT.
+    DCHECK(exec_request_.catalog_op_request.ddl_params.create_table_params
+        .if_not_exists);
+    return Status::OK();
+  }
+
+  // Apply the catalog update returned by the operation (e.g. add a newly created
+  // table to the catalog cache).
+  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
+      *catalog_op_executor_->update_catalog_result(),
+      exec_request_.query_options.sync_ddl));
+
+  if (is_ctas) {
+    // The remainder of the CTAS (the INSERT) runs like any other DML request,
+    // including waiting for a catalog update if partitions were altered.
+    DCHECK(exec_request_.__isset.query_exec_request);
+    RETURN_IF_ERROR(ExecQueryOrDmlRequest(exec_request_.query_exec_request));
+  }
+
+  // Report the DDL outcome to the client.
+  SetResultSet(catalog_op_executor_->ddl_exec_response());
+  return Status::OK();
+}
+
+// Finalizes the request:
+//  - joins the wait thread;
+//  - propagates the coordinator's latest Kudu insert timestamp to the session;
+//  - records the end time and final state in the summary profile;
+//  - clears the result cache;
+//  - if a coordinator exists, releases admission-control resources and tears the
+//    coordinator down.
+// Lock order matters: session_->lock is taken (and released) before lock_.
+void ClientRequestState::Done() {
+ MarkActive();
+ // Make sure we join on wait_thread_ before we finish (and especially before this object
+ // is destroyed).
+ BlockOnWait();
+
+ // Update latest observed Kudu timestamp stored in the session from the coordinator.
+ // Needs to take the session_ lock which must not be taken while holding lock_, so this
+ // must happen before taking lock_ below.
+ if (coord_.get() != NULL) {
+ // This is safe to access on coord_ after Wait() has been called.
+ uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
+ if (latest_kudu_ts > 0) {
+ VLOG_RPC << "Updating session (id=" << session_id() << ") with latest "
+ << "observed Kudu timestamp: " << latest_kudu_ts;
+ lock_guard<mutex> session_lock(session_->lock);
+ // Only ever move the session's observed timestamp forward.
+ session_->kudu_latest_observed_ts = std::max<uint64_t>(
+ session_->kudu_latest_observed_ts, latest_kudu_ts);
+ }
+ }
+
+ // lock_ is held from here until the function returns.
+ unique_lock<mutex> l(lock_);
+ end_time_ = TimestampValue::LocalTime();
+ summary_profile_.AddInfoString("End Time", end_time().DebugString());
+ summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+ query_events_->MarkEvent("Unregister query");
+
+ // Update result set cache metrics, and update mem limit accounting before tearing
+ // down the coordinator.
+ ClearResultCache();
+
+ if (coord_.get() != NULL) {
+ // Release any reserved resources.
+ if (exec_env_->admission_controller() != nullptr) {
+ Status status = exec_env_->admission_controller()->ReleaseQuery(schedule_.get());
+ // A failed release is only logged; teardown proceeds regardless.
+ if (!status.ok()) {
+ LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
+ << " because of error: " << status.GetDetail();
+ }
+ }
+ coord_->TearDown();
+ }
+}
+
+// Runs a HiveServer2 metadata operation through the frontend and stores the returned
+// rows and schema as this request's result set.
+Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
+  // Like the other Exec(), fill out as much profile information as we're able to.
+  summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
+  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+
+  TResultSet op_result;
+  RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request, &op_result));
+  result_metadata_ = op_result.schema;
+  request_result_set_.reset(new vector<TResultRow>(op_result.rows));
+  return Status::OK();
+}
+
+// Starts Wait() on a new thread owned by wait_thread_. BlockOnWait() later joins it
+// and resets wait_thread_.
+void ClientRequestState::WaitAsync() {
+  wait_thread_.reset(new Thread("query-exec-state", "wait-thread",
+      &ClientRequestState::Wait, this));
+}
+
+// Blocks until the wait thread started by WaitAsync() (if any) has been joined.
+// Exactly one caller performs the Join(); is_block_on_wait_joining_ marks that one is
+// in progress. Any concurrent caller sleeps on block_on_wait_cv_ until the joiner
+// resets wait_thread_ and notifies. Returns immediately if there is no wait thread.
+void ClientRequestState::BlockOnWait() {
+ unique_lock<mutex> l(lock_);
+ if (wait_thread_.get() == NULL) return;
+ if (!is_block_on_wait_joining_) {
+ // No other thread is already joining on wait_thread_, so this thread needs to do
+ // it. Other threads will need to block on the cond-var.
+ is_block_on_wait_joining_ = true;
+ // Release lock_ during Join(): Wait() itself acquires lock_, so holding it here
+ // could deadlock.
+ l.unlock();
+ wait_thread_->Join();
+ l.lock();
+ is_block_on_wait_joining_ = false;
+ wait_thread_.reset();
+ block_on_wait_cv_.notify_all();
+ } else {
+ // Another thread is already joining with wait_thread_. Block on the cond-var
+ // until the Join() executed in the other thread has completed.
+ // The loop guards against spurious wakeups.
+ do {
+ block_on_wait_cv_.wait(l);
+ } while (is_block_on_wait_joining_);
+ }
+}
+
+// Entry point of the wait thread. Blocks in WaitInternal() until results are ready
+// (or the request fails) and marks a timeline event. It then folds the outcome into
+// the query status and, on success, moves the query to FINISHED.
+void ClientRequestState::Wait() {
+  const Status wait_status = WaitInternal();
+  {
+    lock_guard<mutex> l(lock_);
+    const char* event = returns_result_set() ? "Rows available" : "Request finished";
+    query_events()->MarkEvent(event);
+    (void) UpdateQueryStatus(wait_status);
+  }
+  if (!wait_status.ok()) return;
+  UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
+}
+
+// Called from Wait(). Performs the blocking part of the request:
+//  - waits for any child queries;
+//  - waits for the coordinator (then updates the catalog);
+//  - applies COMPUTE STATS results from the child queries;
+//  - sets eos_ for requests that return no result set, or builds the CTAS result.
+// EXPLAIN returns immediately. Returns the first error encountered, including an
+// error already recorded in query_status_.
+Status ClientRequestState::WaitInternal() {
+ // Explain requests have already populated the result set. Nothing to do here.
+ if (exec_request_.stmt_type == TStmtType::EXPLAIN) {
+ MarkInactive();
+ return Status::OK();
+ }
+
+ vector<ChildQuery*> child_queries;
+ Status child_queries_status = child_query_executor_->WaitForAll(&child_queries);
+ {
+ lock_guard<mutex> l(lock_);
+ // Stop if the query already has an error status recorded.
+ RETURN_IF_ERROR(query_status_);
+ RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status));
+ }
+ if (!child_queries.empty()) query_events_->MarkEvent("Child queries finished");
+
+ if (coord_.get() != NULL) {
+ RETURN_IF_ERROR(coord_->Wait());
+ RETURN_IF_ERROR(UpdateCatalog());
+ }
+
+ if (catalog_op_type() == TCatalogOpType::DDL &&
+ ddl_type() == TDdlType::COMPUTE_STATS && child_queries.size() > 0) {
+ RETURN_IF_ERROR(UpdateTableAndColumnStats(child_queries));
+ }
+
+ if (!returns_result_set()) {
+ // Queries that do not return a result are finished at this point. This includes
+ // DML operations and a subset of the DDL operations.
+ eos_ = true;
+ } else if (catalog_op_type() == TCatalogOpType::DDL &&
+ ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
+ SetCreateTableAsSelectResultSet();
+ }
+ // Rows are available now (for SELECT statement), so start the 'wait' timer that tracks
+ // how long Impala waits for the client to fetch rows. For other statements, track the
+ // time until a Close() is received.
+ MarkInactive();
+ return Status::OK();
+}
+
+Status ClientRequestState::FetchRows(const int32_t max_rows,
+    QueryResultSet* fetched_rows) {
+  // The client asked us to do work on its behalf, so pause the idle-wait timer.
+  MarkActive();
+
+  // lock_ is already held by the caller (ImpalaServer::FetchInternal). Any error from
+  // the fetch is folded into query_status_, which is what gets returned.
+  const Status fetch_status = FetchRowsInternal(max_rows, fetched_rows);
+  (void) UpdateQueryStatus(fetch_status);
+
+  MarkInactive();
+  return query_status_;
+}
+
+Status ClientRequestState::RestartFetch() {
+  const bool caching_enabled = result_cache_max_size_ > 0;
+  if (!caching_enabled) {
+    // Without a result cache there is nothing to replay from.
+    return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR,
+        "Restarting of fetch requires enabling of query result caching."));
+  }
+  const bool cache_overflowed = result_cache_.get() == NULL;
+  if (cache_overflowed) {
+    // A previous fetch exceeded the bound and the cache was discarded.
+    stringstream msg;
+    msg << "The query result cache exceeded its limit of " << result_cache_max_size_
+        << " rows. Restarting the fetch is not possible.";
+    return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, msg.str()));
+  }
+  // Rewind the fetch cursor so the next fetch starts again from the first row.
+  num_rows_fetched_ = 0;
+  eos_ = false;
+  return Status::OK();
+}
+
+void ClientRequestState::UpdateNonErrorQueryState(
+    beeswax::QueryState::type query_state) {
+  lock_guard<mutex> l(lock_);
+  // Errors must go through UpdateQueryStatus() so that query_status_ is set too.
+  DCHECK(query_state != beeswax::QueryState::EXCEPTION);
+  // The state only moves forward: a request for an equal or earlier state is ignored.
+  if (query_state <= query_state_) return;
+  query_state_ = query_state;
+}
+
+Status ClientRequestState::UpdateQueryStatus(const Status& status) {
+  // Only the first error is recorded; later errors and OK statuses change nothing.
+  const bool is_first_error = !status.ok() && query_status_.ok();
+  if (is_first_error) {
+    query_status_ = status;
+    query_state_ = beeswax::QueryState::EXCEPTION;
+    summary_profile_.AddInfoString("Query Status", query_status_.GetDetail());
+  }
+  // Hand the argument back so callers can write RETURN_IF_ERROR(UpdateQueryStatus(...)).
+  return status;
+}
+
+Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
+    QueryResultSet* fetched_rows) {
+  // Appends up to 'max_rows' rows (no limit if max_rows <= 0) to 'fetched_rows'.
+  // Statements with a precomputed request_result_set_ are served from it directly.
+  // Otherwise rows are served first from result_cache_ (when enabled) and then from the
+  // coordinator. Called with lock_ held; lock_ is released around coord_->GetNext().
+  DCHECK(query_state_ != beeswax::QueryState::EXCEPTION);
+
+  if (eos_) return Status::OK();
+
+  if (request_result_set_ != NULL) {
+    query_state_ = beeswax::QueryState::FINISHED;
+    int num_rows = 0;
+    const vector<TResultRow>& all_rows = (*(request_result_set_.get()));
+    // max_rows <= 0 means no limit
+    while ((num_rows < max_rows || max_rows <= 0)
+        && num_rows_fetched_ < all_rows.size()) {
+      fetched_rows->AddOneRow(all_rows[num_rows_fetched_]);
+      ++num_rows_fetched_;
+      ++num_rows;
+    }
+    eos_ = (num_rows_fetched_ == all_rows.size());
+    return Status::OK();
+  }
+
+  if (coord_.get() == nullptr) {
+    return Status("Client tried to fetch rows on a query that produces no results.");
+  }
+
+  int32_t num_rows_fetched_from_cache = 0;
+  if (result_cache_max_size_ > 0 && result_cache_ != NULL) {
+    // Satisfy the fetch from the result cache if possible.
+    int cache_fetch_size = (max_rows <= 0) ? result_cache_->size() : max_rows;
+    num_rows_fetched_from_cache =
+        fetched_rows->AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size);
+    num_rows_fetched_ += num_rows_fetched_from_cache;
+    // Only a bounded request can be completely satisfied from the cache. For an
+    // unbounded request (max_rows <= 0) the comparison alone would be trivially true
+    // and we would return without ever consulting the coordinator or setting eos_,
+    // so unbounded fetches must continue on to the coordinator.
+    if (max_rows > 0 && num_rows_fetched_from_cache >= max_rows) return Status::OK();
+  }
+
+  query_state_ = beeswax::QueryState::FINISHED;  // results will be ready after this call
+
+  // Maximum number of rows to be fetched from the coord.
+  int32_t max_coord_rows = max_rows;
+  if (max_rows > 0) {
+    DCHECK_LE(num_rows_fetched_from_cache, max_rows);
+    max_coord_rows = max_rows - num_rows_fetched_from_cache;
+  }
+  {
+    SCOPED_TIMER(row_materialization_timer_);
+    size_t before = fetched_rows->size();
+    // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_
+    // (already held) ensures that we do not call coord_->GetNext() multiple times
+    // concurrently.
+    // TODO: Simplify this.
+    lock_.unlock();
+    Status status = coord_->GetNext(fetched_rows, max_coord_rows, &eos_);
+    lock_.lock();
+    int num_fetched = fetched_rows->size() - before;
+    DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute(
+        "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows);
+    num_rows_fetched_ += num_fetched;
+
+    RETURN_IF_ERROR(status);
+    // Check if query status has changed during GetNext() call
+    if (!query_status_.ok()) {
+      eos_ = true;
+      return query_status_;
+    }
+  }
+
+  // Update the result cache if necessary.
+  if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) {
+    int rows_fetched_from_coord = fetched_rows->size() - num_rows_fetched_from_cache;
+    if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) {
+      // Set the cache to NULL to indicate that adding the rows fetched from the coord
+      // would exceed the bound of the cache, and therefore, RestartFetch() should fail.
+      ClearResultCache();
+      return Status::OK();
+    }
+
+    // We guess the size of the cache after adding fetched_rows by looking at the size of
+    // fetched_rows itself, and using this estimate to confirm that the memtracker will
+    // allow us to use this much extra memory. In fact, this might be an overestimate, as
+    // the size of two result sets combined into one is not always the size of both result
+    // sets added together (the best example is the null bitset for each column: it might
+    // have only one entry in each result set, and as a result consume two bytes, but when
+    // the result sets are combined, only one byte is needed). Therefore after we add the
+    // new result set into the cache, we need to fix up the memory consumption to the
+    // actual levels to ensure we don't 'leak' bytes that we aren't using.
+    int64_t before = result_cache_->ByteSize();
+
+    // Upper-bound on memory required to add fetched_rows to the cache.
+    int64_t delta_bytes =
+        fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size());
+    MemTracker* query_mem_tracker = coord_->query_mem_tracker();
+    // Count the cached rows towards the mem limit.
+    if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) {
+      string details("Failed to allocate memory for result cache.");
+      return query_mem_tracker->MemLimitExceeded(coord_->runtime_state(), details,
+          delta_bytes);
+    }
+    // Append all rows fetched from the coordinator into the cache.
+    int num_rows_added = result_cache_->AddRows(
+        fetched_rows, num_rows_fetched_from_cache, fetched_rows->size());
+
+    int64_t after = result_cache_->ByteSize();
+
+    // Confirm that this was not an underestimate of the memory required.
+    DCHECK_GE(before + delta_bytes, after)
+        << "Combined result sets consume more memory than both individually "
+        << Substitute("(before: $0, delta_bytes: $1, after: $2)",
+            before, delta_bytes, after);
+
+    // Fix up the tracked values
+    if (before + delta_bytes > after) {
+      query_mem_tracker->Release(before + delta_bytes - after);
+      delta_bytes = after - before;
+    }
+
+    // Update result set cache metrics.
+    ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(num_rows_added);
+    ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(delta_bytes);
+  }
+
+  return Status::OK();
+}
+
+Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
+  if (check_inflight) {
+    // Presence in the session's 'inflight_queries' means the query actually started
+    // executing. It is fine if the query leaves 'inflight_queries' while cancellation
+    // proceeds, so the session lock is held only for this lookup.
+    lock_guard<mutex> session_lock(session_->lock);
+    const bool is_inflight =
+        session_->inflight_queries.find(query_id()) != session_->inflight_queries.end();
+    if (!is_inflight) return Status("Query not yet running");
+  }
+
+  Coordinator* coord_to_cancel = NULL;
+  {
+    lock_guard<mutex> lock(lock_);
+    // Completed or already-failed queries keep their current state and status.
+    const bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION;
+    if (cause != NULL && !already_done) {
+      DCHECK(!cause->ok());
+      (void) UpdateQueryStatus(*cause);
+      query_events_->MarkEvent("Cancelled");
+      DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION);
+    }
+    // Snapshot the coordinator pointer while 'lock_' is held.
+    coord_to_cancel = coord_.get();
+    is_cancelled_ = true;
+  }
+
+  // Child queries are cancelled and closed before the parent, without 'lock_':
+  // ChildQuery::Cancel() calls back into ImpalaServer, and cancellation involves RPCs
+  // that can take quite some time.
+  child_query_executor_->Cancel();
+
+  // The parent query is cancelled last, also without 'lock_', since the RPCs involved
+  // can block for a long time.
+  if (coord_to_cancel != NULL) coord_to_cancel->Cancel(cause);
+  return Status::OK();
+}
+
+Status ClientRequestState::UpdateCatalog() {
+  // Only DML statements need to finalize their writes through the CatalogService.
+  if (!exec_request().__isset.query_exec_request ||
+      exec_request().query_exec_request.stmt_type != TStmtType::DML) {
+    return Status::OK();
+  }
+
+  query_events_->MarkEvent("DML data written");
+  SCOPED_TIMER(ADD_TIMER(&server_profile_, "MetastoreUpdateTimer"));
+
+  // The query exec request is only read here, so bind a const reference instead of
+  // deep-copying the whole thrift struct.
+  const TQueryExecRequest& query_exec_request = exec_request().query_exec_request;
+  if (query_exec_request.__isset.finalize_params) {
+    const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
+    TUpdateCatalogRequest catalog_update;
+    catalog_update.__set_header(TCatalogServiceRequestHeader());
+    catalog_update.header.__set_requesting_user(effective_user());
+    if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
+      VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
+                 << query_id() << ")";
+    } else {
+      // TODO: We track partitions written to, not created, which means
+      // that we do more work than is necessary, because written-to
+      // partitions don't always require a metastore change.
+      VLOG_QUERY << "Updating metastore with " << catalog_update.created_partitions.size()
+                 << " altered partitions ("
+                 << join(catalog_update.created_partitions, ", ") << ")";
+
+      catalog_update.target_table = finalize_params.table_name;
+      catalog_update.db_name = finalize_params.table_db;
+
+      Status cnxn_status;
+      const TNetworkAddress& address =
+          MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
+      CatalogServiceConnection client(
+          exec_env_->catalogd_client_cache(), address, &cnxn_status);
+      RETURN_IF_ERROR(cnxn_status);
+
+      VLOG_QUERY << "Executing FinalizeDml() using CatalogService";
+      TUpdateCatalogResponse resp;
+      RETURN_IF_ERROR(
+          client.DoRpc(&CatalogServiceClient::UpdateCatalog, catalog_update, &resp));
+
+      Status status(resp.result.status);
+      if (!status.ok()) LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail();
+      RETURN_IF_ERROR(status);
+      RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
+          exec_request_.query_options.sync_ddl));
+    }
+  }
+  query_events_->MarkEvent("DML Metastore update finished");
+  return Status::OK();
+}
+
+void ClientRequestState::SetResultSet(const TDdlExecResponse* ddl_resp) {
+  // Only a DDL response that actually carries a result set replaces the current one.
+  if (ddl_resp == NULL || !ddl_resp->__isset.result_set) return;
+  const auto& ddl_results = ddl_resp->result_set;
+  result_metadata_ = ddl_results.schema;
+  request_result_set_.reset(new vector<TResultRow>(ddl_results.rows));
+}
+
+void ClientRequestState::SetResultSet(const vector<string>& results) {
+  // Build one single-column string row per entry of 'results'.
+  request_result_set_.reset(new vector<TResultRow>(results.size()));
+  vector<TResultRow>& rows = *request_result_set_;
+  for (size_t i = 0; i < results.size(); ++i) {
+    TResultRow& row = rows[i];
+    row.__isset.colVals = true;
+    row.colVals.resize(1);
+    row.colVals[0].__set_string_val(results[i]);
+  }
+}
+
+void ClientRequestState::SetResultSet(const vector<string>& col1,
+    const vector<string>& col2) {
+  // Build two-column string rows; row i is (col1[i], col2[i]).
+  DCHECK_EQ(col1.size(), col2.size());
+
+  request_result_set_.reset(new vector<TResultRow>(col1.size()));
+  vector<TResultRow>& rows = *request_result_set_;
+  for (size_t i = 0; i < col1.size(); ++i) {
+    TResultRow& row = rows[i];
+    row.__isset.colVals = true;
+    row.colVals.resize(2);
+    row.colVals[0].__set_string_val(col1[i]);
+    row.colVals[1].__set_string_val(col2[i]);
+  }
+}
+
+void ClientRequestState::SetResultSet(const vector<string>& col1,
+    const vector<string>& col2, const vector<string>& col3, const vector<string>& col4) {
+  // Build four-column string rows; row i is (col1[i], col2[i], col3[i], col4[i]).
+  DCHECK_EQ(col1.size(), col2.size());
+  DCHECK_EQ(col1.size(), col3.size());
+  DCHECK_EQ(col1.size(), col4.size());
+
+  request_result_set_.reset(new vector<TResultRow>(col1.size()));
+  vector<TResultRow>& rows = *request_result_set_;
+  for (size_t i = 0; i < col1.size(); ++i) {
+    TResultRow& row = rows[i];
+    row.__isset.colVals = true;
+    row.colVals.resize(4);
+    row.colVals[0].__set_string_val(col1[i]);
+    row.colVals[1].__set_string_val(col2[i]);
+    row.colVals[2].__set_string_val(col3[i]);
+    row.colVals[3].__set_string_val(col4[i]);
+  }
+}
+
+void ClientRequestState::SetCreateTableAsSelectResultSet() {
+  DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
+  // Rows are only inserted when this operation actually created a new table;
+  // otherwise the reported count stays 0.
+  int64_t rows_inserted = 0;
+  if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
+    DCHECK(coord_.get());
+    for (const auto& partition : coord_->per_partition_status()) {
+      rows_inserted += partition.second.num_modified_rows;
+    }
+  }
+  const string summary_msg = Substitute("Inserted $0 row(s)", rows_inserted);
+  VLOG_QUERY << summary_msg;
+  SetResultSet(vector<string>(1, summary_msg));
+}
+
+void ClientRequestState::MarkInactive() {
+  // Start timing how long the client leaves this query idle.
+  client_wait_sw_.Start();
+  lock_guard<mutex> l(expiration_data_lock_);
+  DCHECK(ref_count_ > 0) << "Invalid MarkInactive()";
+  --ref_count_;
+  last_active_time_ms_ = UnixMillis();
+}
+
+void ClientRequestState::MarkActive() {
+  // Stop the idle stopwatch and publish the accumulated client wait time.
+  client_wait_sw_.Stop();
+  const int64_t waited = client_wait_sw_.ElapsedTime();
+  client_wait_timer_->Set(waited);
+  lock_guard<mutex> l(expiration_data_lock_);
+  ++ref_count_;
+  last_active_time_ms_ = UnixMillis();
+}
+
+Status ClientRequestState::UpdateTableAndColumnStats(
+    const vector<ChildQuery*>& child_queries) {
+  // child_queries[0] is always present; child_queries[1], if present, is the column
+  // stats query.
+  DCHECK_GE(child_queries.size(), 1);
+  DCHECK_LE(child_queries.size(), 2);
+  catalog_op_executor_.reset(
+      new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+
+  // Empty thrift structures are passed to ExecComputeStats() when there was no column
+  // stats query.
+  TTableSchema col_stats_schema;
+  TRowSet col_stats_data;
+  const bool has_col_stats_query = child_queries.size() > 1;
+  if (has_col_stats_query) {
+    ChildQuery* col_stats_query = child_queries[1];
+    col_stats_schema = col_stats_query->result_schema();
+    col_stats_data = col_stats_query->result_data();
+  }
+
+  ChildQuery* first_query = child_queries[0];
+  const Status stats_status = catalog_op_executor_->ExecComputeStats(
+      exec_request_.catalog_op_request.ddl_params.compute_stats_params,
+      first_query->result_schema(), first_query->result_data(),
+      col_stats_schema, col_stats_data);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_IF_ERROR(UpdateQueryStatus(stats_status));
+  }
+  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
+      *catalog_op_executor_->update_catalog_result(),
+      exec_request_.query_options.sync_ddl));
+
+  // The DDL response becomes the result set reported to the client.
+  SetResultSet(catalog_op_executor_->ddl_exec_response());
+  query_events_->MarkEvent("Metastore update finished");
+  return Status::OK();
+}
+
+void ClientRequestState::ClearResultCache() {
+  if (result_cache_ == NULL) return;
+  // Roll back this cache's contribution to the result set cache metrics. The row count
+  // is converted to a signed 64-bit value before negation: if size() returns an
+  // unsigned type, negating it directly wraps around instead of yielding a negative
+  // delta, and only happens to work through an implementation-defined conversion.
+  const int64_t total_rows = static_cast<int64_t>(result_cache_->size());
+  const int64_t total_bytes = result_cache_->ByteSize();
+  ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(-total_rows);
+  ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(-total_bytes);
+  // Release the bytes that were charged to the query mem tracker for cached rows.
+  if (coord_ != NULL) {
+    DCHECK(coord_->query_mem_tracker() != NULL);
+    coord_->query_mem_tracker()->Release(total_bytes);
+  }
+  result_cache_.reset(NULL);
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
new file mode 100644
index 0000000..0e18957
--- /dev/null
+++ b/be/src/service/client-request-state.h
@@ -0,0 +1,413 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_SERVICE_CLIENT_REQUEST_STATE_H
+#define IMPALA_SERVICE_CLIENT_REQUEST_STATE_H
+
+#include "common/status.h"
+#include "exec/catalog-op-executor.h"
+#include "runtime/timestamp-value.h"
+#include "scheduling/query-schedule.h"
+#include "service/child-query.h"
+#include "service/impala-server.h"
+#include "util/auth-util.h"
+#include "util/runtime-profile.h"
+#include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/Frontend_types.h"
+
+#include <boost/thread.hpp>
+#include <boost/unordered_set.hpp>
+#include <vector>
+
+namespace impala {
+
+class ExecEnv;
+class Coordinator;
+class RuntimeState;
+class RowBatch;
+class Expr;
+class TupleRow;
+class Frontend;
+class ClientRequestStateCleaner;
+
+/// Execution state of the client-facing side of a query. This captures everything
+/// necessary to convert row batches received by the coordinator into results
+/// we can return to the client. It also captures all state required for
+/// servicing query-related requests from the client.
+/// Thread safety: this class is generally not thread-safe, callers need to
+/// synchronize access explicitly via lock(). See the ImpalaServer class comment for
+/// the required lock acquisition order.
+///
+/// TODO: Compute stats is the only stmt that requires child queries. Once the
+/// CatalogService performs background stats gathering the concept of child queries
+/// will likely become obsolete. Remove all child-query related code from this class.
+class ClientRequestState {
+ public:
+ ClientRequestState(const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
+ ImpalaServer* server, std::shared_ptr<ImpalaServer::SessionState> session);
+
+ ~ClientRequestState();
+
+ /// Initiates execution of an exec_request.
+ /// Non-blocking.
+ /// Must *not* be called with lock_ held.
+ Status Exec(TExecRequest* exec_request) WARN_UNUSED_RESULT;
+
+ /// Execute a HiveServer2 metadata operation
+ /// TODO: This is likely a superset of GetTableNames/GetDbs. Coalesce these different
+ /// code paths.
+ Status Exec(const TMetadataOpRequest& exec_request) WARN_UNUSED_RESULT;
+
+ /// Call this to ensure that rows are ready when calling FetchRows(). Updates the
+ /// query_status_, and advances query_state_ to FINISHED or EXCEPTION. Must be preceded
+ /// by call to Exec(). Waits for all child queries to complete. Takes lock_.
+ void Wait();
+
+ /// Calls Wait() asynchronously in a thread and returns immediately.
+ void WaitAsync();
+
+ /// BlockOnWait() may be called after WaitAsync() has been called in order to wait
+ /// for the asynchronous thread (wait_thread_) to complete. It is safe to call this
+ /// from multiple threads (all threads will block until wait_thread_ has completed)
+ /// and multiple times (non-blocking once wait_thread_ has completed). Do not call
+ /// while holding lock_.
+ void BlockOnWait();
+
+ /// Return at most max_rows from the current batch. If the entire current batch has
+ /// been returned, fetch another batch first.
+ /// Caller needs to hold fetch_rows_lock_ and lock_.
+ /// Caller should verify that EOS has not been reached before calling.
+ /// Must be preceded by a call to Wait() (or WaitAsync()/BlockOnWait()).
+ /// Also updates query_state_/status_ in case of error.
+ Status FetchRows(const int32_t max_rows, QueryResultSet* fetched_rows)
+ WARN_UNUSED_RESULT;
+
+ /// Resets the state of this query such that the next fetch() returns results from the
+ /// beginning of the query result set (by using result_cache_).
+ /// It is valid to call this function for any type of statement that returns a result
+ /// set, including queries, show stmts, compute stats, etc.
+ /// Returns a recoverable error status if the restart is not possible, ok() otherwise.
+ /// The error is recoverable to allow clients to resume fetching.
+ /// The caller must hold fetch_rows_lock_ and lock_.
+ Status RestartFetch() WARN_UNUSED_RESULT;
+
+ /// Update query state if the requested state isn't already obsolete. This is only for
+ /// non-error states - if the query encounters an error the query status needs to be set
+ /// with information about the error so UpdateQueryStatus must be used instead.
+ /// Takes lock_.
+ void UpdateNonErrorQueryState(beeswax::QueryState::type query_state);
+
+ /// Update the query status and the "Query Status" summary profile string.
+ /// If current status is already != ok, no update is made (we preserve the first error)
+ /// If called with a non-ok argument, the expectation is that the query will be aborted
+ /// quickly.
+ /// Returns the status argument (so we can write
+ /// RETURN_IF_ERROR(UpdateQueryStatus(SomeOperation())).
+ /// Does not take lock_, but requires it: caller must ensure lock_
+ /// is taken before calling UpdateQueryStatus
+ Status UpdateQueryStatus(const Status& status) WARN_UNUSED_RESULT;
+
+ /// Cancels the child queries and the coordinator with the given cause.
+ /// If cause is NULL, assume this was deliberately cancelled by the user.
+ /// Otherwise, sets state to EXCEPTION.
+ /// Does nothing if the query has reached EOS or already cancelled.
+ ///
+ /// Only returns an error if 'check_inflight' is true and the query is not yet
+ /// in-flight. Otherwise, proceed and return Status::OK() even if the query isn't
+ /// in-flight (for cleaning up after an error on the query issuing path).
+ Status Cancel(bool check_inflight, const Status* cause) WARN_UNUSED_RESULT;
+
+ /// This is called when the query is done (finished, cancelled, or failed).
+ /// Takes lock_: callers must not hold lock() before calling.
+ void Done();
+
+ /// Sets the API-specific (Beeswax, HS2) result cache and its size bound.
+ /// The given cache is owned by this ClientRequestState, even if an error is returned.
+ /// Returns a non-ok status if max_size exceeds the per-impalad allowed maximum.
+ Status SetResultCache(QueryResultSet* cache, int64_t max_size) WARN_UNUSED_RESULT;
+
+ ImpalaServer::SessionState* session() const { return session_.get(); }
+
+ /// Queries are run and authorized on behalf of the effective_user.
+ const std::string& effective_user() const {
+ return GetEffectiveUser(query_ctx_.session);
+ }
+ const std::string& connected_user() const { return query_ctx_.session.connected_user; }
+ const std::string& do_as_user() const { return query_ctx_.session.delegated_user; }
+ TSessionType::type session_type() const { return query_ctx_.session.session_type; }
+ const TUniqueId& session_id() const { return query_ctx_.session.session_id; }
+ const std::string& default_db() const { return query_ctx_.session.database; }
+ bool eos() const { return eos_; }
+ Coordinator* coord() const { return coord_.get(); }
+ QuerySchedule* schedule() { return schedule_.get(); }
+
+ /// Resource pool associated with this query, or an empty string if the schedule has not
+ /// been created and had the pool set yet, or this StmtType doesn't go through admission
+ /// control.
+  std::string request_pool() const {
+    if (schedule_.get() != nullptr) return schedule_->request_pool();
+    return "";
+  }
+ int num_rows_fetched() const { return num_rows_fetched_; }
+ void set_fetched_rows() { fetched_rows_ = true; }
+ bool fetched_rows() const { return fetched_rows_; }
+ bool returns_result_set() { return !result_metadata_.columns.empty(); }
+ const TResultSetMetadata* result_metadata() { return &result_metadata_; }
+ const TUniqueId& query_id() const { return query_ctx_.query_id; }
+ const TExecRequest& exec_request() const { return exec_request_; }
+ TStmtType::type stmt_type() const { return exec_request_.stmt_type; }
+ TCatalogOpType::type catalog_op_type() const {
+ return exec_request_.catalog_op_request.op_type;
+ }
+ TDdlType::type ddl_type() const {
+ return exec_request_.catalog_op_request.ddl_params.ddl_type;
+ }
+ boost::mutex* lock() { return &lock_; }
+ boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
+ beeswax::QueryState::type query_state() const { return query_state_; }
+ const Status& query_status() const { return query_status_; }
+ void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; }
+ const RuntimeProfile& profile() const { return profile_; }
+ const RuntimeProfile& summary_profile() const { return summary_profile_; }
+ const TimestampValue& start_time() const { return start_time_; }
+ const TimestampValue& end_time() const { return end_time_; }
+ const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; }
+ const TQueryOptions& query_options() const {
+ return query_ctx_.client_request.query_options;
+ }
+ /// Returns 0:0 if this is a root query
+ TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; }
+
+ const std::vector<std::string>& GetAnalysisWarnings() const {
+ return exec_request_.analysis_warnings;
+ }
+
+  /// Last time, in Unix milliseconds, that MarkActive()/MarkInactive() was called.
+  int64_t last_active_ms() const {
+    boost::lock_guard<boost::mutex> guard(expiration_data_lock_);
+    const int64_t last_active = last_active_time_ms_;
+    return last_active;
+  }
+
+  /// Returns true if Impala is actively processing this query, i.e. the ref count
+  /// maintained by MarkActive()/MarkInactive() is non-zero.
+  bool is_active() const {
+    boost::lock_guard<boost::mutex> guard(expiration_data_lock_);
+    return ref_count_ != 0;
+  }
+
+ RuntimeProfile::EventSequence* query_events() const { return query_events_; }
+ RuntimeProfile* summary_profile() { return &summary_profile_; }
+
+ private:
+ const TQueryCtx query_ctx_;
+
+ /// Ensures single-threaded execution of FetchRows(). Callers of FetchRows() are
+ /// responsible for acquiring this lock. To avoid deadlocks, callers must not hold lock_
+ /// while acquiring this lock (since FetchRows() will release and re-acquire lock_ during
+ /// its execution).
+ /// See "Locking" in the class comment for lock acquisition order.
+ boost::mutex fetch_rows_lock_;
+
+ /// Protects last_active_time_ms_ and ref_count_. Only held during short function calls
+ /// - no other locks should be acquired while holding this lock.
+ mutable boost::mutex expiration_data_lock_;
+
+ /// Stores the last time that the query was actively doing work, in Unix milliseconds.
+ int64_t last_active_time_ms_;
+
+ /// ref_count_ > 0 if Impala is currently performing work on this query's behalf. Every
+ /// time a client instructs Impala to do work on behalf of this query, the ref count is
+ /// increased, and decreased once that work is completed.
+ uint32_t ref_count_;
+
+ /// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL.
+ const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_;
+
+ /// Protects all following fields. Acquirers should be careful not to hold it for too
+ /// long, e.g. during RPCs because this lock is required to make progress on various
+ /// ImpalaServer requests. If held for too long it can block progress of client
+ /// requests for this query, e.g. query status and cancellation. Furthermore, until
+ /// IMPALA-3882 is fixed, it can indirectly block progress on all other queries.
+ /// See "Locking" in the class comment for lock acquisition order.
+ boost::mutex lock_;
+
+ /// TODO: remove and use ExecEnv::GetInstance() instead
+ ExecEnv* exec_env_;
+
+ /// Thread for asynchronously running Wait().
+ boost::scoped_ptr<Thread> wait_thread_;
+
+ /// Condition variable to make BlockOnWait() thread-safe. One thread joins
+ /// wait_thread_, and all other threads block on this cv. Used with lock_.
+ boost::condition_variable block_on_wait_cv_;
+
+ /// Used in conjunction with block_on_wait_cv_ to make BlockOnWait() thread-safe.
+ bool is_block_on_wait_joining_;
+
+ /// Session that this query is from
+ std::shared_ptr<ImpalaServer::SessionState> session_;
+
+ /// Resource assignment determined by scheduler. Owned by obj_pool_.
+ boost::scoped_ptr<QuerySchedule> schedule_;
+
+ /// Not set for ddl queries.
+ boost::scoped_ptr<Coordinator> coord_;
+
+ /// Runs statements that query or modify the catalog via the CatalogService.
+ boost::scoped_ptr<CatalogOpExecutor> catalog_op_executor_;
+
+ /// Result set used for requests that return results and are not QUERY
+ /// statements. For example, EXPLAIN, LOAD, and SHOW use this.
+ boost::scoped_ptr<std::vector<TResultRow>> request_result_set_;
+
+ /// Cache of the first result_cache_max_size_ query results to allow clients to restart
+ /// fetching from the beginning of the result set. This cache is appended to in
+ /// FetchRowsInternal(), and set to NULL if its bound is exceeded. If the bound is exceeded,
+ /// then clients cannot restart fetching because some results have been lost since the
+ /// last fetch. Only set if result_cache_max_size_ > 0.
+ boost::scoped_ptr<QueryResultSet> result_cache_;
+
+ /// Max size of the result_cache_ in number of rows. A value <= 0 means no caching.
+ int64_t result_cache_max_size_;
+
+ ObjectPool profile_pool_;
+
+ /// The ClientRequestState builds three separate profiles.
+ /// * profile_ is the top-level profile which houses the other
+ /// profiles, plus the query timeline
+ /// * summary_profile_ contains mostly static information about the
+ /// query, including the query statement, the plan and the user who submitted it.
+ /// * server_profile_ tracks time spent inside the ImpalaServer,
+ /// but not inside fragment execution, i.e. the time taken to
+ /// register and set-up the query and for rows to be fetched.
+ //
+ /// There's a fourth profile which is not built here (but is a
+ /// child of profile_); the execution profile which tracks the
+ /// actual fragment execution.
+ RuntimeProfile profile_;
+ RuntimeProfile server_profile_;
+ RuntimeProfile summary_profile_;
+ RuntimeProfile::Counter* row_materialization_timer_;
+
+ /// Tracks how long we are idle waiting for a client to fetch rows.
+ RuntimeProfile::Counter* client_wait_timer_;
+ /// Timer to track idle time for the above counter.
+ MonotonicStopWatch client_wait_sw_;
+
+ RuntimeProfile::EventSequence* query_events_;
+
+ bool is_cancelled_; // if true, Cancel() was called.
+ bool eos_; // if true, there are no more rows to return
+ // We enforce the invariant that query_status_ is not OK iff query_state_
+ // is EXCEPTION, given that lock_ is held.
+ beeswax::QueryState::type query_state_;
+ Status query_status_;
+ TExecRequest exec_request_;
+
+ TResultSetMetadata result_metadata_; // metadata for select query
+ RowBatch* current_batch_; // the current row batch; only applicable if coord is set
+ int current_batch_row_; // number of rows fetched within the current batch
+ int num_rows_fetched_; // number of rows fetched by client for the entire query
+
+ /// True if a fetch was attempted by a client, regardless of whether a result set
+ /// (or error) was returned to the client.
+ bool fetched_rows_;
+
+ /// To get access to UpdateCatalog, LOAD, and DDL methods. Not owned.
+ Frontend* frontend_;
+
+ /// The parent ImpalaServer; called to wait until the impalad has processed a
+ /// catalog update request. Not owned.
+ ImpalaServer* parent_server_;
+
+ /// Start/end time of the query
+ TimestampValue start_time_, end_time_;
+
+ /// Executes a local catalog operation (an operation that does not need to execute
+ /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
+ Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op) WARN_UNUSED_RESULT;
+
+ /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently not
+ /// doing any work. Takes expiration_data_lock_.
+ void MarkInactive();
+
+ /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently being
+ /// actively processed. Takes expiration_data_lock_.
+ void MarkActive();
+
+ /// Core logic of initiating a query or dml execution request.
+ /// Initiates execution of plan fragments, if there are any, and sets
+ /// up the output exprs for subsequent calls to FetchRows().
+ /// 'coord_' is only valid after this method is called, and may be invalid if it
+ /// returns an error.
+ /// Also sets up profile and pre-execution counters.
+ /// Non-blocking.
+ Status ExecQueryOrDmlRequest(const TQueryExecRequest& query_exec_request)
+ WARN_UNUSED_RESULT;
+
+ /// Core logic of executing a ddl statement. May internally initiate execution of
+ /// queries (e.g., compute stats) or dml (e.g., create table as select)
+ Status ExecDdlRequest() WARN_UNUSED_RESULT;
+
+ /// Executes a LOAD DATA
+ Status ExecLoadDataRequest() WARN_UNUSED_RESULT;
+
+ /// Core logic of Wait(). Does not update query_state_/query_status_.
+ Status WaitInternal() WARN_UNUSED_RESULT;
+
+ /// Core logic of FetchRows(). Does not update query_state_/query_status_.
+ /// Caller needs to hold fetch_rows_lock_ and lock_.
+ Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows)
+ WARN_UNUSED_RESULT;
+
+ /// Evaluates 'output_expr_ctxs_' against 'row' and outputs the evaluated row in
+ /// 'result'. The values' scales (# of digits after decimal) are stored in 'scales'.
+ /// result and scales must have been resized to the number of columns before call.
+ Status GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales)
+ WARN_UNUSED_RESULT;
+
+ /// Gather and publish all required updates to the metastore
+ Status UpdateCatalog() WARN_UNUSED_RESULT;
+
+ /// Copies results into request_result_set_
+ /// TODO: Have the FE return list<Data.TResultRow> so that this isn't necessary
+ void SetResultSet(const TDdlExecResponse* ddl_resp);
+ void SetResultSet(const std::vector<std::string>& results);
+ void SetResultSet(const std::vector<std::string>& col1,
+ const std::vector<std::string>& col2);
+ void SetResultSet(const std::vector<std::string>& col1,
+ const std::vector<std::string>& col2, const std::vector<std::string>& col3,
+ const std::vector<std::string>& col4);
+
+ /// Sets the result set for a CREATE TABLE AS SELECT statement. The results will not be
+ /// ready until all BEs complete execution. This can be called as part of Wait(),
+ /// at which point results will be available.
+ void SetCreateTableAsSelectResultSet();
+
+ /// Updates the metastore's table and column statistics based on the child-query results
+ /// of a compute stats command.
+ /// TODO: Unify the various ways that the Metastore is updated for DDL/DML.
+ /// For example, INSERT queries update partition metadata in UpdateCatalog() using a
+ /// TUpdateCatalogRequest, whereas our DDL uses a TCatalogOpRequest for very similar
+ /// purposes. Perhaps INSERT should use a TCatalogOpRequest as well.
+ Status UpdateTableAndColumnStats(const std::vector<ChildQuery*>& child_queries)
+ WARN_UNUSED_RESULT;
+
+ /// Sets result_cache_ to NULL and updates its associated metrics and mem consumption.
+ /// This function is a no-op if the cache has already been cleared.
+ void ClearResultCache();
+};
+
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 8f52352..bb35089 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -103,10 +103,11 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
// Allow logging of at least one error, so we can detect and convert it into a
// Java exception.
query_ctx.client_request.query_options.max_errors = 1;
-
// Track memory against a dummy "fe-eval-exprs" resource pool - we don't
// know what resource pool the query has been assigned to yet.
- RuntimeState state(query_ctx, ExecEnv::GetInstance(), "fe-eval-exprs");
+ query_ctx.request_pool = "fe-eval-exprs";
+
+ RuntimeState state(query_ctx, ExecEnv::GetInstance());
// Make sure to close the runtime state no matter how this scope is exited.
const auto close_runtime_state =
MakeScopeExitTrigger([&state]() { state.ReleaseResources(); });
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index bd4ee67..67ecc79 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -25,7 +25,7 @@
#include "runtime/exec-env.h"
#include "runtime/raw-value.inline.h"
#include "runtime/timestamp-value.h"
-#include "service/query-exec-state.h"
+#include "service/client-request-state.h"
#include "service/query-options.h"
#include "service/query-result-set.h"
#include "util/impalad-metrics.h"
@@ -64,22 +64,22 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
// raise Syntax error or access violation; it's likely to be syntax/analysis error
// TODO: that may not be true; fix this
- shared_ptr<QueryExecState> exec_state;
- RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
+ shared_ptr<ClientRequestState> request_state;
+ RAISE_IF_ERROR(Execute(&query_ctx, session, &request_state),
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
- exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
+ request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
// start thread to wait for results to become available, which will allow
// us to advance query state to FINISHED or EXCEPTION
- exec_state->WaitAsync();
+ request_state->WaitAsync();
// Once the query is running do a final check for session closure and add it to the
// set of in-flight queries.
- Status status = SetQueryInflight(session, exec_state);
+ Status status = SetQueryInflight(session, request_state);
if (!status.ok()) {
- UnregisterQuery(exec_state->query_id(), false, &status);
+ (void) UnregisterQuery(request_state->query_id(), false, &status);
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
- TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
+ TUniqueIdToQueryHandle(request_state->query_id(), &query_handle);
}
void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
@@ -94,7 +94,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
// raise general error for request conversion error;
RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
- shared_ptr<QueryExecState> exec_state;
+ shared_ptr<ClientRequestState> request_state;
DCHECK(session != NULL); // The session should exist.
{
// The session is created when the client connects. Depending on the underlying
@@ -106,27 +106,27 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
// raise Syntax error or access violation; it's likely to be syntax/analysis error
// TODO: that may not be true; fix this
- RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
+ RAISE_IF_ERROR(Execute(&query_ctx, session, &request_state),
SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
- exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
+ request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
// Once the query is running do a final check for session closure and add it to the
// set of in-flight queries.
- Status status = SetQueryInflight(session, exec_state);
+ Status status = SetQueryInflight(session, request_state);
if (!status.ok()) {
- UnregisterQuery(exec_state->query_id(), false, &status);
+ (void) UnregisterQuery(request_state->query_id(), false, &status);
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
// block until results are ready
- exec_state->Wait();
- status = exec_state->query_status();
+ request_state->Wait();
+ status = request_state->query_status();
if (!status.ok()) {
- UnregisterQuery(exec_state->query_id(), false, &status);
+ (void) UnregisterQuery(request_state->query_id(), false, &status);
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
- exec_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
- TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
+ request_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
+ TUniqueIdToQueryHandle(request_state->query_id(), &query_handle);
// If the input log context id is an empty string, then create a new number and
// set it to _return. Otherwise, set _return with the input log context
@@ -172,7 +172,7 @@ void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle
VLOG_ROW << "fetch result: #results=" << query_results.data.size()
<< " has_more=" << (query_results.has_more ? "true" : "false");
if (!status.ok()) {
- UnregisterQuery(query_id, false, &status);
+ (void) UnregisterQuery(query_id, false, &status);
RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
}
}
@@ -188,18 +188,18 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
TUniqueId query_id;
QueryHandleToTUniqueId(handle, &query_id);
VLOG_QUERY << "get_results_metadata(): query_id=" << PrintId(query_id);
- shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
- if (UNLIKELY(exec_state.get() == nullptr)) {
+ shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true);
+ if (UNLIKELY(request_state.get() == nullptr)) {
RaiseBeeswaxException(Substitute("Invalid query handle: $0", PrintId(query_id)),
SQLSTATE_GENERAL_ERROR);
}
{
- // make sure we release the lock on exec_state if we see any error
- lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
+ // make sure we release the lock on request_state if we see any error
+ lock_guard<mutex> l(*request_state->lock(), adopt_lock_t());
// Convert TResultSetMetadata to Beeswax.ResultsMetadata
- const TResultSetMetadata* result_set_md = exec_state->result_metadata();
+ const TResultSetMetadata* result_set_md = request_state->result_metadata();
results_metadata.__isset.schema = true;
results_metadata.schema.__isset.fieldSchemas = true;
results_metadata.schema.fieldSchemas.resize(result_set_md->columns.size());
@@ -232,7 +232,7 @@ void ImpalaServer::close(const QueryHandle& handle) {
QueryHandleToTUniqueId(handle, &query_id);
VLOG_QUERY << "close(): query_id=" << PrintId(query_id);
// TODO: do we need to raise an exception if the query state is EXCEPTION?
- // TODO: use timeout to get rid of unwanted exec_state.
+ // TODO: use timeout to get rid of unwanted request_state.
RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR);
}
@@ -244,9 +244,9 @@ beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
QueryHandleToTUniqueId(handle, &query_id);
VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
- lock_guard<mutex> l(query_exec_state_map_lock_);
- QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
- if (entry != query_exec_state_map_.end()) {
+ lock_guard<mutex> l(client_request_state_map_lock_);
+ ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id);
+ if (entry != client_request_state_map_.end()) {
return entry->second->query_state();
} else {
VLOG_QUERY << "ImpalaServer::get_state invalid handle";
@@ -277,8 +277,8 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
TUniqueId query_id;
QueryHandleToTUniqueId(handle, &query_id);
- shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
- if (exec_state.get() == NULL) {
+ shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+ if (request_state.get() == NULL) {
stringstream str;
str << "unknown query id: " << query_id;
LOG(ERROR) << str.str();
@@ -286,17 +286,17 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
}
stringstream error_log_ss;
// If the query status is !ok, include the status error message at the top of the log.
- if (!exec_state->query_status().ok()) {
- error_log_ss << exec_state->query_status().GetDetail() << "\n";
+ if (!request_state->query_status().ok()) {
+ error_log_ss << request_state->query_status().GetDetail() << "\n";
}
// Add warnings from analysis
- error_log_ss << join(exec_state->GetAnalysisWarnings(), "\n");
+ error_log_ss << join(request_state->GetAnalysisWarnings(), "\n");
// Add warnings from execution
- if (exec_state->coord() != NULL) {
- if (!exec_state->query_status().ok()) error_log_ss << "\n\n";
- error_log_ss << exec_state->coord()->GetErrorLog();
+ if (request_state->coord() != NULL) {
+ if (!request_state->query_status().ok()) error_log_ss << "\n\n";
+ error_log_ss << request_state->coord()->GetErrorLog();
}
log = error_log_ss.str();
}
@@ -455,30 +455,31 @@ inline void ImpalaServer::QueryHandleToTUniqueId(const QueryHandle& handle,
Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
const bool start_over, const int32_t fetch_size, beeswax::Results* query_results) {
- shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
- if (UNLIKELY(exec_state == nullptr)) {
+ shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+ if (UNLIKELY(request_state == nullptr)) {
return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
}
- // Make sure QueryExecState::Wait() has completed before fetching rows. Wait() ensures
- // that rows are ready to be fetched (e.g., Wait() opens QueryExecState::output_exprs_,
- // which are evaluated in QueryExecState::FetchRows() below).
- exec_state->BlockOnWait();
+ // Make sure ClientRequestState::Wait() has completed before fetching rows. Wait()
+ // ensures that rows are ready to be fetched (e.g., Wait() opens
+ // ClientRequestState::output_exprs_, which are evaluated in
+ // ClientRequestState::FetchRows() below).
+ request_state->BlockOnWait();
- lock_guard<mutex> frl(*exec_state->fetch_rows_lock());
- lock_guard<mutex> l(*exec_state->lock());
+ lock_guard<mutex> frl(*request_state->fetch_rows_lock());
+ lock_guard<mutex> l(*request_state->lock());
- if (exec_state->num_rows_fetched() == 0) {
- exec_state->query_events()->MarkEvent("First row fetched");
- exec_state->set_fetched_rows();
+ if (request_state->num_rows_fetched() == 0) {
+ request_state->query_events()->MarkEvent("First row fetched");
+ request_state->set_fetched_rows();
}
// Check for cancellation or an error.
- RETURN_IF_ERROR(exec_state->query_status());
+ RETURN_IF_ERROR(request_state->query_status());
// ODBC-190: set Beeswax's Results.columns to work around bug ODBC-190;
// TODO: remove the block of code when ODBC-190 is resolved.
- const TResultSetMetadata* result_metadata = exec_state->result_metadata();
+ const TResultSetMetadata* result_metadata = request_state->result_metadata();
query_results->columns.resize(result_metadata->columns.size());
for (int i = 0; i < result_metadata->columns.size(); ++i) {
// TODO: As of today, the ODBC driver does not support boolean and timestamp data
@@ -498,16 +499,16 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
query_results->__set_ready(true);
// It's likely that ODBC doesn't care about start_row, but Hue needs it. For Hue,
// start_row starts from zero, not one.
- query_results->__set_start_row(exec_state->num_rows_fetched());
+ query_results->__set_start_row(request_state->num_rows_fetched());
Status fetch_rows_status;
query_results->data.clear();
- if (!exec_state->eos()) {
+ if (!request_state->eos()) {
scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateAsciiQueryResultSet(
- *exec_state->result_metadata(), &query_results->data));
- fetch_rows_status = exec_state->FetchRows(fetch_size, result_set.get());
+ *request_state->result_metadata(), &query_results->data));
+ fetch_rows_status = request_state->FetchRows(fetch_size, result_set.get());
}
- query_results->__set_has_more(!exec_state->eos());
+ query_results->__set_has_more(!request_state->eos());
query_results->__isset.data = true;
return fetch_rows_status;
@@ -515,15 +516,15 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
TInsertResult* insert_result) {
- shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
- if (UNLIKELY(exec_state == nullptr)) {
+ shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true);
+ if (UNLIKELY(request_state == nullptr)) {
return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
}
Status query_status;
{
- lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
- query_status = exec_state->query_status();
+ lock_guard<mutex> l(*request_state->lock(), adopt_lock_t());
+ query_status = request_state->query_status();
if (query_status.ok()) {
// Coord may be NULL for a SELECT with LIMIT 0.
// Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might
@@ -531,9 +532,9 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
// coordinator, depending on how we choose to drive the table sink.
int64_t num_row_errors = 0;
bool has_kudu_stats = false;
- if (exec_state->coord() != NULL) {
+ if (request_state->coord() != NULL) {
for (const PartitionStatusMap::value_type& v:
- exec_state->coord()->per_partition_status()) {
+ request_state->coord()->per_partition_status()) {
const pair<string, TInsertPartitionStatus> partition_status = v;
insert_result->rows_modified[partition_status.first] =
partition_status.second.num_modified_rows;