You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by lv...@apache.org on 2017/05/09 15:56:00 UTC

[06/13] incubator-impala git commit: IMPALA-2550: Switch to per-query exec rpc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt
index c886bf4..cfb0bf3 100644
--- a/be/src/service/CMakeLists.txt
+++ b/be/src/service/CMakeLists.txt
@@ -30,7 +30,7 @@ add_library(Service
   impala-hs2-server.cc
   impala-beeswax-server.cc
   impala-internal-service.cc
-  query-exec-state.cc
+  client-request-state.cc
   query-options.cc
   query-result-set.cc
   child-query.cc

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/child-query.cc
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index 5b68de4..f58aacc 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -17,7 +17,7 @@
 
 #include "service/child-query.h"
 #include "service/impala-server.inline.h"
-#include "service/query-exec-state.h"
+#include "service/client-request-state.h"
 #include "service/query-options.h"
 #include "util/debug-util.h"
 
@@ -34,7 +34,7 @@ const string ChildQuery::PARENT_QUERY_OPT = "impala.parent_query_id";
 // any HS2 "RPC" into the impala server. It is important not to hold any locks (in
 // particular the parent query's lock_) while invoking HS2 functions to avoid deadlock.
 Status ChildQuery::ExecAndFetch() {
-  const TUniqueId& session_id = parent_exec_state_->session_id();
+  const TUniqueId& session_id = parent_request_state_->session_id();
   VLOG_QUERY << "Executing child query: " << query_ << " in session "
              << PrintId(session_id);
 
@@ -45,8 +45,9 @@ Status ChildQuery::ExecAndFetch() {
   ImpalaServer::TUniqueIdToTHandleIdentifier(session_id, session_id,
       &exec_stmt_req.sessionHandle.sessionId);
   exec_stmt_req.__set_statement(query_);
-  SetQueryOptions(parent_exec_state_->exec_request().query_options, &exec_stmt_req);
-  exec_stmt_req.confOverlay[PARENT_QUERY_OPT] = PrintId(parent_exec_state_->query_id());
+  SetQueryOptions(parent_request_state_->exec_request().query_options, &exec_stmt_req);
+  exec_stmt_req.confOverlay[PARENT_QUERY_OPT] =
+      PrintId(parent_request_state_->query_id());
 
   // Starting executing of the child query and setting is_running are not made atomic
   // because holding a lock while calling into the parent_server_ may result in deadlock.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/child-query.h
----------------------------------------------------------------------
diff --git a/be/src/service/child-query.h b/be/src/service/child-query.h
index 1c7a20e..36f6197 100644
--- a/be/src/service/child-query.h
+++ b/be/src/service/child-query.h
@@ -39,25 +39,25 @@ class ImpalaServer;
 //
 /// Parent queries are expected to call ExecAndWait() of a child query in a
 /// separate thread, and then join that thread to wait for child-query completion.
-/// The parent QueryExecState is independent of the child query's QueryExecState,
+/// The parent ClientRequestState is independent of the child query's ClientRequestState,
 /// with the exception that the child query selectively checks the parent's status
 /// for failure/cancellation detection. Child queries should never call into their
-/// parent's QueryExecState to avoid deadlock.
+/// parent's ClientRequestState to avoid deadlock.
 //
 /// TODO: Compute stats is the only stmt that requires child queries. Once the
 /// CatalogService performs background stats gathering the concept of child queries
 /// will likely become obsolete. Remove this class and all child-query related code.
 class ChildQuery {
  public:
-  ChildQuery(const std::string& query, ImpalaServer::QueryExecState* parent_exec_state,
+  ChildQuery(const std::string& query, ClientRequestState* parent_request_state,
       ImpalaServer* parent_server)
     : query_(query),
-      parent_exec_state_(parent_exec_state),
+      parent_request_state_(parent_request_state),
       parent_server_(parent_server),
       is_running_(false),
       is_cancelled_(false) {
     DCHECK(!query_.empty());
-    DCHECK(parent_exec_state_ != NULL);
+    DCHECK(parent_request_state_ != NULL);
     DCHECK(parent_server_ != NULL);
   }
 
@@ -65,7 +65,7 @@ class ChildQuery {
   /// (boost::mutex's operator= and copy c'tor are private)
   ChildQuery(const ChildQuery& other)
     : query_(other.query_),
-      parent_exec_state_(other.parent_exec_state_),
+      parent_request_state_(other.parent_request_state_),
       parent_server_(other.parent_server_),
       is_running_(other.is_running_),
       is_cancelled_(other.is_cancelled_) {}
@@ -74,7 +74,7 @@ class ChildQuery {
   /// (boost::mutex's operator= and copy c'tor are private)
   ChildQuery& operator=(const ChildQuery& other) {
     query_ = other.query_;
-    parent_exec_state_ = other.parent_exec_state_;
+    parent_request_state_ = other.parent_request_state_;
     parent_server_ = other.parent_server_;
     is_running_ = other.is_running_;
     is_cancelled_ = other.is_cancelled_;
@@ -85,11 +85,11 @@ class ChildQuery {
   Status ExecAndFetch();
 
   /// Cancels and closes the given child query if it is running. Sets is_cancelled_.
-  /// Child queries can be cancelled by the parent query through QueryExecState::Cancel().
+  /// Child queries can be cancelled by the parent query through ClientRequestState::Cancel().
   /// Child queries should never cancel their parent to avoid deadlock (but the parent
   /// query may decide to cancel itself based on a non-OK status from a child query).
-  /// Note that child queries have a different QueryExecState than their parent query,
-  /// so cancellation of a child query does not call into the parent's QueryExecState.
+  /// Note that child queries have a different ClientRequestState than their parent query,
+  /// so cancellation of a child query does not call into the parent's ClientRequestState.
   void Cancel();
 
   const apache::hive::service::cli::thrift::TTableSchema& result_schema() {
@@ -119,7 +119,7 @@ class ChildQuery {
 
   /// Execution state of parent query. Used to synchronize and propagate parent
   /// cancellations/failures to this child query. Not owned.
-  ImpalaServer::QueryExecState* parent_exec_state_;
+  ClientRequestState* parent_request_state_;
 
   /// Parent Impala server used for executing this child query. Not owned.
   ImpalaServer* parent_server_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
new file mode 100644
index 0000000..2f447c7
--- /dev/null
+++ b/be/src/service/client-request-state.cc
@@ -0,0 +1,1085 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "service/client-request-state.h"
+
+#include <limits>
+#include <gutil/strings/substitute.h>
+
+#include "exprs/expr-context.h"
+#include "exprs/expr.h"
+#include "runtime/mem-tracker.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+#include "runtime/exec-env.h"
+#include "scheduling/admission-controller.h"
+#include "scheduling/scheduler.h"
+#include "service/frontend.h"
+#include "service/impala-server.h"
+#include "service/query-options.h"
+#include "service/query-result-set.h"
+#include "util/debug-util.h"
+#include "util/impalad-metrics.h"
+#include "util/runtime-profile-counters.h"
+#include "util/time.h"
+
+#include "gen-cpp/CatalogService.h"
+#include "gen-cpp/CatalogService_types.h"
+
+#include <thrift/Thrift.h>
+
+#include "common/names.h"
+
+using boost::algorithm::join;
+using namespace apache::hive::service::cli::thrift;
+using namespace apache::thrift;
+using namespace beeswax;
+using namespace strings;
+
+DECLARE_int32(catalog_service_port);
+DECLARE_string(catalog_service_host);
+DECLARE_int64(max_result_cache_size);
+
+namespace impala {
+
+// Keys into the info string map of the runtime profile referring to specific
+// items used by CM for monitoring purposes.
+static const string PER_HOST_MEM_KEY = "Estimated Per-Host Mem";
+static const string PER_HOST_MEMORY_RESERVATION_KEY = "Per-Host Memory Reservation";
+static const string TABLES_MISSING_STATS_KEY = "Tables Missing Stats";
+static const string TABLES_WITH_CORRUPT_STATS_KEY = "Tables With Corrupt Table Stats";
+static const string TABLES_WITH_MISSING_DISK_IDS_KEY = "Tables With Missing Disk Ids";
+
+ClientRequestState::ClientRequestState(
+    const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
+    ImpalaServer* server, shared_ptr<ImpalaServer::SessionState> session)
+  : query_ctx_(query_ctx),
+    last_active_time_ms_(numeric_limits<int64_t>::max()),
+    ref_count_(0L),
+    child_query_executor_(new ChildQueryExecutor),
+    exec_env_(exec_env),
+    is_block_on_wait_joining_(false),
+    session_(session),
+    schedule_(NULL),
+    coord_(NULL),
+    result_cache_max_size_(-1),
+    profile_(&profile_pool_, "Query"), // assign name w/ id after planning
+    server_profile_(&profile_pool_, "ImpalaServer"),
+    summary_profile_(&profile_pool_, "Summary"),
+    is_cancelled_(false),
+    eos_(false),
+    query_state_(beeswax::QueryState::CREATED),
+    current_batch_(NULL),
+    current_batch_row_(0),
+    num_rows_fetched_(0),
+    fetched_rows_(false),
+    frontend_(frontend),
+    parent_server_(server),
+    start_time_(TimestampValue::LocalTime()) {
+#ifndef NDEBUG
+  profile_.AddInfoString("DEBUG MODE WARNING", "Query profile created while running a "
+      "DEBUG build of Impala. Use RELEASE builds to measure query performance.");
+#endif
+  row_materialization_timer_ = ADD_TIMER(&server_profile_, "RowMaterializationTimer");
+  client_wait_timer_ = ADD_TIMER(&server_profile_, "ClientFetchWaitTimer");
+  query_events_ = summary_profile_.AddEventSequence("Query Timeline");
+  query_events_->Start();
+  profile_.AddChild(&summary_profile_);
+
+  profile_.set_name("Query (id=" + PrintId(query_id()) + ")");
+  summary_profile_.AddInfoString("Session ID", PrintId(session_id()));
+  summary_profile_.AddInfoString("Session Type", PrintTSessionType(session_type()));
+  if (session_type() == TSessionType::HIVESERVER2) {
+    summary_profile_.AddInfoString("HiveServer2 Protocol Version",
+        Substitute("V$0", 1 + session->hs2_version));
+  }
+  summary_profile_.AddInfoString("Start Time", start_time().DebugString());
+  summary_profile_.AddInfoString("End Time", "");
+  summary_profile_.AddInfoString("Query Type", "N/A");
+  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+  summary_profile_.AddInfoString("Query Status", "OK");
+  summary_profile_.AddInfoString("Impala Version", GetVersionString(/* compact */ true));
+  summary_profile_.AddInfoString("User", effective_user());
+  summary_profile_.AddInfoString("Connected User", connected_user());
+  summary_profile_.AddInfoString("Delegated User", do_as_user());
+  summary_profile_.AddInfoString("Network Address",
+      lexical_cast<string>(session_->network_address));
+  summary_profile_.AddInfoString("Default Db", default_db());
+  summary_profile_.AddInfoString("Sql Statement", query_ctx_.client_request.stmt);
+  summary_profile_.AddInfoString("Coordinator",
+      TNetworkAddressToString(exec_env->backend_address()));
+}
+
+ClientRequestState::~ClientRequestState() {
+  DCHECK(wait_thread_.get() == NULL) << "BlockOnWait() needs to be called!";
+}
+
+Status ClientRequestState::SetResultCache(QueryResultSet* cache,
+    int64_t max_size) {
+  lock_guard<mutex> l(lock_);
+  DCHECK(result_cache_ == NULL);
+  result_cache_.reset(cache);
+  if (max_size > FLAGS_max_result_cache_size) {
+    return Status(
+        Substitute("Requested result-cache size of $0 exceeds Impala's maximum of $1.",
+            max_size, FLAGS_max_result_cache_size));
+  }
+  result_cache_max_size_ = max_size;
+  return Status::OK();
+}
+
+Status ClientRequestState::Exec(TExecRequest* exec_request) {
+  MarkActive();
+  exec_request_ = *exec_request;
+
+  profile_.AddChild(&server_profile_);
+  summary_profile_.AddInfoString("Query Type", PrintTStmtType(stmt_type()));
+  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+  summary_profile_.AddInfoString("Query Options (non default)",
+      DebugQueryOptions(query_ctx_.client_request.query_options));
+
+  switch (exec_request->stmt_type) {
+    case TStmtType::QUERY:
+    case TStmtType::DML:
+      DCHECK(exec_request_.__isset.query_exec_request);
+      return ExecQueryOrDmlRequest(exec_request_.query_exec_request);
+    case TStmtType::EXPLAIN: {
+      request_result_set_.reset(new vector<TResultRow>(
+          exec_request_.explain_result.results));
+      return Status::OK();
+    }
+    case TStmtType::DDL: {
+      DCHECK(exec_request_.__isset.catalog_op_request);
+      return ExecDdlRequest();
+    }
+    case TStmtType::LOAD: {
+      DCHECK(exec_request_.__isset.load_data_request);
+      TLoadDataResp response;
+      RETURN_IF_ERROR(
+          frontend_->LoadData(exec_request_.load_data_request, &response));
+      request_result_set_.reset(new vector<TResultRow>);
+      request_result_set_->push_back(response.load_summary);
+
+      // Now refresh the table metadata.
+      TCatalogOpRequest reset_req;
+      reset_req.__set_op_type(TCatalogOpType::RESET_METADATA);
+      reset_req.__set_reset_metadata_params(TResetMetadataRequest());
+      reset_req.reset_metadata_params.__set_header(TCatalogServiceRequestHeader());
+      reset_req.reset_metadata_params.__set_is_refresh(true);
+      reset_req.reset_metadata_params.__set_table_name(
+          exec_request_.load_data_request.table_name);
+      catalog_op_executor_.reset(
+          new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+      RETURN_IF_ERROR(catalog_op_executor_->Exec(reset_req));
+      RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
+          *catalog_op_executor_->update_catalog_result(),
+          exec_request_.query_options.sync_ddl));
+      return Status::OK();
+    }
+    case TStmtType::SET: {
+      DCHECK(exec_request_.__isset.set_query_option_request);
+      lock_guard<mutex> l(session_->lock);
+      if (exec_request_.set_query_option_request.__isset.key) {
+        // "SET key=value" updates the session query options.
+        DCHECK(exec_request_.set_query_option_request.__isset.value);
+        RETURN_IF_ERROR(SetQueryOption(
+            exec_request_.set_query_option_request.key,
+            exec_request_.set_query_option_request.value,
+            &session_->default_query_options,
+            &session_->set_query_options_mask));
+        SetResultSet({}, {});
+      } else {
+        // "SET" returns a table of all query options.
+        map<string, string> config;
+        TQueryOptionsToMap(
+            session_->default_query_options, &config);
+        vector<string> keys, values;
+        map<string, string>::const_iterator itr = config.begin();
+        for (; itr != config.end(); ++itr) {
+          keys.push_back(itr->first);
+          values.push_back(itr->second);
+        }
+        SetResultSet(keys, values);
+      }
+      return Status::OK();
+    }
+    default:
+      stringstream errmsg;
+      errmsg << "Unknown  exec request stmt type: " << exec_request_.stmt_type;
+      return Status(errmsg.str());
+  }
+}
+
+Status ClientRequestState::ExecLocalCatalogOp(
+    const TCatalogOpRequest& catalog_op) {
+  switch (catalog_op.op_type) {
+    case TCatalogOpType::USE: {
+      lock_guard<mutex> l(session_->lock);
+      session_->database = exec_request_.catalog_op_request.use_db_params.db;
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_TABLES: {
+      const TShowTablesParams* params = &catalog_op.show_tables_params;
+      // A NULL pattern means match all tables. However, Thrift string types can't
+      // be NULL in C++, so we have to test if it's set rather than just blindly
+      // using the value.
+      const string* table_name =
+          params->__isset.show_pattern ? &(params->show_pattern) : NULL;
+      TGetTablesResult table_names;
+      RETURN_IF_ERROR(frontend_->GetTableNames(params->db, table_name,
+          &query_ctx_.session, &table_names));
+      SetResultSet(table_names.tables);
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_DBS: {
+      const TShowDbsParams* params = &catalog_op.show_dbs_params;
+      TGetDbsResult dbs;
+      const string* db_pattern =
+          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
+      RETURN_IF_ERROR(
+          frontend_->GetDbs(db_pattern, &query_ctx_.session, &dbs));
+      vector<string> names, comments;
+      names.reserve(dbs.dbs.size());
+      comments.reserve(dbs.dbs.size());
+      for (const TDatabase& db: dbs.dbs) {
+        names.push_back(db.db_name);
+        comments.push_back(db.metastore_db.description);
+      }
+      SetResultSet(names, comments);
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_DATA_SRCS: {
+      const TShowDataSrcsParams* params = &catalog_op.show_data_srcs_params;
+      TGetDataSrcsResult result;
+      const string* pattern =
+          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
+      RETURN_IF_ERROR(
+          frontend_->GetDataSrcMetadata(pattern, &result));
+      SetResultSet(result.data_src_names, result.locations, result.class_names,
+          result.api_versions);
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_STATS: {
+      const TShowStatsParams& params = catalog_op.show_stats_params;
+      TResultSet response;
+      RETURN_IF_ERROR(frontend_->GetStats(params, &response));
+      // Set the result set and its schema from the response.
+      request_result_set_.reset(new vector<TResultRow>(response.rows));
+      result_metadata_ = response.schema;
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_FUNCTIONS: {
+      const TShowFunctionsParams* params = &catalog_op.show_fns_params;
+      TGetFunctionsResult functions;
+      const string* fn_pattern =
+          params->__isset.show_pattern ? (&params->show_pattern) : NULL;
+      RETURN_IF_ERROR(frontend_->GetFunctions(
+          params->category, params->db, fn_pattern, &query_ctx_.session, &functions));
+      SetResultSet(functions.fn_ret_types, functions.fn_signatures,
+          functions.fn_binary_types, functions.fn_persistence);
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_ROLES: {
+      const TShowRolesParams& params = catalog_op.show_roles_params;
+      if (params.is_admin_op) {
+        // Verify the user has privileges to perform this operation by checking against
+        // the Sentry Service (via the Catalog Server).
+        catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
+            &server_profile_));
+
+        TSentryAdminCheckRequest req;
+        req.__set_header(TCatalogServiceRequestHeader());
+        req.header.__set_requesting_user(effective_user());
+        RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req));
+      }
+
+      // If we have made it here, the user has privileges to execute this operation.
+      // Return the results.
+      TShowRolesResult result;
+      RETURN_IF_ERROR(frontend_->ShowRoles(params, &result));
+      SetResultSet(result.role_names);
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_GRANT_ROLE: {
+      const TShowGrantRoleParams& params = catalog_op.show_grant_role_params;
+      if (params.is_admin_op) {
+        // Verify the user has privileges to perform this operation by checking against
+        // the Sentry Service (via the Catalog Server).
+        catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
+            &server_profile_));
+
+        TSentryAdminCheckRequest req;
+        req.__set_header(TCatalogServiceRequestHeader());
+        req.header.__set_requesting_user(effective_user());
+        RETURN_IF_ERROR(catalog_op_executor_->SentryAdminCheck(req));
+      }
+
+      TResultSet response;
+      RETURN_IF_ERROR(frontend_->GetRolePrivileges(params, &response));
+      // Set the result set and its schema from the response.
+      request_result_set_.reset(new vector<TResultRow>(response.rows));
+      result_metadata_ = response.schema;
+      return Status::OK();
+    }
+    case TCatalogOpType::DESCRIBE_DB: {
+      TDescribeResult response;
+      RETURN_IF_ERROR(frontend_->DescribeDb(catalog_op.describe_db_params,
+          &response));
+      // Set the result set
+      request_result_set_.reset(new vector<TResultRow>(response.results));
+      return Status::OK();
+    }
+    case TCatalogOpType::DESCRIBE_TABLE: {
+      TDescribeResult response;
+      RETURN_IF_ERROR(frontend_->DescribeTable(catalog_op.describe_table_params,
+          &response));
+      // Set the result set
+      request_result_set_.reset(new vector<TResultRow>(response.results));
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_CREATE_TABLE: {
+      string response;
+      RETURN_IF_ERROR(frontend_->ShowCreateTable(catalog_op.show_create_table_params,
+          &response));
+      SetResultSet(vector<string>(1, response));
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_CREATE_FUNCTION: {
+      string response;
+      RETURN_IF_ERROR(frontend_->ShowCreateFunction(catalog_op.show_create_function_params,
+          &response));
+      SetResultSet(vector<string>(1, response));
+      return Status::OK();
+    }
+    case TCatalogOpType::SHOW_FILES: {
+      TResultSet response;
+      RETURN_IF_ERROR(frontend_->GetTableFiles(catalog_op.show_files_params, &response));
+      // Set the result set and its schema from the response.
+      request_result_set_.reset(new vector<TResultRow>(response.rows));
+      result_metadata_ = response.schema;
+      return Status::OK();
+    }
+    default: {
+      stringstream ss;
+      ss << "Unexpected TCatalogOpType: " << catalog_op.op_type;
+      return Status(ss.str());
+    }
+  }
+}
+
+Status ClientRequestState::ExecQueryOrDmlRequest(
+    const TQueryExecRequest& query_exec_request) {
+  // we always need at least one plan fragment
+  DCHECK(query_exec_request.plan_exec_info.size() > 0);
+
+  if (query_exec_request.__isset.query_plan) {
+    stringstream plan_ss;
+    // Add some delimiters to make it clearer where the plan
+    // begins and the profile ends
+    plan_ss << "\n----------------\n"
+            << query_exec_request.query_plan
+            << "----------------";
+    summary_profile_.AddInfoString("Plan", plan_ss.str());
+  }
+  // Add info strings consumed by CM: Estimated mem and tables missing stats.
+  if (query_exec_request.__isset.per_host_mem_estimate) {
+    stringstream ss;
+    ss << query_exec_request.per_host_mem_estimate;
+    summary_profile_.AddInfoString(PER_HOST_MEM_KEY, ss.str());
+  }
+  if (query_exec_request.__isset.per_host_min_reservation) {
+    stringstream ss;
+    ss << query_exec_request.per_host_min_reservation;
+    summary_profile_.AddInfoString(PER_HOST_MEMORY_RESERVATION_KEY, ss.str());
+  }
+  if (!query_exec_request.query_ctx.__isset.parent_query_id &&
+      query_exec_request.query_ctx.__isset.tables_missing_stats &&
+      !query_exec_request.query_ctx.tables_missing_stats.empty()) {
+    stringstream ss;
+    const vector<TTableName>& tbls = query_exec_request.query_ctx.tables_missing_stats;
+    for (int i = 0; i < tbls.size(); ++i) {
+      if (i != 0) ss << ",";
+      ss << tbls[i].db_name << "." << tbls[i].table_name;
+    }
+    summary_profile_.AddInfoString(TABLES_MISSING_STATS_KEY, ss.str());
+  }
+
+  if (!query_exec_request.query_ctx.__isset.parent_query_id &&
+      query_exec_request.query_ctx.__isset.tables_with_corrupt_stats &&
+      !query_exec_request.query_ctx.tables_with_corrupt_stats.empty()) {
+    stringstream ss;
+    const vector<TTableName>& tbls =
+        query_exec_request.query_ctx.tables_with_corrupt_stats;
+    for (int i = 0; i < tbls.size(); ++i) {
+      if (i != 0) ss << ",";
+      ss << tbls[i].db_name << "." << tbls[i].table_name;
+    }
+    summary_profile_.AddInfoString(TABLES_WITH_CORRUPT_STATS_KEY, ss.str());
+  }
+
+  if (query_exec_request.query_ctx.__isset.tables_missing_diskids &&
+      !query_exec_request.query_ctx.tables_missing_diskids.empty()) {
+    stringstream ss;
+    const vector<TTableName>& tbls =
+        query_exec_request.query_ctx.tables_missing_diskids;
+    for (int i = 0; i < tbls.size(); ++i) {
+      if (i != 0) ss << ",";
+      ss << tbls[i].db_name << "." << tbls[i].table_name;
+    }
+    summary_profile_.AddInfoString(TABLES_WITH_MISSING_DISK_IDS_KEY, ss.str());
+  }
+
+  {
+    lock_guard<mutex> l(lock_);
+    // Don't start executing the query if Cancel() was called concurrently with Exec().
+    if (is_cancelled_) return Status::CANCELLED;
+    // TODO: make schedule local to coordinator and move schedule_->Release() into
+    // Coordinator::TearDown()
+    schedule_.reset(new QuerySchedule(query_id(), query_exec_request,
+        exec_request_.query_options, &summary_profile_, query_events_));
+  }
+  Status status = exec_env_->scheduler()->Schedule(schedule_.get());
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_IF_ERROR(UpdateQueryStatus(status));
+  }
+
+  if (exec_env_->admission_controller() != nullptr) {
+    status = exec_env_->admission_controller()->AdmitQuery(schedule_.get());
+    {
+      lock_guard<mutex> l(lock_);
+      RETURN_IF_ERROR(UpdateQueryStatus(status));
+    }
+  }
+
+  coord_.reset(new Coordinator(*schedule_, query_events_));
+  status = coord_->Exec();
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_IF_ERROR(UpdateQueryStatus(status));
+  }
+
+  profile_.AddChild(coord_->query_profile());
+  return Status::OK();
+}
+
+Status ClientRequestState::ExecDdlRequest() {
+  string op_type = catalog_op_type() == TCatalogOpType::DDL ?
+      PrintTDdlType(ddl_type()) : PrintTCatalogOpType(catalog_op_type());
+  summary_profile_.AddInfoString("DDL Type", op_type);
+
+  if (catalog_op_type() != TCatalogOpType::DDL &&
+      catalog_op_type() != TCatalogOpType::RESET_METADATA) {
+    Status status = ExecLocalCatalogOp(exec_request_.catalog_op_request);
+    lock_guard<mutex> l(lock_);
+    return UpdateQueryStatus(status);
+  }
+
+  if (ddl_type() == TDdlType::COMPUTE_STATS) {
+    TComputeStatsParams& compute_stats_params =
+        exec_request_.catalog_op_request.ddl_params.compute_stats_params;
+    // Add child queries for computing table and column stats.
+    vector<ChildQuery> child_queries;
+    if (compute_stats_params.__isset.tbl_stats_query) {
+      child_queries.push_back(
+          ChildQuery(compute_stats_params.tbl_stats_query, this, parent_server_));
+    }
+    if (compute_stats_params.__isset.col_stats_query) {
+      child_queries.push_back(
+          ChildQuery(compute_stats_params.col_stats_query, this, parent_server_));
+    }
+
+    if (child_queries.size() > 0) child_query_executor_->ExecAsync(move(child_queries));
+    return Status::OK();
+  }
+
+  catalog_op_executor_.reset(new CatalogOpExecutor(exec_env_, frontend_,
+      &server_profile_));
+  Status status = catalog_op_executor_->Exec(exec_request_.catalog_op_request);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_IF_ERROR(UpdateQueryStatus(status));
+  }
+
+  // If this is a CTAS request, there will usually be more work to do
+  // after executing the CREATE TABLE statement (the INSERT portion of the operation).
+  // The exception is if the user specified IF NOT EXISTS and the table already
+  // existed, in which case we do not execute the INSERT.
+  if (catalog_op_type() == TCatalogOpType::DDL &&
+      ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT &&
+      !catalog_op_executor_->ddl_exec_response()->new_table_created) {
+    DCHECK(exec_request_.catalog_op_request.
+        ddl_params.create_table_params.if_not_exists);
+    return Status::OK();
+  }
+
+  // Add newly created table to catalog cache.
+  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
+      *catalog_op_executor_->update_catalog_result(),
+      exec_request_.query_options.sync_ddl));
+
+  if (catalog_op_type() == TCatalogOpType::DDL &&
+      ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
+    // At this point, the remainder of the CTAS request executes
+    // like a normal DML request. As with other DML requests, it will
+    // wait for another catalog update if any partitions were altered as a result
+    // of the operation.
+    DCHECK(exec_request_.__isset.query_exec_request);
+    RETURN_IF_ERROR(ExecQueryOrDmlRequest(exec_request_.query_exec_request));
+  }
+
+  // Set the results to be reported to the client.
+  SetResultSet(catalog_op_executor_->ddl_exec_response());
+  return Status::OK();
+}
+
+void ClientRequestState::Done() {
+  MarkActive();
+  // Make sure we join on wait_thread_ before we finish (and especially before this object
+  // is destroyed).
+  BlockOnWait();
+
+  // Update latest observed Kudu timestamp stored in the session from the coordinator.
+  // Needs to take the session_ lock which must not be taken while holding lock_, so this
+  // must happen before taking lock_ below.
+  if (coord_.get() != NULL) {
+    // This is safe to access on coord_ after Wait() has been called.
+    uint64_t latest_kudu_ts = coord_->GetLatestKuduInsertTimestamp();
+    if (latest_kudu_ts > 0) {
+      VLOG_RPC << "Updating session (id=" << session_id()  << ") with latest "
+               << "observed Kudu timestamp: " << latest_kudu_ts;
+      lock_guard<mutex> session_lock(session_->lock);
+      session_->kudu_latest_observed_ts = std::max<uint64_t>(
+          session_->kudu_latest_observed_ts, latest_kudu_ts);
+    }
+  }
+
+  unique_lock<mutex> l(lock_);
+  end_time_ = TimestampValue::LocalTime();
+  summary_profile_.AddInfoString("End Time", end_time().DebugString());
+  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+  query_events_->MarkEvent("Unregister query");
+
+  // Update result set cache metrics, and update mem limit accounting before tearing
+  // down the coordinator.
+  ClearResultCache();
+
+  if (coord_.get() != NULL) {
+    // Release any reserved resources.
+    if (exec_env_->admission_controller() != nullptr) {
+      Status status = exec_env_->admission_controller()->ReleaseQuery(schedule_.get());
+      if (!status.ok()) {
+        LOG(WARNING) << "Failed to release resources of query " << schedule_->query_id()
+                     << " because of error: " << status.GetDetail();
+      }
+    }
+    coord_->TearDown();
+  }
+}
+
+Status ClientRequestState::Exec(const TMetadataOpRequest& exec_request) {
+  TResultSet metadata_op_result;
+  // Like the other Exec(), fill out as much profile information as we're able to.
+  summary_profile_.AddInfoString("Query Type", PrintTStmtType(TStmtType::DDL));
+  summary_profile_.AddInfoString("Query State", PrintQueryState(query_state_));
+  RETURN_IF_ERROR(frontend_->ExecHiveServer2MetadataOp(exec_request,
+      &metadata_op_result));
+  result_metadata_ = metadata_op_result.schema;
+  request_result_set_.reset(new vector<TResultRow>(metadata_op_result.rows));
+  return Status::OK();
+}
+
+void ClientRequestState::WaitAsync() {
+  wait_thread_.reset(new Thread(
+      "query-exec-state", "wait-thread", &ClientRequestState::Wait, this));
+}
+
+void ClientRequestState::BlockOnWait() {
+  unique_lock<mutex> l(lock_);
+  if (wait_thread_.get() == NULL) return;
+  if (!is_block_on_wait_joining_) {
+    // No other thread is already joining on wait_thread_, so this thread needs to do
+    // it.  Other threads will need to block on the cond-var.
+    is_block_on_wait_joining_ = true;
+    l.unlock();
+    wait_thread_->Join();
+    l.lock();
+    is_block_on_wait_joining_ = false;
+    wait_thread_.reset();
+    block_on_wait_cv_.notify_all();
+  } else {
+    // Another thread is already joining with wait_thread_.  Block on the cond-var
+    // until the Join() executed in the other thread has completed.
+    do {
+      block_on_wait_cv_.wait(l);
+    } while (is_block_on_wait_joining_);
+  }
+}
+
+void ClientRequestState::Wait() {
+  // block until results are ready
+  Status status = WaitInternal();
+  {
+    lock_guard<mutex> l(lock_);
+    if (returns_result_set()) {
+      query_events()->MarkEvent("Rows available");
+    } else {
+      query_events()->MarkEvent("Request finished");
+    }
+    (void) UpdateQueryStatus(status);
+  }
+  if (status.ok()) {
+    UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
+  }
+}
+
+Status ClientRequestState::WaitInternal() {
+  // Explain requests have already populated the result set. Nothing to do here.
+  if (exec_request_.stmt_type == TStmtType::EXPLAIN) {
+    MarkInactive();
+    return Status::OK();
+  }
+
+  vector<ChildQuery*> child_queries;
+  Status child_queries_status = child_query_executor_->WaitForAll(&child_queries);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_IF_ERROR(query_status_);
+    RETURN_IF_ERROR(UpdateQueryStatus(child_queries_status));
+  }
+  if (!child_queries.empty()) query_events_->MarkEvent("Child queries finished");
+
+  if (coord_.get() != NULL) {
+    RETURN_IF_ERROR(coord_->Wait());
+    RETURN_IF_ERROR(UpdateCatalog());
+  }
+
+  if (catalog_op_type() == TCatalogOpType::DDL &&
+      ddl_type() == TDdlType::COMPUTE_STATS && child_queries.size() > 0) {
+    RETURN_IF_ERROR(UpdateTableAndColumnStats(child_queries));
+  }
+
+  if (!returns_result_set()) {
+    // Queries that do not return a result are finished at this point. This includes
+    // DML operations and a subset of the DDL operations.
+    eos_ = true;
+  } else if (catalog_op_type() == TCatalogOpType::DDL &&
+      ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT) {
+    SetCreateTableAsSelectResultSet();
+  }
+  // Rows are available now (for SELECT statement), so start the 'wait' timer that tracks
+  // how long Impala waits for the client to fetch rows. For other statements, track the
+  // time until a Close() is received.
+  MarkInactive();
+  return Status::OK();
+}
+
+Status ClientRequestState::FetchRows(const int32_t max_rows,
+    QueryResultSet* fetched_rows) {
+  // Pause the wait timer, since the client has instructed us to do work on its behalf.
+  MarkActive();
+
+  // ImpalaServer::FetchInternal has already taken our lock_
+  (void) UpdateQueryStatus(FetchRowsInternal(max_rows, fetched_rows));
+
+  MarkInactive();
+  return query_status_;
+}
+
+Status ClientRequestState::RestartFetch() {
+  // No result caching for this query. Restart is invalid.
+  if (result_cache_max_size_ <= 0) {
+    return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR,
+        "Restarting of fetch requires enabling of query result caching."));
+  }
+  // The cache overflowed on a previous fetch.
+  if (result_cache_.get() == NULL) {
+    stringstream ss;
+    ss << "The query result cache exceeded its limit of " << result_cache_max_size_
+       << " rows. Restarting the fetch is not possible.";
+    return Status(ErrorMsg(TErrorCode::RECOVERABLE_ERROR, ss.str()));
+  }
+  // Reset fetch state to start over.
+  eos_ = false;
+  num_rows_fetched_ = 0;
+  return Status::OK();
+}
+
+void ClientRequestState::UpdateNonErrorQueryState(
+    beeswax::QueryState::type query_state) {
+  lock_guard<mutex> l(lock_);
+  DCHECK(query_state != beeswax::QueryState::EXCEPTION);
+  if (query_state_ < query_state) query_state_ = query_state;
+}
+
+Status ClientRequestState::UpdateQueryStatus(const Status& status) {
+  // Preserve the first non-ok status
+  if (!status.ok() && query_status_.ok()) {
+    query_state_ = beeswax::QueryState::EXCEPTION;
+    query_status_ = status;
+    summary_profile_.AddInfoString("Query Status", query_status_.GetDetail());
+  }
+
+  return status;
+}
+
+Status ClientRequestState::FetchRowsInternal(const int32_t max_rows,
+    QueryResultSet* fetched_rows) {
+  DCHECK(query_state_ != beeswax::QueryState::EXCEPTION);
+
+  if (eos_) return Status::OK();
+
+  if (request_result_set_ != NULL) {
+    query_state_ = beeswax::QueryState::FINISHED;
+    int num_rows = 0;
+    const vector<TResultRow>& all_rows = (*(request_result_set_.get()));
+    // max_rows <= 0 means no limit
+    while ((num_rows < max_rows || max_rows <= 0)
+        && num_rows_fetched_ < all_rows.size()) {
+      fetched_rows->AddOneRow(all_rows[num_rows_fetched_]);
+      ++num_rows_fetched_;
+      ++num_rows;
+    }
+    eos_ = (num_rows_fetched_ == all_rows.size());
+    return Status::OK();
+  }
+
+  if (coord_.get() == nullptr) {
+    return Status("Client tried to fetch rows on a query that produces no results.");
+  }
+
+  int32_t num_rows_fetched_from_cache = 0;
+  if (result_cache_max_size_ > 0 && result_cache_ != NULL) {
+    // Satisfy the fetch from the result cache if possible.
+    int cache_fetch_size = (max_rows <= 0) ? result_cache_->size() : max_rows;
+    num_rows_fetched_from_cache =
+        fetched_rows->AddRows(result_cache_.get(), num_rows_fetched_, cache_fetch_size);
+    num_rows_fetched_ += num_rows_fetched_from_cache;
+    if (num_rows_fetched_from_cache >= max_rows) return Status::OK();
+  }
+
+  query_state_ = beeswax::QueryState::FINISHED;  // results will be ready after this call
+
+  // Maximum number of rows to be fetched from the coord.
+  int32_t max_coord_rows = max_rows;
+  if (max_rows > 0) {
+    DCHECK_LE(num_rows_fetched_from_cache, max_rows);
+    max_coord_rows = max_rows - num_rows_fetched_from_cache;
+  }
+  {
+    SCOPED_TIMER(row_materialization_timer_);
+    size_t before = fetched_rows->size();
+    // Temporarily release lock so calls to Cancel() are not blocked. fetch_rows_lock_
+    // (already held) ensures that we do not call coord_->GetNext() multiple times
+    // concurrently.
+    // TODO: Simplify this.
+    lock_.unlock();
+    Status status = coord_->GetNext(fetched_rows, max_coord_rows, &eos_);
+    lock_.lock();
+    int num_fetched = fetched_rows->size() - before;
+    DCHECK(max_coord_rows <= 0 || num_fetched <= max_coord_rows) << Substitute(
+        "Fetched more rows ($0) than asked for ($1)", num_fetched, max_coord_rows);
+    num_rows_fetched_ += num_fetched;
+
+    RETURN_IF_ERROR(status);
+    // Check if query status has changed during GetNext() call
+    if (!query_status_.ok()) {
+      eos_ = true;
+      return query_status_;
+    }
+  }
+
+  // Update the result cache if necessary.
+  if (result_cache_max_size_ > 0 && result_cache_.get() != NULL) {
+    int rows_fetched_from_coord = fetched_rows->size() - num_rows_fetched_from_cache;
+    if (result_cache_->size() + rows_fetched_from_coord > result_cache_max_size_) {
+      // Set the cache to NULL to indicate that adding the rows fetched from the coord
+      // would exceed the bound of the cache, and therefore, RestartFetch() should fail.
+      ClearResultCache();
+      return Status::OK();
+    }
+
+    // We guess the size of the cache after adding fetched_rows by looking at the size of
+    // fetched_rows itself, and using this estimate to confirm that the memtracker will
+    // allow us to use this much extra memory. In fact, this might be an overestimate, as
+    // the size of two result sets combined into one is not always the size of both result
+    // sets added together (the best example is the null bitset for each column: it might
+    // have only one entry in each result set, and as a result consume two bytes, but when
+    // the result sets are combined, only one byte is needed). Therefore after we add the
+    // new result set into the cache, we need to fix up the memory consumption to the
+    // actual levels to ensure we don't 'leak' bytes that we aren't using.
+    int64_t before = result_cache_->ByteSize();
+
+    // Upper-bound on memory required to add fetched_rows to the cache.
+    int64_t delta_bytes =
+        fetched_rows->ByteSize(num_rows_fetched_from_cache, fetched_rows->size());
+    MemTracker* query_mem_tracker = coord_->query_mem_tracker();
+    // Count the cached rows towards the mem limit.
+    if (UNLIKELY(!query_mem_tracker->TryConsume(delta_bytes))) {
+      string details("Failed to allocate memory for result cache.");
+      return query_mem_tracker->MemLimitExceeded(coord_->runtime_state(), details,
+          delta_bytes);
+    }
+    // Append all rows fetched from the coordinator into the cache.
+    int num_rows_added = result_cache_->AddRows(
+        fetched_rows, num_rows_fetched_from_cache, fetched_rows->size());
+
+    int64_t after = result_cache_->ByteSize();
+
+    // Confirm that this was not an underestimate of the memory required.
+    DCHECK_GE(before + delta_bytes, after)
+        << "Combined result sets consume more memory than both individually "
+        << Substitute("(before: $0, delta_bytes: $1, after: $2)",
+            before, delta_bytes, after);
+
+    // Fix up the tracked values
+    if (before + delta_bytes > after) {
+      query_mem_tracker->Release(before + delta_bytes - after);
+      delta_bytes = after - before;
+    }
+
+    // Update result set cache metrics.
+    ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(num_rows_added);
+    ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(delta_bytes);
+  }
+
+  return Status::OK();
+}
+
+Status ClientRequestState::Cancel(bool check_inflight, const Status* cause) {
+  if (check_inflight) {
+    // If the query is in 'inflight_queries' it means that the query has actually started
+    // executing. It is ok if the query is removed from 'inflight_queries' during
+    // cancellation, so we can release the session lock before starting the cancellation
+    // work.
+    lock_guard<mutex> session_lock(session_->lock);
+    if (session_->inflight_queries.find(query_id()) == session_->inflight_queries.end()) {
+      return Status("Query not yet running");
+    }
+  }
+
+  Coordinator* coord;
+  {
+    lock_guard<mutex> lock(lock_);
+    // If the query is completed or cancelled, no need to update state.
+    bool already_done = eos_ || query_state_ == beeswax::QueryState::EXCEPTION;
+    if (!already_done && cause != NULL) {
+      DCHECK(!cause->ok());
+      (void) UpdateQueryStatus(*cause);
+      query_events_->MarkEvent("Cancelled");
+      DCHECK_EQ(query_state_, beeswax::QueryState::EXCEPTION);
+    }
+    // Get a copy of the coordinator pointer while holding 'lock_'.
+    coord = coord_.get();
+    is_cancelled_ = true;
+  } // Release lock_ before doing cancellation work.
+
+  // Cancel and close child queries before cancelling parent. 'lock_' should not be held
+  // because a) ChildQuery::Cancel() calls back into ImpalaServer and b) cancellation
+  // involves RPCs and can take quite some time.
+  child_query_executor_->Cancel();
+
+  // Cancel the parent query. 'lock_' should not be held because cancellation involves
+  // RPCs and can block for a long time.
+  if (coord != NULL) coord->Cancel(cause);
+  return Status::OK();
+}
+
+Status ClientRequestState::UpdateCatalog() {
+  if (!exec_request().__isset.query_exec_request ||
+      exec_request().query_exec_request.stmt_type != TStmtType::DML) {
+    return Status::OK();
+  }
+
+  query_events_->MarkEvent("DML data written");
+  SCOPED_TIMER(ADD_TIMER(&server_profile_, "MetastoreUpdateTimer"));
+
+  TQueryExecRequest query_exec_request = exec_request().query_exec_request;
+  if (query_exec_request.__isset.finalize_params) {
+    const TFinalizeParams& finalize_params = query_exec_request.finalize_params;
+    TUpdateCatalogRequest catalog_update;
+    catalog_update.__set_header(TCatalogServiceRequestHeader());
+    catalog_update.header.__set_requesting_user(effective_user());
+    if (!coord()->PrepareCatalogUpdate(&catalog_update)) {
+      VLOG_QUERY << "No partitions altered, not updating metastore (query id: "
+                 << query_id() << ")";
+    } else {
+      // TODO: We track partitions written to, not created, which means
+      // that we do more work than is necessary, because written-to
+      // partitions don't always require a metastore change.
+      VLOG_QUERY << "Updating metastore with " << catalog_update.created_partitions.size()
+                 << " altered partitions ("
+                 << join (catalog_update.created_partitions, ", ") << ")";
+
+      catalog_update.target_table = finalize_params.table_name;
+      catalog_update.db_name = finalize_params.table_db;
+
+      Status cnxn_status;
+      const TNetworkAddress& address =
+          MakeNetworkAddress(FLAGS_catalog_service_host, FLAGS_catalog_service_port);
+      CatalogServiceConnection client(
+          exec_env_->catalogd_client_cache(), address, &cnxn_status);
+      RETURN_IF_ERROR(cnxn_status);
+
+      VLOG_QUERY << "Executing FinalizeDml() using CatalogService";
+      TUpdateCatalogResponse resp;
+      RETURN_IF_ERROR(
+          client.DoRpc(&CatalogServiceClient::UpdateCatalog, catalog_update, &resp));
+
+      Status status(resp.result.status);
+      if (!status.ok()) LOG(ERROR) << "ERROR Finalizing DML: " << status.GetDetail();
+      RETURN_IF_ERROR(status);
+      RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(resp.result,
+          exec_request_.query_options.sync_ddl));
+    }
+  }
+  query_events_->MarkEvent("DML Metastore update finished");
+  return Status::OK();
+}
+
+void ClientRequestState::SetResultSet(const TDdlExecResponse* ddl_resp) {
+  if (ddl_resp != NULL && ddl_resp->__isset.result_set) {
+    result_metadata_ = ddl_resp->result_set.schema;
+    request_result_set_.reset(new vector<TResultRow>(ddl_resp->result_set.rows));
+  }
+}
+
+void ClientRequestState::SetResultSet(const vector<string>& results) {
+  request_result_set_.reset(new vector<TResultRow>);
+  request_result_set_->resize(results.size());
+  for (int i = 0; i < results.size(); ++i) {
+    (*request_result_set_.get())[i].__isset.colVals = true;
+    (*request_result_set_.get())[i].colVals.resize(1);
+    (*request_result_set_.get())[i].colVals[0].__set_string_val(results[i]);
+  }
+}
+
+void ClientRequestState::SetResultSet(const vector<string>& col1,
+    const vector<string>& col2) {
+  DCHECK_EQ(col1.size(), col2.size());
+
+  request_result_set_.reset(new vector<TResultRow>);
+  request_result_set_->resize(col1.size());
+  for (int i = 0; i < col1.size(); ++i) {
+    (*request_result_set_.get())[i].__isset.colVals = true;
+    (*request_result_set_.get())[i].colVals.resize(2);
+    (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
+    (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
+  }
+}
+
+void ClientRequestState::SetResultSet(const vector<string>& col1,
+    const vector<string>& col2, const vector<string>& col3, const vector<string>& col4) {
+  DCHECK_EQ(col1.size(), col2.size());
+  DCHECK_EQ(col1.size(), col3.size());
+  DCHECK_EQ(col1.size(), col4.size());
+
+  request_result_set_.reset(new vector<TResultRow>);
+  request_result_set_->resize(col1.size());
+  for (int i = 0; i < col1.size(); ++i) {
+    (*request_result_set_.get())[i].__isset.colVals = true;
+    (*request_result_set_.get())[i].colVals.resize(4);
+    (*request_result_set_.get())[i].colVals[0].__set_string_val(col1[i]);
+    (*request_result_set_.get())[i].colVals[1].__set_string_val(col2[i]);
+    (*request_result_set_.get())[i].colVals[2].__set_string_val(col3[i]);
+    (*request_result_set_.get())[i].colVals[3].__set_string_val(col4[i]);
+  }
+}
+
+void ClientRequestState::SetCreateTableAsSelectResultSet() {
+  DCHECK(ddl_type() == TDdlType::CREATE_TABLE_AS_SELECT);
+  int64_t total_num_rows_inserted = 0;
+  // There will only be rows inserted in the case a new table was created as part of this
+  // operation.
+  if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
+    DCHECK(coord_.get());
+    for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) {
+      total_num_rows_inserted += p.second.num_modified_rows;
+    }
+  }
+  const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);
+  VLOG_QUERY << summary_msg;
+  vector<string> results(1, summary_msg);
+  SetResultSet(results);
+}
+
+void ClientRequestState::MarkInactive() {
+  client_wait_sw_.Start();
+  lock_guard<mutex> l(expiration_data_lock_);
+  last_active_time_ms_ = UnixMillis();
+  DCHECK(ref_count_ > 0) << "Invalid MarkInactive()";
+  --ref_count_;
+}
+
+void ClientRequestState::MarkActive() {
+  client_wait_sw_.Stop();
+  int64_t elapsed_time = client_wait_sw_.ElapsedTime();
+  client_wait_timer_->Set(elapsed_time);
+  lock_guard<mutex> l(expiration_data_lock_);
+  last_active_time_ms_ = UnixMillis();
+  ++ref_count_;
+}
+
+Status ClientRequestState::UpdateTableAndColumnStats(
+    const vector<ChildQuery*>& child_queries) {
+  DCHECK_GE(child_queries.size(), 1);
+  DCHECK_LE(child_queries.size(), 2);
+  catalog_op_executor_.reset(
+      new CatalogOpExecutor(exec_env_, frontend_, &server_profile_));
+
+  // If there was no column stats query, pass in empty thrift structures to
+  // ExecComputeStats(). Otherwise pass in the column stats result.
+  TTableSchema col_stats_schema;
+  TRowSet col_stats_data;
+  if (child_queries.size() > 1) {
+    col_stats_schema = child_queries[1]->result_schema();
+    col_stats_data = child_queries[1]->result_data();
+  }
+
+  Status status = catalog_op_executor_->ExecComputeStats(
+      exec_request_.catalog_op_request.ddl_params.compute_stats_params,
+      child_queries[0]->result_schema(),
+      child_queries[0]->result_data(),
+      col_stats_schema,
+      col_stats_data);
+  {
+    lock_guard<mutex> l(lock_);
+    RETURN_IF_ERROR(UpdateQueryStatus(status));
+  }
+  RETURN_IF_ERROR(parent_server_->ProcessCatalogUpdateResult(
+      *catalog_op_executor_->update_catalog_result(),
+      exec_request_.query_options.sync_ddl));
+
+  // Set the results to be reported to the client.
+  SetResultSet(catalog_op_executor_->ddl_exec_response());
+  query_events_->MarkEvent("Metastore update finished");
+  return Status::OK();
+}
+
+void ClientRequestState::ClearResultCache() {
+  if (result_cache_ == NULL) return;
+  // Update result set cache metrics and mem limit accounting.
+  ImpaladMetrics::RESULTSET_CACHE_TOTAL_NUM_ROWS->Increment(-result_cache_->size());
+  int64_t total_bytes = result_cache_->ByteSize();
+  ImpaladMetrics::RESULTSET_CACHE_TOTAL_BYTES->Increment(-total_bytes);
+  if (coord_ != NULL) {
+    DCHECK(coord_->query_mem_tracker() != NULL);
+    coord_->query_mem_tracker()->Release(total_bytes);
+  }
+  result_cache_.reset(NULL);
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
new file mode 100644
index 0000000..0e18957
--- /dev/null
+++ b/be/src/service/client-request-state.h
@@ -0,0 +1,413 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_SERVICE_CLIENT_REQUEST_STATE_H
+#define IMPALA_SERVICE_CLIENT_REQUEST_STATE_H
+
+#include "common/status.h"
+#include "exec/catalog-op-executor.h"
+#include "runtime/timestamp-value.h"
+#include "scheduling/query-schedule.h"
+#include "service/child-query.h"
+#include "service/impala-server.h"
+#include "util/auth-util.h"
+#include "util/runtime-profile.h"
+#include "gen-cpp/Frontend_types.h"
+#include "gen-cpp/Frontend_types.h"
+
+#include <boost/thread.hpp>
+#include <boost/unordered_set.hpp>
+#include <vector>
+
+namespace impala {
+
+class ExecEnv;
+class Coordinator;
+class RuntimeState;
+class RowBatch;
+class Expr;
+class TupleRow;
+class Frontend;
+class ClientRequestStateCleaner;
+
+/// Execution state of the client-facing side a query. This captures everything
+/// necessary to convert row batches received by the coordinator into results
+/// we can return to the client. It also captures all state required for
+/// servicing query-related requests from the client.
+/// Thread safety: this class is generally not thread-safe, callers need to
+/// synchronize access explicitly via lock(). See the ImpalaServer class comment for
+/// the required lock acquisition order.
+///
+/// TODO: Compute stats is the only stmt that requires child queries. Once the
+/// CatalogService performs background stats gathering the concept of child queries
+/// will likely become obsolete. Remove all child-query related code from this class.
+class ClientRequestState {
+ public:
+  ClientRequestState(const TQueryCtx& query_ctx, ExecEnv* exec_env, Frontend* frontend,
+      ImpalaServer* server, std::shared_ptr<ImpalaServer::SessionState> session);
+
+  ~ClientRequestState();
+
+  /// Initiates execution of a exec_request.
+  /// Non-blocking.
+  /// Must *not* be called with lock_ held.
+  Status Exec(TExecRequest* exec_request) WARN_UNUSED_RESULT;
+
+  /// Execute a HiveServer2 metadata operation
+  /// TODO: This is likely a superset of GetTableNames/GetDbs. Coalesce these different
+  /// code paths.
+  Status Exec(const TMetadataOpRequest& exec_request) WARN_UNUSED_RESULT;
+
+  /// Call this to ensure that rows are ready when calling FetchRows(). Updates the
+  /// query_status_, and advances query_state_ to FINISHED or EXCEPTION. Must be preceded
+  /// by call to Exec(). Waits for all child queries to complete. Takes lock_.
+  void Wait();
+
+  /// Calls Wait() asynchronously in a thread and returns immediately.
+  void WaitAsync();
+
+  /// BlockOnWait() may be called after WaitAsync() has been called in order to wait
+  /// for the asynchronous thread (wait_thread_) to complete. It is safe to call this
+  /// from multiple threads (all threads will block until wait_thread_ has completed)
+  /// and multiple times (non-blocking once wait_thread_ has completed). Do not call
+  /// while holding lock_.
+  void BlockOnWait();
+
+  /// Return at most max_rows from the current batch. If the entire current batch has
+  /// been returned, fetch another batch first.
+  /// Caller needs to hold fetch_rows_lock_ and lock_.
+  /// Caller should verify that EOS has not be reached before calling.
+  /// Must be preceeded by call to Wait() (or WaitAsync()/BlockOnWait()).
+  /// Also updates query_state_/status_ in case of error.
+  Status FetchRows(const int32_t max_rows, QueryResultSet* fetched_rows)
+      WARN_UNUSED_RESULT;
+
+  /// Resets the state of this query such that the next fetch() returns results from the
+  /// beginning of the query result set (by using the using result_cache_).
+  /// It is valid to call this function for any type of statement that returns a result
+  /// set, including queries, show stmts, compute stats, etc.
+  /// Returns a recoverable error status if the restart is not possible, ok() otherwise.
+  /// The error is recoverable to allow clients to resume fetching.
+  /// The caller must hold fetch_rows_lock_ and lock_.
+  Status RestartFetch() WARN_UNUSED_RESULT;
+
+  /// Update query state if the requested state isn't already obsolete. This is only for
+  /// non-error states - if the query encounters an error the query status needs to be set
+  /// with information about the error so UpdateQueryStatus must be used instead.
+  /// Takes lock_.
+  void UpdateNonErrorQueryState(beeswax::QueryState::type query_state);
+
+  /// Update the query status and the "Query Status" summary profile string.
+  /// If current status is already != ok, no update is made (we preserve the first error)
+  /// If called with a non-ok argument, the expectation is that the query will be aborted
+  /// quickly.
+  /// Returns the status argument (so we can write
+  /// RETURN_IF_ERROR(UpdateQueryStatus(SomeOperation())).
+  /// Does not take lock_, but requires it: caller must ensure lock_
+  /// is taken before calling UpdateQueryStatus
+  Status UpdateQueryStatus(const Status& status) WARN_UNUSED_RESULT;
+
+  /// Cancels the child queries and the coordinator with the given cause.
+  /// If cause is NULL, assume this was deliberately cancelled by the user.
+  /// Otherwise, sets state to EXCEPTION.
+  /// Does nothing if the query has reached EOS or already cancelled.
+  ///
+  /// Only returns an error if 'check_inflight' is true and the query is not yet
+  /// in-flight. Otherwise, proceed and return Status::OK() even if the query isn't
+  /// in-flight (for cleaning up after an error on the query issuing path).
+  Status Cancel(bool check_inflight, const Status* cause) WARN_UNUSED_RESULT;
+
+  /// This is called when the query is done (finished, cancelled, or failed).
+  /// Takes lock_: callers must not hold lock() before calling.
+  void Done();
+
+  /// Sets the API-specific (Beeswax, HS2) result cache and its size bound.
+  /// The given cache is owned by this query exec state, even if an error is returned.
+  /// Returns a non-ok status if max_size exceeds the per-impalad allowed maximum.
+  Status SetResultCache(QueryResultSet* cache, int64_t max_size) WARN_UNUSED_RESULT;
+
+  ImpalaServer::SessionState* session() const { return session_.get(); }
+
+  /// Queries are run and authorized on behalf of the effective_user.
+  const std::string& effective_user() const {
+      return GetEffectiveUser(query_ctx_.session);
+  }
+  const std::string& connected_user() const { return query_ctx_.session.connected_user; }
+  const std::string& do_as_user() const { return query_ctx_.session.delegated_user; }
+  TSessionType::type session_type() const { return query_ctx_.session.session_type; }
+  const TUniqueId& session_id() const { return query_ctx_.session.session_id; }
+  const std::string& default_db() const { return query_ctx_.session.database; }
+  bool eos() const { return eos_; }
+  Coordinator* coord() const { return coord_.get(); }
+  QuerySchedule* schedule() { return schedule_.get(); }
+
+  /// Resource pool associated with this query, or an empty string if the schedule has not
+  /// been created and had the pool set yet, or this StmtType doesn't go through admission
+  /// control.
+  std::string request_pool() const {
+    return schedule_ == nullptr ? "" : schedule_->request_pool();
+  }
+  int num_rows_fetched() const { return num_rows_fetched_; }
+  void set_fetched_rows() { fetched_rows_ = true; }
+  bool fetched_rows() const { return fetched_rows_; }
+  bool returns_result_set() { return !result_metadata_.columns.empty(); }
+  const TResultSetMetadata* result_metadata() { return &result_metadata_; }
+  const TUniqueId& query_id() const { return query_ctx_.query_id; }
+  const TExecRequest& exec_request() const { return exec_request_; }
+  TStmtType::type stmt_type() const { return exec_request_.stmt_type; }
+  TCatalogOpType::type catalog_op_type() const {
+    return exec_request_.catalog_op_request.op_type;
+  }
+  TDdlType::type ddl_type() const {
+    return exec_request_.catalog_op_request.ddl_params.ddl_type;
+  }
+  boost::mutex* lock() { return &lock_; }
+  boost::mutex* fetch_rows_lock() { return &fetch_rows_lock_; }
+  beeswax::QueryState::type query_state() const { return query_state_; }
+  const Status& query_status() const { return query_status_; }
+  void set_result_metadata(const TResultSetMetadata& md) { result_metadata_ = md; }
+  const RuntimeProfile& profile() const { return profile_; }
+  const RuntimeProfile& summary_profile() const { return summary_profile_; }
+  const TimestampValue& start_time() const { return start_time_; }
+  const TimestampValue& end_time() const { return end_time_; }
+  const std::string& sql_stmt() const { return query_ctx_.client_request.stmt; }
+  const TQueryOptions& query_options() const {
+    return query_ctx_.client_request.query_options;
+  }
+  /// Returns 0:0 if this is a root query
+  TUniqueId parent_query_id() const { return query_ctx_.parent_query_id; }
+
+  const std::vector<std::string>& GetAnalysisWarnings() const {
+    return exec_request_.analysis_warnings;
+  }
+
+  inline int64_t last_active_ms() const {
+    boost::lock_guard<boost::mutex> l(expiration_data_lock_);
+    return last_active_time_ms_;
+  }
+
+  /// Returns true if Impala is actively processing this query.
+  inline bool is_active() const {
+    boost::lock_guard<boost::mutex> l(expiration_data_lock_);
+    return ref_count_ > 0;
+  }
+
+  RuntimeProfile::EventSequence* query_events() const { return query_events_; }
+  RuntimeProfile* summary_profile() { return &summary_profile_; }
+
+ private:
+  const TQueryCtx query_ctx_;
+
+  /// Ensures single-threaded execution of FetchRows(). Callers of FetchRows() are
+  /// responsible for acquiring this lock. To avoid deadlocks, callers must not hold lock_
+  /// while acquiring this lock (since FetchRows() will release and re-acquire lock_ during
+  /// its execution).
+  /// See "Locking" in the class comment for lock acquisition order.
+  boost::mutex fetch_rows_lock_;
+
+  /// Protects last_active_time_ms_ and ref_count_. Only held during short function calls
+  /// - no other locks should be acquired while holding this lock.
+  mutable boost::mutex expiration_data_lock_;
+
+  /// Stores the last time that the query was actively doing work, in Unix milliseconds.
+  int64_t last_active_time_ms_;
+
+  /// ref_count_ > 0 if Impala is currently performing work on this query's behalf. Every
+  /// time a client instructs Impala to do work on behalf of this query, the ref count is
+  /// increased, and decreased once that work is completed.
+  uint32_t ref_count_;
+
+  /// Executor for any child queries (e.g. compute stats subqueries). Always non-NULL.
+  const boost::scoped_ptr<ChildQueryExecutor> child_query_executor_;
+
+  /// Protects all following fields. Acquirers should be careful not to hold it for too
+  /// long, e.g. during RPCs because this lock is required to make progress on various
+  /// ImpalaServer requests. If held for too long it can block progress of client
+  /// requests for this query, e.g. query status and cancellation. Furthermore, until
+  /// IMPALA-3882 is fixed, it can indirectly block progress on all other queries.
+  /// See "Locking" in the class comment for lock acquisition order.
+  boost::mutex lock_;
+
+  /// TODO: remove and use ExecEnv::GetInstance() instead
+  ExecEnv* exec_env_;
+
+  /// Thread for asynchronously running Wait().
+  boost::scoped_ptr<Thread> wait_thread_;
+
+  /// Condition variable to make BlockOnWait() thread-safe. One thread joins
+  /// wait_thread_, and all other threads block on this cv. Used with lock_.
+  boost::condition_variable block_on_wait_cv_;
+
+  /// Used in conjunction with block_on_wait_cv_ to make BlockOnWait() thread-safe.
+  bool is_block_on_wait_joining_;
+
+  /// Session that this query is from
+  std::shared_ptr<ImpalaServer::SessionState> session_;
+
+  /// Resource assignment determined by scheduler. Owned by obj_pool_.
+  boost::scoped_ptr<QuerySchedule> schedule_;
+
+  /// Not set for ddl queries.
+  boost::scoped_ptr<Coordinator> coord_;
+
+  /// Runs statements that query or modify the catalog via the CatalogService.
+  boost::scoped_ptr<CatalogOpExecutor> catalog_op_executor_;
+
+  /// Result set used for requests that return results and are not QUERY
+  /// statements. For example, EXPLAIN, LOAD, and SHOW use this.
+  boost::scoped_ptr<std::vector<TResultRow>> request_result_set_;
+
+  /// Cache of the first result_cache_max_size_ query results to allow clients to restart
+  /// fetching from the beginning of the result set. This cache is appended to in
+  /// FetchInternal(), and set to NULL if its bound is exceeded. If the bound is exceeded,
+  /// then clients cannot restart fetching because some results have been lost since the
+  /// last fetch. Only set if result_cache_max_size_ > 0.
+  boost::scoped_ptr<QueryResultSet> result_cache_;
+
+  /// Max size of the result_cache_ in number of rows. A value <= 0 means no caching.
+  int64_t result_cache_max_size_;
+
+  ObjectPool profile_pool_;
+
+  /// The ClientRequestState builds three separate profiles.
+  /// * profile_ is the top-level profile which houses the other
+  ///   profiles, plus the query timeline
+  /// * summary_profile_ contains mostly static information about the
+  ///   query, including the query statement, the plan and the user who submitted it.
+  /// * server_profile_ tracks time spent inside the ImpalaServer,
+  ///   but not inside fragment execution, i.e. the time taken to
+  ///   register and set-up the query and for rows to be fetched.
+  //
+  /// There's a fourth profile which is not built here (but is a
+  /// child of profile_); the execution profile which tracks the
+  /// actual fragment execution.
+  RuntimeProfile profile_;
+  RuntimeProfile server_profile_;
+  RuntimeProfile summary_profile_;
+  RuntimeProfile::Counter* row_materialization_timer_;
+
+  /// Tracks how long we are idle waiting for a client to fetch rows.
+  RuntimeProfile::Counter* client_wait_timer_;
+  /// Timer to track idle time for the above counter.
+  MonotonicStopWatch client_wait_sw_;
+
+  RuntimeProfile::EventSequence* query_events_;
+
+  bool is_cancelled_; // if true, Cancel() was called.
+  bool eos_;  // if true, there are no more rows to return
+  // We enforce the invariant that query_status_ is not OK iff query_state_
+  // is EXCEPTION, given that lock_ is held.
+  beeswax::QueryState::type query_state_;
+  Status query_status_;
+  TExecRequest exec_request_;
+
+  TResultSetMetadata result_metadata_; // metadata for select query
+  RowBatch* current_batch_; // the current row batch; only applicable if coord is set
+  int current_batch_row_; // number of rows fetched within the current batch
+  int num_rows_fetched_; // number of rows fetched by client for the entire query
+
+  /// True if a fetch was attempted by a client, regardless of whether a result set
+  /// (or error) was returned to the client.
+  bool fetched_rows_;
+
+  /// To get access to UpdateCatalog, LOAD, and DDL methods. Not owned.
+  Frontend* frontend_;
+
+  /// The parent ImpalaServer; called to wait until the the impalad has processed a
+  /// catalog update request. Not owned.
+  ImpalaServer* parent_server_;
+
+  /// Start/end time of the query
+  TimestampValue start_time_, end_time_;
+
+  /// Executes a local catalog operation (an operation that does not need to execute
+  /// against the catalog service). Includes USE, SHOW, DESCRIBE, and EXPLAIN statements.
+  Status ExecLocalCatalogOp(const TCatalogOpRequest& catalog_op) WARN_UNUSED_RESULT;
+
+  /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently not
+  /// doing any work. Takes expiration_data_lock_.
+  void MarkInactive();
+
+  /// Updates last_active_time_ms_ and ref_count_ to reflect that query is currently being
+  /// actively processed. Takes expiration_data_lock_.
+  void MarkActive();
+
+  /// Core logic of initiating a query or dml execution request.
+  /// Initiates execution of plan fragments, if there are any, and sets
+  /// up the output exprs for subsequent calls to FetchRows().
+  /// 'coord_' is only valid after this method is called, and may be invalid if it
+  /// returns an error.
+  /// Also sets up profile and pre-execution counters.
+  /// Non-blocking.
+  Status ExecQueryOrDmlRequest(const TQueryExecRequest& query_exec_request)
+      WARN_UNUSED_RESULT;
+
+  /// Core logic of executing a ddl statement. May internally initiate execution of
+  /// queries (e.g., compute stats) or dml (e.g., create table as select)
+  Status ExecDdlRequest() WARN_UNUSED_RESULT;
+
+  /// Executes a LOAD DATA
+  Status ExecLoadDataRequest() WARN_UNUSED_RESULT;
+
+  /// Core logic of Wait(). Does not update query_state_/status_.
+  Status WaitInternal() WARN_UNUSED_RESULT;
+
+  /// Core logic of FetchRows(). Does not update query_state_/status_.
+  /// Caller needs to hold fetch_rows_lock_ and lock_.
+  Status FetchRowsInternal(const int32_t max_rows, QueryResultSet* fetched_rows)
+      WARN_UNUSED_RESULT;
+
+  /// Evaluates 'output_expr_ctxs_' against 'row' and output the evaluated row in
+  /// 'result'. The values' scales (# of digits after decimal) are stored in 'scales'.
+  /// result and scales must have been resized to the number of columns before call.
+  Status GetRowValue(TupleRow* row, std::vector<void*>* result, std::vector<int>* scales)
+      WARN_UNUSED_RESULT;
+
+  /// Gather and publish all required updates to the metastore
+  Status UpdateCatalog() WARN_UNUSED_RESULT;
+
+  /// Copies results into request_result_set_
+  /// TODO: Have the FE return list<Data.TResultRow> so that this isn't necessary
+  void SetResultSet(const TDdlExecResponse* ddl_resp);
+  void SetResultSet(const std::vector<std::string>& results);
+  void SetResultSet(const std::vector<std::string>& col1,
+      const std::vector<std::string>& col2);
+  void SetResultSet(const std::vector<std::string>& col1,
+      const std::vector<std::string>& col2, const std::vector<std::string>& col3,
+      const std::vector<std::string>& col4);
+
+  /// Sets the result set for a CREATE TABLE AS SELECT statement. The results will not be
+  /// ready until all BEs complete execution. This can be called as part of Wait(),
+  /// at which point results will be avilable.
+  void SetCreateTableAsSelectResultSet();
+
+  /// Updates the metastore's table and column statistics based on the child-query results
+  /// of a compute stats command.
+  /// TODO: Unify the various ways that the Metastore is updated for DDL/DML.
+  /// For example, INSERT queries update partition metadata in UpdateCatalog() using a
+  /// TUpdateCatalogRequest, whereas our DDL uses a TCatalogOpRequest for very similar
+  /// purposes. Perhaps INSERT should use a TCatalogOpRequest as well.
+  Status UpdateTableAndColumnStats(const std::vector<ChildQuery*>& child_queries)
+      WARN_UNUSED_RESULT;
+
+  /// Sets result_cache_ to NULL and updates its associated metrics and mem consumption.
+  /// This function is a no-op if the cache has already been cleared.
+  void ClearResultCache();
+};
+
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/fe-support.cc
----------------------------------------------------------------------
diff --git a/be/src/service/fe-support.cc b/be/src/service/fe-support.cc
index 8f52352..bb35089 100644
--- a/be/src/service/fe-support.cc
+++ b/be/src/service/fe-support.cc
@@ -103,10 +103,11 @@ Java_org_apache_impala_service_FeSupport_NativeEvalExprsWithoutRow(
   // Allow logging of at least one error, so we can detect and convert it into a
   // Java exception.
   query_ctx.client_request.query_options.max_errors = 1;
-
   // Track memory against a dummy "fe-eval-exprs" resource pool - we don't
   // know what resource pool the query has been assigned to yet.
-  RuntimeState state(query_ctx, ExecEnv::GetInstance(), "fe-eval-exprs");
+  query_ctx.request_pool = "fe-eval-exprs";
+
+  RuntimeState state(query_ctx, ExecEnv::GetInstance());
   // Make sure to close the runtime state no matter how this scope is exited.
   const auto close_runtime_state =
       MakeScopeExitTrigger([&state]() { state.ReleaseResources(); });

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/368115cd/be/src/service/impala-beeswax-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index bd4ee67..67ecc79 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -25,7 +25,7 @@
 #include "runtime/exec-env.h"
 #include "runtime/raw-value.inline.h"
 #include "runtime/timestamp-value.h"
-#include "service/query-exec-state.h"
+#include "service/client-request-state.h"
 #include "service/query-options.h"
 #include "service/query-result-set.h"
 #include "util/impalad-metrics.h"
@@ -64,22 +64,22 @@ void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
 
   // raise Syntax error or access violation; it's likely to be syntax/analysis error
   // TODO: that may not be true; fix this
-  shared_ptr<QueryExecState> exec_state;
-  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
+  shared_ptr<ClientRequestState> request_state;
+  RAISE_IF_ERROR(Execute(&query_ctx, session, &request_state),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
 
-  exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
+  request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
   // start thread to wait for results to become available, which will allow
   // us to advance query state to FINISHED or EXCEPTION
-  exec_state->WaitAsync();
+  request_state->WaitAsync();
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
-  Status status = SetQueryInflight(session, exec_state);
+  Status status = SetQueryInflight(session, request_state);
   if (!status.ok()) {
-    UnregisterQuery(exec_state->query_id(), false, &status);
+    (void) UnregisterQuery(request_state->query_id(), false, &status);
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
-  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
+  TUniqueIdToQueryHandle(request_state->query_id(), &query_handle);
 }
 
 void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
@@ -94,7 +94,7 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
   // raise general error for request conversion error;
   RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);
 
-  shared_ptr<QueryExecState> exec_state;
+  shared_ptr<ClientRequestState> request_state;
   DCHECK(session != NULL);  // The session should exist.
   {
     // The session is created when the client connects. Depending on the underlying
@@ -106,27 +106,27 @@ void ImpalaServer::executeAndWait(QueryHandle& query_handle, const Query& query,
 
   // raise Syntax error or access violation; it's likely to be syntax/analysis error
   // TODO: that may not be true; fix this
-  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
+  RAISE_IF_ERROR(Execute(&query_ctx, session, &request_state),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
 
-  exec_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
+  request_state->UpdateNonErrorQueryState(beeswax::QueryState::RUNNING);
   // Once the query is running do a final check for session closure and add it to the
   // set of in-flight queries.
-  Status status = SetQueryInflight(session, exec_state);
+  Status status = SetQueryInflight(session, request_state);
   if (!status.ok()) {
-    UnregisterQuery(exec_state->query_id(), false, &status);
+    (void) UnregisterQuery(request_state->query_id(), false, &status);
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
   // block until results are ready
-  exec_state->Wait();
-  status = exec_state->query_status();
+  request_state->Wait();
+  status = request_state->query_status();
   if (!status.ok()) {
-    UnregisterQuery(exec_state->query_id(), false, &status);
+    (void) UnregisterQuery(request_state->query_id(), false, &status);
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
 
-  exec_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
-  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
+  request_state->UpdateNonErrorQueryState(beeswax::QueryState::FINISHED);
+  TUniqueIdToQueryHandle(request_state->query_id(), &query_handle);
 
   // If the input log context id is an empty string, then create a new number and
   // set it to _return. Otherwise, set _return with the input log context
@@ -172,7 +172,7 @@ void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle
   VLOG_ROW << "fetch result: #results=" << query_results.data.size()
            << " has_more=" << (query_results.has_more ? "true" : "false");
   if (!status.ok()) {
-    UnregisterQuery(query_id, false, &status);
+    (void) UnregisterQuery(query_id, false, &status);
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
 }
@@ -188,18 +188,18 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
   TUniqueId query_id;
   QueryHandleToTUniqueId(handle, &query_id);
   VLOG_QUERY << "get_results_metadata(): query_id=" << PrintId(query_id);
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
-  if (UNLIKELY(exec_state.get() == nullptr)) {
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true);
+  if (UNLIKELY(request_state.get() == nullptr)) {
     RaiseBeeswaxException(Substitute("Invalid query handle: $0", PrintId(query_id)),
       SQLSTATE_GENERAL_ERROR);
   }
 
   {
-    // make sure we release the lock on exec_state if we see any error
-    lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
+    // make sure we release the lock on request_state if we see any error
+    lock_guard<mutex> l(*request_state->lock(), adopt_lock_t());
 
     // Convert TResultSetMetadata to Beeswax.ResultsMetadata
-    const TResultSetMetadata* result_set_md = exec_state->result_metadata();
+    const TResultSetMetadata* result_set_md = request_state->result_metadata();
     results_metadata.__isset.schema = true;
     results_metadata.schema.__isset.fieldSchemas = true;
     results_metadata.schema.fieldSchemas.resize(result_set_md->columns.size());
@@ -232,7 +232,7 @@ void ImpalaServer::close(const QueryHandle& handle) {
   QueryHandleToTUniqueId(handle, &query_id);
   VLOG_QUERY << "close(): query_id=" << PrintId(query_id);
   // TODO: do we need to raise an exception if the query state is EXCEPTION?
-  // TODO: use timeout to get rid of unwanted exec_state.
+  // TODO: use timeout to get rid of unwanted request_state.
   RAISE_IF_ERROR(UnregisterQuery(query_id, true), SQLSTATE_GENERAL_ERROR);
 }
 
@@ -244,9 +244,9 @@ beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
   QueryHandleToTUniqueId(handle, &query_id);
   VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
 
-  lock_guard<mutex> l(query_exec_state_map_lock_);
-  QueryExecStateMap::iterator entry = query_exec_state_map_.find(query_id);
-  if (entry != query_exec_state_map_.end()) {
+  lock_guard<mutex> l(client_request_state_map_lock_);
+  ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id);
+  if (entry != client_request_state_map_.end()) {
     return entry->second->query_state();
   } else {
     VLOG_QUERY << "ImpalaServer::get_state invalid handle";
@@ -277,8 +277,8 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
   TUniqueId query_id;
   QueryHandleToTUniqueId(handle, &query_id);
 
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (exec_state.get() == NULL) {
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+  if (request_state.get() == NULL) {
     stringstream str;
     str << "unknown query id: " << query_id;
     LOG(ERROR) << str.str();
@@ -286,17 +286,17 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
   }
   stringstream error_log_ss;
   // If the query status is !ok, include the status error message at the top of the log.
-  if (!exec_state->query_status().ok()) {
-    error_log_ss << exec_state->query_status().GetDetail() << "\n";
+  if (!request_state->query_status().ok()) {
+    error_log_ss << request_state->query_status().GetDetail() << "\n";
   }
 
   // Add warnings from analysis
-  error_log_ss << join(exec_state->GetAnalysisWarnings(), "\n");
+  error_log_ss << join(request_state->GetAnalysisWarnings(), "\n");
 
   // Add warnings from execution
-  if (exec_state->coord() != NULL) {
-    if (!exec_state->query_status().ok()) error_log_ss << "\n\n";
-    error_log_ss << exec_state->coord()->GetErrorLog();
+  if (request_state->coord() != NULL) {
+    if (!request_state->query_status().ok()) error_log_ss << "\n\n";
+    error_log_ss << request_state->coord()->GetErrorLog();
   }
   log = error_log_ss.str();
 }
@@ -455,30 +455,31 @@ inline void ImpalaServer::QueryHandleToTUniqueId(const QueryHandle& handle,
 
 Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
     const bool start_over, const int32_t fetch_size, beeswax::Results* query_results) {
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, false);
-  if (UNLIKELY(exec_state == nullptr)) {
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, false);
+  if (UNLIKELY(request_state == nullptr)) {
     return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
   }
 
-  // Make sure QueryExecState::Wait() has completed before fetching rows. Wait() ensures
-  // that rows are ready to be fetched (e.g., Wait() opens QueryExecState::output_exprs_,
-  // which are evaluated in QueryExecState::FetchRows() below).
-  exec_state->BlockOnWait();
+  // Make sure ClientRequestState::Wait() has completed before fetching rows. Wait()
+  // ensures that rows are ready to be fetched (e.g., Wait() opens
+  // ClientRequestState::output_exprs_, which are evaluated in
+  // ClientRequestState::FetchRows() below).
+  request_state->BlockOnWait();
 
-  lock_guard<mutex> frl(*exec_state->fetch_rows_lock());
-  lock_guard<mutex> l(*exec_state->lock());
+  lock_guard<mutex> frl(*request_state->fetch_rows_lock());
+  lock_guard<mutex> l(*request_state->lock());
 
-  if (exec_state->num_rows_fetched() == 0) {
-    exec_state->query_events()->MarkEvent("First row fetched");
-    exec_state->set_fetched_rows();
+  if (request_state->num_rows_fetched() == 0) {
+    request_state->query_events()->MarkEvent("First row fetched");
+    request_state->set_fetched_rows();
   }
 
   // Check for cancellation or an error.
-  RETURN_IF_ERROR(exec_state->query_status());
+  RETURN_IF_ERROR(request_state->query_status());
 
   // ODBC-190: set Beeswax's Results.columns to work around bug ODBC-190;
   // TODO: remove the block of code when ODBC-190 is resolved.
-  const TResultSetMetadata* result_metadata = exec_state->result_metadata();
+  const TResultSetMetadata* result_metadata = request_state->result_metadata();
   query_results->columns.resize(result_metadata->columns.size());
   for (int i = 0; i < result_metadata->columns.size(); ++i) {
     // TODO: As of today, the ODBC driver does not support boolean and timestamp data
@@ -498,16 +499,16 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
   query_results->__set_ready(true);
   // It's likely that ODBC doesn't care about start_row, but Hue needs it. For Hue,
   // start_row starts from zero, not one.
-  query_results->__set_start_row(exec_state->num_rows_fetched());
+  query_results->__set_start_row(request_state->num_rows_fetched());
 
   Status fetch_rows_status;
   query_results->data.clear();
-  if (!exec_state->eos()) {
+  if (!request_state->eos()) {
     scoped_ptr<QueryResultSet> result_set(QueryResultSet::CreateAsciiQueryResultSet(
-        *exec_state->result_metadata(), &query_results->data));
-    fetch_rows_status = exec_state->FetchRows(fetch_size, result_set.get());
+        *request_state->result_metadata(), &query_results->data));
+    fetch_rows_status = request_state->FetchRows(fetch_size, result_set.get());
   }
-  query_results->__set_has_more(!exec_state->eos());
+  query_results->__set_has_more(!request_state->eos());
   query_results->__isset.data = true;
 
   return fetch_rows_status;
@@ -515,15 +516,15 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
 
 Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
     TInsertResult* insert_result) {
-  shared_ptr<QueryExecState> exec_state = GetQueryExecState(query_id, true);
-  if (UNLIKELY(exec_state == nullptr)) {
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id, true);
+  if (UNLIKELY(request_state == nullptr)) {
     return Status(Substitute("Invalid query handle: $0", PrintId(query_id)));
   }
 
   Status query_status;
   {
-    lock_guard<mutex> l(*exec_state->lock(), adopt_lock_t());
-    query_status = exec_state->query_status();
+    lock_guard<mutex> l(*request_state->lock(), adopt_lock_t());
+    query_status = request_state->query_status();
     if (query_status.ok()) {
       // Coord may be NULL for a SELECT with LIMIT 0.
       // Note that when IMPALA-87 is fixed (INSERT without FROM clause) we might
@@ -531,9 +532,9 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
       // coordinator, depending on how we choose to drive the table sink.
       int64_t num_row_errors = 0;
       bool has_kudu_stats = false;
-      if (exec_state->coord() != NULL) {
+      if (request_state->coord() != NULL) {
         for (const PartitionStatusMap::value_type& v:
-             exec_state->coord()->per_partition_status()) {
+             request_state->coord()->per_partition_status()) {
           const pair<string, TInsertPartitionStatus> partition_status = v;
           insert_result->rows_modified[partition_status.first] =
               partition_status.second.num_modified_rows;