You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2021/03/13 02:24:28 UTC

[impala] 04/08: IMPALA-10535: Add interface to ImpalaServer for execution of externally compiled statements

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 311938b4f500aeb26f5a42cd955231588821e18b
Author: Kurt Deschler <kd...@cloudera.com>
AuthorDate: Fri Nov 8 15:45:45 2019 -0500

    IMPALA-10535: Add interface to ImpalaServer for execution of externally compiled statements
    
    The ExecutePlannedStatement interface allows an externally supplied
    TExecRequest to be executed by impalad. The TExecRequest must be fully
    populated and will be sent directly to the backend for execution.
    
    The following fields in the TExecRequest are updated by the coordinator:
    - Hostname
    - KRPC address
    - Local Timezone
    
    In order to add the interface to ImpalaInternalService.thrift, several of
    the thrift classes were moved to Query.thrift to avoid a circular
    dependency with Frontend.thrift.
    
    Added functionality to format and dump TExecRequest structures to path
    specified in debug flag dump_exec_request_path.
    
    A start timestamp field has been added to TExecRequest to represent the
    interval in the query profile between when the request was sent by the
    external frontend and handled by the backend.
    
    A local timestamp field has been added to the Ping result struct to
    return the current backend timestamp. This is used by the external
    frontend to populate the start timestamp.
    
    Also included is a change to avoid generating silent AnalysisExceptions
    during table resolution.
    
    Tested with TExecRequest structures populated by external frontend.
    Local timezone change tested with the INT64 TIMESTAMP datatype
    
    Reviewed-by: John Sherman <jf...@cloudera.com>
    Change-Id: Iace716dd67290f08441857dc02d2428b0e335eaa
    Reviewed-on: http://gerrit.cloudera.org:8080/17104
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Thomas Tauber-Marshall <tm...@cloudera.com>
---
 be/generated-sources/gen-cpp/CMakeLists.txt        |   2 +
 be/src/rpc/hs2-http-test.cc                        |   2 +
 be/src/runtime/debug-options.h                     |   1 +
 be/src/runtime/query-driver.cc                     |  25 +
 be/src/runtime/query-driver.h                      |   3 +
 be/src/service/client-request-state.cc             |   4 +
 be/src/service/client-request-state.h              |   4 +
 be/src/service/impala-beeswax-server.cc            |   4 +-
 be/src/service/impala-hs2-server.cc                |  32 +-
 be/src/service/impala-server.cc                    |  74 ++-
 be/src/service/impala-server.h                     |  21 +-
 common/thrift/CMakeLists.txt                       |  12 +-
 common/thrift/Frontend.thrift                      | 129 +----
 common/thrift/ImpalaInternalService.thrift         | 619 +--------------------
 common/thrift/ImpalaService.thrift                 |  21 +
 .../{ImpalaInternalService.thrift => Query.thrift} | 311 ++++-------
 fe/pom.xml                                         |   5 +
 .../java/org/apache/impala/analysis/Analyzer.java  |   9 +-
 .../org/apache/impala/analysis/PrivilegeSpec.java  |   2 +-
 .../apache/impala/analysis/ResetMetadataStmt.java  |   3 +-
 .../org/apache/impala/planner/PlannerTestBase.java |   3 +-
 21 files changed, 330 insertions(+), 956 deletions(-)

diff --git a/be/generated-sources/gen-cpp/CMakeLists.txt b/be/generated-sources/gen-cpp/CMakeLists.txt
index 271dcb7..074dfae 100644
--- a/be/generated-sources/gen-cpp/CMakeLists.txt
+++ b/be/generated-sources/gen-cpp/CMakeLists.txt
@@ -84,6 +84,8 @@ set(SRC_FILES
   Planner_types.cpp
   parquet_constants.cpp
   parquet_types.cpp
+  Query_constants.cpp
+  Query_types.cpp
   ResourceProfile_constants.cpp
   ResourceProfile_types.cpp
   RuntimeProfile_constants.cpp
diff --git a/be/src/rpc/hs2-http-test.cc b/be/src/rpc/hs2-http-test.cc
index d81c9d9..2c95550 100644
--- a/be/src/rpc/hs2-http-test.cc
+++ b/be/src/rpc/hs2-http-test.cc
@@ -48,6 +48,8 @@ class TestHS2Service : public ImpalaHiveServer2ServiceIf {
   virtual void GetInfo(TGetInfoResp& _return, const TGetInfoReq& req) {}
   virtual void ExecuteStatement(
       TExecuteStatementResp& _return, const TExecuteStatementReq& req) {}
+  virtual void ExecutePlannedStatement(
+      TExecuteStatementResp& _return, const TExecutePlannedStatementReq& req) {}
   virtual void GetTypeInfo(TGetTypeInfoResp& _return, const TGetTypeInfoReq& req) {}
   virtual void GetCatalogs(TGetCatalogsResp& _return, const TGetCatalogsReq& req) {}
   virtual void GetSchemas(TGetSchemasResp& _return, const TGetSchemasReq& req) {}
diff --git a/be/src/runtime/debug-options.h b/be/src/runtime/debug-options.h
index 2b76f5c..4b723b3 100644
--- a/be/src/runtime/debug-options.h
+++ b/be/src/runtime/debug-options.h
@@ -23,6 +23,7 @@
 
 #include "gen-cpp/Frontend_types.h"
 #include "gen-cpp/Types_types.h"
+#include "gen-cpp/ImpalaInternalService_types.h"
 
 namespace impala {
 
diff --git a/be/src/runtime/query-driver.cc b/be/src/runtime/query-driver.cc
index 604054c..983d599 100644
--- a/be/src/runtime/query-driver.cc
+++ b/be/src/runtime/query-driver.cc
@@ -23,6 +23,7 @@
 #include "service/frontend.h"
 #include "service/impala-server.h"
 #include "util/debug-util.h"
+#include "util/network-util.h"
 #include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
@@ -63,6 +64,30 @@ Status QueryDriver::RunFrontendPlanner(const TQueryCtx& query_ctx) {
   return Status::OK();
 }
 
+Status QueryDriver::SetExternalPlan(
+    const TQueryCtx& query_ctx, const TExecRequest& external_exec_request) {
+  // Takes the TQueryCtx and calls into the frontend to initialize the TExecRequest for
+  // this query.
+  DCHECK(client_request_state_ != nullptr);
+  DCHECK(exec_request_ != nullptr);
+  RETURN_IF_ERROR(
+      DebugAction(query_ctx.client_request.query_options, "FRONTEND_PLANNER"));
+  *exec_request_.get() = external_exec_request;
+  // Update query_id in the external request
+  exec_request_->query_exec_request.query_ctx.__set_query_id(
+      client_request_state_->query_id());
+  // Update coordinator related internal addresses in the external request
+  exec_request_->query_exec_request.query_ctx.__set_coord_hostname(
+      ExecEnv::GetInstance()->configured_backend_address().hostname);
+  const TNetworkAddress& address = ExecEnv::GetInstance()->krpc_address();
+  DCHECK(IsResolvedAddress(address));
+  exec_request_->query_exec_request.query_ctx.__set_coord_ip_address(address);
+  // Update local_time_zone in the external request
+  exec_request_->query_exec_request.query_ctx.__set_local_time_zone(
+      query_ctx.local_time_zone);
+  return Status::OK();
+}
+
 ClientRequestState* QueryDriver::GetActiveClientRequestState() {
   lock_guard<SpinLock> l(client_request_state_lock_);
   if (retried_client_request_state_ != nullptr) {
diff --git a/be/src/runtime/query-driver.h b/be/src/runtime/query-driver.h
index bc6264a..188b4da 100644
--- a/be/src/runtime/query-driver.h
+++ b/be/src/runtime/query-driver.h
@@ -149,6 +149,9 @@ class QueryDriver {
   /// query string (TQueryCtx::TClientRequest::stmt).
   Status RunFrontendPlanner(const TQueryCtx& query_ctx) WARN_UNUSED_RESULT;
 
+  /// Similar to RunFrontendPlanner but takes TExecRequest from an external planner
+  Status SetExternalPlan(const TQueryCtx& query_ctx, const TExecRequest& exec_request);
+
   /// Returns the ClientRequestState corresponding to the given query id.
   ClientRequestState* GetClientRequestState(const TUniqueId& query_id);
 
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index 7f455a3..23b446f 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -186,6 +186,10 @@ Status ClientRequestState::SetResultCache(QueryResultSet* cache,
   return Status::OK();
 }
 
+void ClientRequestState::SetRemoteSubmitTime(int64_t remote_submit_time) {
+  query_events_->Start(remote_submit_time);
+}
+
 void ClientRequestState::SetFrontendProfile(TRuntimeProfileNode profile) {
   // Should we defer creating and adding the child until here? probably.
   TRuntimeProfileTree prof_tree;
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 0c7bb6a..c2dd271 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -90,6 +90,10 @@ class ClientRequestState {
   /// which then sets the frontend profile.
   void SetFrontendProfile(TRuntimeProfileNode profile);
 
+  /// Sets the coordinator time that the plan request was submitted at so that
+  /// the backend timeline starts where the frontend timeline ends
+  void SetRemoteSubmitTime(int64_t remote_submit_time);
+
   /// Based on query type, this either initiates execution of this ClientRequestState's
   /// TExecRequest or submits the query to the Admission controller for asynchronous
   /// admission control. When this returns the operation state is either RUNNING_STATE or
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 7eb0094..79a500f 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -70,7 +70,7 @@ void ImpalaServer::query(beeswax::QueryHandle& beeswax_handle, const Query& quer
   // raise Syntax error or access violation; it's likely to be syntax/analysis error
   // TODO: that may not be true; fix this
   QueryHandle query_handle;
-  RAISE_IF_ERROR(Execute(&query_ctx, session, &query_handle),
+  RAISE_IF_ERROR(Execute(&query_ctx, session, &query_handle, nullptr),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
 
   // start thread to wait for results to become available, which will allow
@@ -118,7 +118,7 @@ void ImpalaServer::executeAndWait(beeswax::QueryHandle& beeswax_handle,
   // raise Syntax error or access violation; it's likely to be syntax/analysis error
   // TODO: that may not be true; fix this
   QueryHandle query_handle;
-  RAISE_IF_ERROR(Execute(&query_ctx, session, &query_handle),
+  RAISE_IF_ERROR(Execute(&query_ctx, session, &query_handle, nullptr),
       SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);
 
   // Once the query is running do a final check for session closure and add it to the
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index b38a254..debd755 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -52,6 +52,7 @@
 #include "util/metrics.h"
 #include "util/pretty-printer.h"
 #include "util/runtime-profile-counters.h"
+#include "util/stopwatch.h"
 #include "util/string-parser.h"
 #include "util/uid-util.h"
 
@@ -453,9 +454,8 @@ void ImpalaServer::GetInfo(TGetInfoResp& return_val,
   return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
 }
 
-void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
-    const TExecuteStatementReq& request) {
-  VLOG_QUERY << "ExecuteStatement(): request=" << ThriftDebugString(request);
+void ImpalaServer::ExecuteStatementCommon(TExecuteStatementResp& return_val,
+    const TExecuteStatementReq& request, const TExecRequest* external_exec_request) {
   HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
   // We ignore the runAsync flag here: Impala's queries will always run asynchronously,
   // and will block on fetch. To the client, this looks like Hive's synchronous mode; the
@@ -497,7 +497,7 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
   }
 
   QueryHandle query_handle;
-  status = Execute(&query_ctx, session, &query_handle);
+  status = Execute(&query_ctx, session, &query_handle, external_exec_request);
   HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
 
   // Start thread to wait for results to become available.
@@ -541,6 +541,29 @@ Status ImpalaServer::SetupResultsCacheing(const QueryHandle& query_handle,
   return Status::OK();
 }
 
+void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
+    const TExecuteStatementReq& request) {
+  VLOG_QUERY << "ExecuteStatement(): request=" << ThriftDebugString(request);
+  ExecuteStatementCommon(return_val, request);
+}
+
+void ImpalaServer::ExecutePlannedStatement(
+      TExecuteStatementResp& return_val,
+      const TExecutePlannedStatementReq& request) {
+  VLOG_QUERY << "ExecutePlannedStatement(): request=" << ThriftDebugString(request);
+  const ThriftServer::ConnectionContext* connection_context =
+      ThriftServer::GetThreadConnectionContext();
+  // This RPC is only supported on the external frontend service and should only be
+  // exposed to trusted clients since it executes provided query plans directly with
+  // no authorization checks (they are assumed to have been done by the trusted client)
+  if (connection_context->server_name != EXTERNAL_FRONTEND_SERVER_NAME) {
+    HS2_RETURN_ERROR(return_val, "Unsupported operation",
+        SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED);
+  }
+  ExecuteStatementCommon(return_val, request.statementReq, &request.plan);
+}
+
+
 void ImpalaServer::GetTypeInfo(TGetTypeInfoResp& return_val,
     const TGetTypeInfoReq& request) {
   VLOG_QUERY << "GetTypeInfo(): request=" << ThriftDebugString(request);
@@ -1206,6 +1229,7 @@ void ImpalaServer::PingImpalaHS2Service(TPingImpalaHS2ServiceResp& return_val,
   } else {
     return_val.__set_webserver_address("");
   }
+  return_val.__set_timestamp(MonotonicStopWatch::Now());
   VLOG_RPC << "PingImpalaHS2Service(): return_val=" << ThriftDebugString(return_val);
 }
 }
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 0e8bfe2..c11e2e8 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -21,6 +21,9 @@
 #include <unistd.h>
 #include <algorithm>
 #include <exception>
+#include <fstream>
+#include <sstream>
+
 #include <boost/algorithm/string.hpp>
 #include <boost/algorithm/string/join.hpp>
 #include <boost/algorithm/string/replace.hpp>
@@ -313,6 +316,11 @@ DEFINE_int32(query_event_hook_nthreads, 1, "Number of threads to use for "
     "QueryEventHook execution. If this number is >1 then hooks will execute "
     "concurrently.");
 
+// Dumps used for debugging and diffing ExecRequests in text form.
+DEFINE_string(dump_exec_request_path, "",
+    "If set, dump TExecRequest structures to {dump_exec_request_path}/"
+    "TExecRequest-{internal|external}.{query_id.hi}-{query_id.lo}");
+
 DECLARE_bool(compact_catalog_topic);
 
 DEFINE_bool(use_local_tz_for_unix_timestamp_conversions, false,
@@ -1084,7 +1092,8 @@ void ImpalaServer::EnforceMaxMtDop(TQueryCtx* query_ctx, int64_t max_mt_dop) {
 }
 
 Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> session_state,
-    QueryHandle* query_handle) {
+    QueryHandle* query_handle,
+    const TExecRequest* external_exec_request) {
   PrepareQueryContext(query_ctx);
   ScopedThreadContext debug_ctx(GetThreadDebugInfo(), query_ctx->query_id);
   ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);
@@ -1095,26 +1104,57 @@ Status ImpalaServer::Execute(TQueryCtx* query_ctx, shared_ptr<SessionState> sess
   query_ctx->client_request.__set_redacted_stmt((const string) stmt);
 
   bool registered_query = false;
-  Status status = ExecuteInternal(*query_ctx, session_state, &registered_query,
-      query_handle);
+  Status status = ExecuteInternal(*query_ctx, external_exec_request, session_state,
+      &registered_query, query_handle);
   if (!status.ok() && registered_query) {
     UnregisterQueryDiscardResult((*query_handle)->query_id(), false, &status);
   }
   return status;
 }
 
+void DumpTExecReq(const TExecRequest& exec_request, const char* dump_type,
+    const TUniqueId& query_id) {
+  if (FLAGS_dump_exec_request_path.empty()) return;
+  int depth = 0;
+  std::stringstream tmpstr;
+  string fn(Substitute("$1/TExecRequest-$2.$3", FLAGS_dump_exec_request_path,
+      dump_type, PrintId(query_id, "-")));
+  std::ofstream ofs(fn);
+  tmpstr << exec_request;
+  const int len = tmpstr.str().length();
+  const char *p = tmpstr.str().c_str();
+  for (int i = 0; i < len; ++i) {
+    const char ch = p[i];
+    ofs << ch;
+    if (ch == '(') {
+      depth++;
+    } else if (ch == ')' && depth > 0) {
+      depth--;
+    } else if (ch == ',') {
+    } else {
+      continue;
+    }
+    ofs << '\n' << std::setw(depth) << " ";
+  }
+}
+
 Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
-    shared_ptr<SessionState> session_state, bool* registered_query,
-    QueryHandle* query_handle) {
+    const TExecRequest* external_exec_request, shared_ptr<SessionState> session_state,
+    bool* registered_query, QueryHandle* query_handle) {
   DCHECK(session_state != nullptr);
   DCHECK(query_handle != nullptr);
   DCHECK(registered_query != nullptr);
   *registered_query = false;
-
   // Create the QueryDriver for this query. CreateNewDriver creates the associated
   // ClientRequestState as well.
   QueryDriver::CreateNewDriver(this, query_handle, query_ctx, session_state);
 
+  bool is_external_req = external_exec_request != nullptr;
+
+  if (is_external_req && external_exec_request->remote_submit_time) {
+    (*query_handle)->SetRemoteSubmitTime(external_exec_request->remote_submit_time);
+  }
+
   (*query_handle)->query_events()->MarkEvent("Query submitted");
 
   {
@@ -1144,9 +1184,25 @@ Status ImpalaServer::ExecuteInternal(const TQueryCtx& query_ctx,
           statement_length, max_statement_length));
     }
 
-    // Takes the TQueryCtx and calls into the frontend to initialize the TExecRequest for
-    // this query.
-    RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx));
+    Status exec_status = Status::OK();
+    TUniqueId query_id = (*query_handle)->query_id();
+    // Generate TExecRequest here if one was not passed in or we want one
+    // from the Impala planner to compare with
+    if (external_exec_request == nullptr || !FLAGS_dump_exec_request_path.empty()) {
+      // Takes the TQueryCtx and calls into the frontend to initialize the
+      // TExecRequest for this query.
+      RETURN_IF_ERROR(query_handle->query_driver()->RunFrontendPlanner(query_ctx));
+      DumpTExecReq((*query_handle)->exec_request(), "internal", query_id);
+    }
+
+    if (external_exec_request != nullptr) {
+      // Use passed in exec_request
+      RETURN_IF_ERROR(query_handle->query_driver()->SetExternalPlan(
+          query_ctx, *external_exec_request));
+
+      exec_status = Status::OK();
+      DumpTExecReq((*query_handle)->exec_request(), "external", query_id);
+    }
 
     const TExecRequest& result = (*query_handle)->exec_request();
     (*query_handle)->query_events()->MarkEvent("Planning finished");
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index f280374..9a3250e 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -63,6 +63,7 @@ class CancellationWork;
 class ImpalaHttpHandler;
 class RowDescriptor;
 class TDmlResult;
+class TExecutePlannedStatementReq;
 class TNetworkAddress;
 class TClientRequest;
 class TExecRequest;
@@ -76,6 +77,7 @@ struct QueryHandle;
 class SimpleLogger;
 class UpdateFilterParamsPB;
 class UpdateFilterResultPB;
+class TQueryExecRequest;
 
 /// An ImpalaServer contains both frontend and backend functionality;
 /// it implements ImpalaService (Beeswax), ImpalaHiveServer2Service (HiveServer2)
@@ -345,6 +347,11 @@ class ImpalaServer : public ImpalaServiceIf,
   virtual void PingImpalaHS2Service(TPingImpalaHS2ServiceResp& return_val,
       const TPingImpalaHS2ServiceReq& req);
 
+  // Execute the provided Thrift statement/plan
+  virtual void ExecutePlannedStatement(
+      apache::hive::service::cli::thrift::TExecuteStatementResp& return_val,
+      const TExecutePlannedStatementReq& req);
+
   /// Closes an Impala operation and returns additional information about the closed
   /// operation.
   virtual void CloseImpalaOperation(
@@ -669,14 +676,26 @@ class ImpalaServer : public ImpalaServiceIf,
   /// query_driver->request_state will be NULL and nothing will have been registered in
   /// query_driver_map_. session_state is a ptr to the session running this query and must
   /// have been checked out.
+  /// external_exec_request is a statement that was prepared by an external frontend using
+  /// Impala PlanNodes or null if the external frontend isn't being used.
   Status Execute(TQueryCtx* query_ctx, std::shared_ptr<SessionState> session_state,
-      QueryHandle* query_handle) WARN_UNUSED_RESULT;
+      QueryHandle* query_handle,
+      const TExecRequest* external_exec_request) WARN_UNUSED_RESULT;
 
   /// Implements Execute() logic, but doesn't unregister query on error.
   Status ExecuteInternal(const TQueryCtx& query_ctx,
+      const TExecRequest* external_exec_request,
       std::shared_ptr<SessionState> session_state, bool* registered_query,
       QueryHandle* query_handle);
 
+  /// Common execution logic factored out from Execute to be shared with
+  /// ExecutePlannedStatement. Optional TExecRequest is the provided Thrift
+  /// request to execute instead of parsing and executing the SQL statement.
+  void ExecuteStatementCommon(
+      apache::hive::service::cli::thrift::TExecuteStatementResp& return_val,
+      const apache::hive::service::cli::thrift::TExecuteStatementReq& request,
+      const TExecRequest* external_exec_request = nullptr);
+
   /// Registers the query with query_driver_map_ using the globally unique query_id. The
   /// caller must have checked out the session state.
   Status RegisterQuery(const TUniqueId& query_id,
diff --git a/common/thrift/CMakeLists.txt b/common/thrift/CMakeLists.txt
index 9b14ab5..a4fd209 100644
--- a/common/thrift/CMakeLists.txt
+++ b/common/thrift/CMakeLists.txt
@@ -64,12 +64,9 @@ function(THRIFT_GEN VAR)
       set(CPP_ARGS -r ${CPP_ARGS})
     ENDIF(THRIFT_FILE STREQUAL "beeswax.thrift")
 
-    IF (THRIFT_FILE STREQUAL ${TCLI_SERVICE_THRIFT} OR THRIFT_FILE STREQUAL "parquet.thrift" OR
-        THRIFT_FILE STREQUAL "ImpalaService.thrift")
+    IF (THRIFT_FILE STREQUAL ${TCLI_SERVICE_THRIFT} OR THRIFT_FILE STREQUAL "parquet.thrift")
       # Do not generate Java HiveServer2 and Parquet files because we should use the jar
       # from Hive or Parquet.
-      # Also do not generate ImpalaService.thrift because the generated code doesn't
-      # compile with hive if the thrift version in hive is 0.9.0
       add_custom_command(
         OUTPUT ${OUTPUT_BE_FILE}
         COMMAND ${THRIFT_COMPILER} ${CPP_ARGS} ${THRIFT_FILE}
@@ -78,8 +75,7 @@ function(THRIFT_GEN VAR)
         COMMENT "Running thrift compiler on ${THRIFT_FILE}"
         VERBATIM
       )
-    ELSE (THRIFT_FILE STREQUAL ${TCLI_SERVICE_THRIFT} OR THRIFT_FILE STREQUAL "parquet.thrift" OR
-        THRIFT_FILE STREQUAL "ImpalaService.thrift")
+    ELSE (THRIFT_FILE STREQUAL ${TCLI_SERVICE_THRIFT} OR THRIFT_FILE STREQUAL "parquet.thrift")
       add_custom_command(
         OUTPUT ${OUTPUT_BE_FILE}
         COMMAND ${THRIFT_COMPILER} ${CPP_ARGS} ${THRIFT_FILE}
@@ -89,8 +85,7 @@ function(THRIFT_GEN VAR)
         COMMENT "Running thrift compiler on ${THRIFT_FILE}"
         VERBATIM
     )
-    ENDIF (THRIFT_FILE STREQUAL ${TCLI_SERVICE_THRIFT} OR THRIFT_FILE STREQUAL "parquet.thrift" OR
-        THRIFT_FILE STREQUAL "ImpalaService.thrift")
+    ENDIF (THRIFT_FILE STREQUAL ${TCLI_SERVICE_THRIFT} OR THRIFT_FILE STREQUAL "parquet.thrift")
   endforeach(THRIFT_FILE)
 
   set(${VAR} ${${VAR}} PARENT_SCOPE)
@@ -229,6 +224,7 @@ set (SRC_FILES
   Partitions.thrift
   parquet.thrift
   ResourceProfile.thrift
+  Query.thrift
   Results.thrift
   RuntimeProfile.thrift
   SqlConstraints.thrift
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index 5126958..62899bf 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -19,19 +19,16 @@ namespace cpp impala
 namespace java org.apache.impala.thrift
 
 include "Types.thrift"
-include "ImpalaInternalService.thrift"
-include "PlanNodes.thrift"
-include "Planner.thrift"
 include "RuntimeProfile.thrift"
 include "Descriptors.thrift"
 include "Data.thrift"
 include "Results.thrift"
-include "Exprs.thrift"
 include "TCLIService.thrift"
 include "Status.thrift"
 include "CatalogObjects.thrift"
 include "CatalogService.thrift"
 include "LineageGraph.thrift"
+include "Query.thrift"
 
 // These are supporting structs for JniFrontend.java, which serves as the glue
 // between our C++ execution environment and the Java frontend.
@@ -79,7 +76,7 @@ struct TGetTablesParams {
   // Session state for the user who initiated this request. If authorization is
   // enabled, only the tables this user has access to will be returned. If not
   // set, access checks will be skipped (used for internal Impala requests)
-  3: optional ImpalaInternalService.TSessionState session
+  3: optional Query.TSessionState session
 }
 
 // getTableNames returns a list of unqualified table names
@@ -126,7 +123,7 @@ struct TGetDbsParams {
   // Session state for the user who initiated this request. If authorization is
   // enabled, only the databases this user has access to will be returned. If not
   // set, access checks will be skipped (used for internal Impala requests)
-  2: optional ImpalaInternalService.TSessionState session
+  2: optional Query.TSessionState session
 }
 
 // getDbs returns a list of databases
@@ -188,7 +185,7 @@ struct TDescribeTableParams {
   3: optional Types.TColumnType result_struct
 
   // Session state for the user who initiated this request.
-  4: optional ImpalaInternalService.TSessionState session
+  4: optional Query.TSessionState session
 }
 
 // Results of a call to describeDb() and describeTable()
@@ -284,11 +281,6 @@ struct TShowRolesResult {
   1: required list<string> role_names
 }
 
-// Result of the DESCRIBE HISTORY command.
-struct TGetTableHistoryResult {
-  1: required list<TGetTableHistoryResultItem> result
-}
-
 // Represents one row in the DESCRIBE HISTORY command's result.
 struct TGetTableHistoryResultItem {
   // Timestamp in millis
@@ -298,6 +290,11 @@ struct TGetTableHistoryResultItem {
   4: required bool is_current_ancestor
 }
 
+// Result of the DESCRIBE HISTORY command.
+struct TGetTableHistoryResult {
+  1: required list<TGetTableHistoryResultItem> result
+}
+
 // Parameters for SHOW GRANT ROLE/USER commands
 struct TShowGrantPrincipalParams {
   // The effective user who submitted this request.
@@ -331,7 +328,7 @@ struct TGetFunctionsParams {
   // Session state for the user who initiated this request. If authorization is
   // enabled, only the functions this user has access to will be returned. If not
   // set, access checks will be skipped (used for internal Impala requests)
-  4: optional ImpalaInternalService.TSessionState session
+  4: optional Query.TSessionState session
 }
 
 // getFunctions() returns a list of function signatures
@@ -353,40 +350,6 @@ struct TExplainResult {
   1: required list<Data.TResultRow> results
 }
 
-// Metadata required to finalize a query - that is, to clean up after the query is done.
-// Only relevant for INSERT queries.
-struct TFinalizeParams {
-  // True if the INSERT query was OVERWRITE, rather than INTO
-  1: required bool is_overwrite
-
-  // The base directory in hdfs of the table targeted by this INSERT
-  2: required string hdfs_base_dir
-
-  // The target table name
-  3: required string table_name
-
-  // The target table database
-  4: required string table_db
-
-  // The full path in HDFS of a directory under which temporary files may be written
-  // during an INSERT. For a query with id a:b, files are written to <staging_dir>/.a_b/,
-  // and that entire directory is removed after the INSERT completes.
-  5: optional string staging_dir
-
-  // Identifier for the target table in the query-wide descriptor table (see
-  // TDescriptorTable and TTableDescriptor).
-  6: optional i64 table_id;
-
-  // Stores the ACID transaction id of the target table for transactional INSERTs.
-  7: optional i64 transaction_id;
-
-  // Stores the ACID write id of the target table for transactional INSERTs.
-  8: optional i64 write_id;
-
-  // Stores the Iceberg spec id of the partition spec used for this INSERT.
-  9: optional i32 spec_id;
-}
-
 // Request for a LOAD DATA statement. LOAD DATA is only supported for HDFS backed tables.
 struct TLoadDataReq {
   // Fully qualified table name to load data into.
@@ -413,67 +376,6 @@ struct TLoadDataResp {
   1: required Data.TResultRow load_summary
 }
 
-// Execution parameters for a single plan; component of TQueryExecRequest
-struct TPlanExecInfo {
-  // fragments[i] may consume the output of fragments[j > i];
-  // fragments[0] is the root fragment and also the coordinator fragment, if
-  // it is unpartitioned.
-  1: required list<Planner.TPlanFragment> fragments
-
-  // A map from scan node ids to a scan range specification.
-  // The node ids refer to scan nodes in fragments[].plan
-  2: optional map<Types.TPlanNodeId, Planner.TScanRangeSpec>
-      per_node_scan_ranges
-}
-
-// Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
-struct TQueryExecRequest {
-  // exec info for all plans; the first one materializes the query result and subsequent
-  // ones materialize join builds that are input for preceding plans in the list.
-  1: optional list<TPlanExecInfo> plan_exec_info
-
-  // Metadata of the query result set (only for select)
-  2: optional Results.TResultSetMetadata result_set_metadata
-
-  // Set if the query needs finalization after it executes
-  3: optional TFinalizeParams finalize_params
-
-  4: required ImpalaInternalService.TQueryCtx query_ctx
-
-  // The same as the output of 'explain <query>'
-  5: optional string query_plan
-
-  // The statement type governs when the coordinator can judge a query to be finished.
-  // DML queries are complete after Wait(), SELECTs may not be. Generally matches
-  // the stmt_type of the parent TExecRequest, but in some cases (such as CREATE TABLE
-  // AS SELECT), these may differ.
-  6: required Types.TStmtType stmt_type
-
-  // List of replica hosts.  Used by the host_idx field of TScanRangeLocation.
-  7: required list<Types.TNetworkAddress> host_list
-
-  // Column lineage graph
-  8: optional LineageGraph.TLineageGraph lineage_graph
-
-  // Estimated per-host peak memory consumption in bytes. Used by admission control.
-  // TODO: Remove when AC doesn't rely on this any more.
-  9: optional i64 per_host_mem_estimate
-
-  // Maximum possible (in the case all fragments are scheduled on all hosts with
-  // max DOP) minimum memory reservation required per host, in bytes.
-  10: optional i64 max_per_host_min_mem_reservation;
-
-  // Maximum possible (in the case all fragments are scheduled on all hosts with
-  // max DOP) required threads per host, i.e. the number of threads that this query
-  // needs to execute successfully. Does not include "optional" threads.
-  11: optional i64 max_per_host_thread_reservation;
-
-  // Estimated coordinator's memory consumption in bytes assuming that the coordinator
-  // fragment will run on a dedicated coordinator. Set by the planner and used by
-  // admission control.
-  12: optional i64 dedicated_coord_mem_estimate;
-}
-
 enum TCatalogOpType {
   SHOW_TABLES = 0
   SHOW_DBS = 1
@@ -622,7 +524,7 @@ struct TMetadataOpRequest {
   // Session state for the user who initiated this request. If authorization is
   // enabled, only the server objects this user has access to will be returned.
   // If not set, access checks will be skipped (used for internal Impala requests)
-  10: optional ImpalaInternalService.TSessionState session
+  10: optional Query.TSessionState session
   11: optional TCLIService.TGetPrimaryKeysReq get_primary_keys_req
   12: optional TCLIService.TGetCrossReferenceReq get_cross_reference_req
 }
@@ -647,11 +549,11 @@ struct TExecRequest {
   1: required Types.TStmtType stmt_type
 
   // Copied from the corresponding TClientRequest
-  2: required ImpalaInternalService.TQueryOptions query_options
+  2: required Query.TQueryOptions query_options
 
   // TQueryExecRequest for the backend
   // Set iff stmt_type is QUERY or DML
-  3: optional TQueryExecRequest query_exec_request
+  3: optional Query.TQueryExecRequest query_exec_request
 
   // Set if stmt_type is DDL
   4: optional TCatalogOpRequest catalog_op_request
@@ -692,6 +594,9 @@ struct TExecRequest {
 
   // Set iff stmt_type is TESTCASE
   15: optional string testcase_data_path
+
+  // Coordinator time when plan was submitted by external frontend
+  16: optional i64 remote_submit_time
 }
 
 // Parameters to FeSupport.cacheJar().
@@ -1014,4 +919,4 @@ struct TWrappedHttpResponse {
   4: required map<string, string> cookies
   5: optional string content
   6: optional string content_type
-}
\ No newline at end of file
+}
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 668d183..1f95d72 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -34,506 +34,11 @@ include "Results.thrift"
 include "RuntimeProfile.thrift"
 include "ImpalaService.thrift"
 include "Data.thrift"
-
-// constants for TQueryOptions.num_nodes
-const i32 NUM_NODES_ALL = 0
-const i32 NUM_NODES_ALL_RACKS = -1
+include "Query.thrift"
 
 // constants for TPlanNodeId
 const i32 INVALID_PLAN_NODE_ID = -1
 
-enum TParquetFallbackSchemaResolution {
-  POSITION = 0
-  NAME = 1
-  // Valid for Iceberg tables
-  FIELD_ID = 2
-}
-
-// The order of the enum values needs to be kept in sync with
-// ParquetMetadataUtils::ORDERED_ARRAY_ENCODINGS in parquet-metadata-utils.cc.
-enum TParquetArrayResolution {
-  THREE_LEVEL = 0
-  TWO_LEVEL = 1
-  TWO_LEVEL_THEN_THREE_LEVEL = 2
-}
-
-enum TJoinDistributionMode {
-  BROADCAST = 0
-  SHUFFLE = 1
-}
-
-// Consistency level options for Kudu scans.
-enum TKuduReadMode {
-  DEFAULT = 0
-  READ_LATEST = 1
-  READ_AT_SNAPSHOT = 2
-}
-
-// Physical type and unit used when writing timestamps in Parquet.
-enum TParquetTimestampType {
-  INT96_NANOS,
-  INT64_MILLIS,
-  INT64_MICROS,
-  INT64_NANOS
-}
-
-// A table's Hive ACID type.
-enum TTransactionalType {
-  NONE,
-  INSERT_ONLY
-}
-
-// Query options that correspond to ImpalaService.ImpalaQueryOptions, with their
-// respective defaults. Query options can be set in the following ways:
-//
-// 1) Process-wide defaults (via the impalad arg --default_query_options)
-// 2) Resource pool defaults (via resource pool configuration)
-// 3) Session settings (via the SET command or the HS2 OpenSession RPC)
-// 4) HS2/Beeswax configuration 'overlay' in the request metadata
-//
-// (1) and (2) are set by administrators and provide the default query options for a
-// session, in that order, so options set in (2) override those in (1). The user
-// can specify query options with (3) to override the preceding layers; these
-// overrides are stored in SessionState. Finally, the client can pass a config
-// 'overlay' (4) in the request metadata which overrides everything else.
-//
-// Session options (level 3, above) can be set by the user with SET <key>=<value>
-// or in the OpenSession RPC. They can be unset with SET <key>="". When unset,
-// it's unset in that level, and the values as specified by the defaults,
-// and levels 1 and 2 above take hold.
-//
-// Because of the ambiguity between null and the empty string here, string-typed
-// options where the empty string is a valid value can cause problems as follows:
-// * If their default is not the empty string, a user can't set it to the
-//   empty string with SET.
-// * Even if their default is the empty string, they may be set to something
-//   else via process defaults or resource pool defaults, and the user
-//   may not be able to override them back to the empty string.
-struct TQueryOptions {
-  1: optional bool abort_on_error = 0
-  2: optional i32 max_errors = 100
-  3: optional bool disable_codegen = 0
-  4: optional i32 batch_size = 0
-  5: optional i32 num_nodes = NUM_NODES_ALL
-  6: optional i64 max_scan_range_length = 0
-  7: optional i32 num_scanner_threads = 0
-  11: optional string debug_action = ""
-  12: optional i64 mem_limit = 0
-  14: optional CatalogObjects.TCompressionCodec compression_codec
-  15: optional i32 hbase_caching = 0
-  16: optional bool hbase_cache_blocks = 0
-  17: optional i64 parquet_file_size = 0
-  18: optional Types.TExplainLevel explain_level = 1
-  19: optional bool sync_ddl = 0
-
-  // Request pool this request should be submitted to. If not set
-  // the pool is determined based on the user.
-  20: optional string request_pool
-
-  // test hook to disable topn on the outermost select block.
-  24: optional bool disable_outermost_topn = 0
-
-  // Time, in s, before a query will be timed out if it is inactive. May not exceed
-  // --idle_query_timeout if that flag > 0. If 0, falls back to --idle_query_timeout.
-  26: optional i32 query_timeout_s = 0
-
-  // test hook to cap max memory for spilling operators (to force them to spill).
-  27: optional i64 buffer_pool_limit
-
-  // If true, transforms all count(distinct) aggregations into NDV()
-  28: optional bool appx_count_distinct = 0
-
-  // If true, allows Impala to internally disable spilling for potentially
-  // disastrous query plans. Impala will exercise this option if a query
-  // has no plan hints, and at least one table is missing relevant stats.
-  29: optional bool disable_unsafe_spills = 0
-
-  // If the number of rows that are processed for a single query is below the
-  // threshold, it will be executed on the coordinator only with codegen disabled
-  31: optional i32 exec_single_node_rows_threshold = 100
-
-  // If true, use the table's metadata to produce the partition columns instead of table
-  // scans whenever possible. This option is opt-in by default as this optimization may
-  // produce different results than the scan based approach in some edge cases.
-  32: optional bool optimize_partition_key_scans = 0
-
-  // Specify the preferred locality level of replicas during scan scheduling.
-  // Replicas with an equal or better locality will be preferred.
-  33: optional PlanNodes.TReplicaPreference replica_preference =
-      PlanNodes.TReplicaPreference.CACHE_LOCAL
-
-  // Configure whether scan ranges with local replicas will be assigned by starting from
-  // the same replica for every query or by starting with a new, pseudo-random replica for
-  // subsequent queries. The default is to start with the same replica for every query.
-  34: optional bool schedule_random_replica = 0
-
-  // If true, the planner will not generate plans with streaming preaggregations.
-  36: optional bool disable_streaming_preaggregations = 0
-
-  // If true, runtime filter propagation is enabled
-  37: optional Types.TRuntimeFilterMode runtime_filter_mode = 2
-
-  // Size in bytes of Bloom Filters used for runtime filters. Actual size of filter will
-  // be rounded up to the nearest power of two.
-  38: optional i32 runtime_bloom_filter_size = 1048576
-
-  // Time in ms to wait until runtime filters are delivered. If 0, the default defined
-  // by the startup flag of the same name is used.
-  39: optional i32 runtime_filter_wait_time_ms = 0
-
-  // If true, per-row runtime filtering is disabled
-  40: optional bool disable_row_runtime_filtering = false
-
-  // Maximum number of bloom runtime filters allowed per query
-  41: optional i32 max_num_runtime_filters = 10
-
-  // If true, use UTF-8 annotation for string columns. Note that char and varchar columns
-  // always use the annotation.
-  //
-  // This is disabled by default in order to preserve the existing behavior of legacy
-  // workloads. In addition, Impala strings are not necessarily UTF8-encoded.
-  42: optional bool parquet_annotate_strings_utf8 = false
-
-  // Determines how to resolve Parquet files' schemas in the absence of field IDs (which
-  // is always, since fields IDs are NYI). Valid values are "position" (default) and
-  // "name".
-  43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0
-
-  // Multi-threaded execution: degree of parallelism (= number of active threads) per
-  // query per backend.
-  // > 0: multi-threaded execution mode, with given dop
-  // 0: single-threaded execution mode
-  // unset: may be set automatically to > 0 in createExecRequest(), otherwise same as 0
-  44: optional i32 mt_dop
-
-  // If true, INSERT writes to S3 go directly to their final location rather than being
-  // copied there by the coordinator. We cannot do this for INSERT OVERWRITES because for
-  // those queries, the coordinator deletes all files in the final location before copying
-  // the files there.
-  45: optional bool s3_skip_insert_staging = true
-
-  // Minimum runtime bloom filter size, in bytes
-  46: optional i32 runtime_filter_min_size = 1048576
-
-  // Maximum runtime bloom filter size, in bytes
-  47: optional i32 runtime_filter_max_size = 16777216
-
-  // Prefetching behavior during hash tables' building and probing.
-  48: optional Types.TPrefetchMode prefetch_mode = Types.TPrefetchMode.HT_BUCKET
-
-  // Additional strict handling of invalid data parsing and type conversions.
-  49: optional bool strict_mode = false
-
-  // A limit on the amount of scratch directory space that can be used;
-  50: optional i64 scratch_limit = -1
-
-  // Indicates whether the FE should rewrite Exprs for optimization purposes.
-  // It's sometimes useful to disable rewrites for testing, e.g., expr-test.cc.
-  51: optional bool enable_expr_rewrites = true
-
-  // Indicates whether to use the new decimal semantics.
-  52: optional bool decimal_v2 = true
-
-  // Indicates whether to use dictionary filtering for Parquet files
-  53: optional bool parquet_dictionary_filtering = true
-
-  // Policy for resolving nested array fields in Parquet files.
-  54: optional TParquetArrayResolution parquet_array_resolution =
-    TParquetArrayResolution.THREE_LEVEL
-
-  // Indicates whether to read statistics from Parquet files and use them during query
-  // processing. This includes skipping data based on the statistics and computing query
-  // results like "select min()".
-  55: optional bool parquet_read_statistics = true
-
-  // Join distribution mode that is used when the join inputs have an unknown
-  // cardinality, e.g., because of missing table statistics.
-  56: optional TJoinDistributionMode default_join_distribution_mode =
-    TJoinDistributionMode.BROADCAST
-
-  // If the number of rows processed per node is below the threshold codegen will be
-  // automatically disabled by the planner.
-  57: optional i32 disable_codegen_rows_threshold = 50000
-
-  // The default spillable buffer size in bytes, which may be overridden by the planner.
-  // Defaults to 2MB.
-  58: optional i64 default_spillable_buffer_size = 2097152;
-
-  // The minimum spillable buffer to use. The planner will not choose a size smaller than
-  // this. Defaults to 64KB.
-  59: optional i64 min_spillable_buffer_size = 65536;
-
-  // The maximum size of row that the query will reserve memory to process. Processing
-  // rows larger than this may result in a query failure. Defaults to 512KB, e.g.
-  // enough for a row with 15 32KB strings or many smaller columns.
-  //
-  // Different operators handle this option in different ways. E.g. some simply increase
-  // the size of all their buffers to fit this row size, whereas others may use more
-  // sophisticated strategies - e.g. reserving a small number of buffers large enough to
-  // fit maximum-sized rows.
-  60: optional i64 max_row_size = 524288;
-
-  // The time, in seconds, that a session may be idle for before it is closed (and all
-  // running queries cancelled) by Impala. If 0, idle sessions never expire.
-  // The default session timeout is set by the command line flag of the same name.
-  61: optional i32 idle_session_timeout;
-
-  // Minimum number of bytes that will be scanned in COMPUTE STATS TABLESAMPLE,
-  // regardless of the user-supplied sampling percent. Default value: 1GB
-  62: optional i64 compute_stats_min_sample_size = 1073741824;
-
-  // Time limit, in s, before a query will be timed out after it starts executing. Does
-  // not include time spent in planning, scheduling or admission control. A value of 0
-  // means no time limit.
-  63: optional i32 exec_time_limit_s = 0;
-
-  // When a query has both grouping and distinct exprs, impala can optionally include the
-  // distinct exprs in the hash exchange of the first aggregation phase to spread the data
-  // among more nodes. However, this plan requires another hash exchange on the grouping
-  // exprs in the second phase which is not required when omitting the distinct exprs in
-  // the first phase. Shuffling by both is better if the grouping exprs have low NDVs.
-  64: optional bool shuffle_distinct_exprs = true;
-
-  // See comment in ImpalaService.thrift.
-  65: optional i64 max_mem_estimate_for_admission = 0;
-
-  // See comment in ImpalaService.thrift.
-  // The default values is set fairly high based on empirical data - queries with up to
-  // this number of reserved threads have run successfully as part of production
-  // workloads but with very degraded performance.
-  66: optional i32 thread_reservation_limit = 3000;
-
-  // See comment in ImpalaService.thrift.
-  67: optional i32 thread_reservation_aggregate_limit = 0;
-
-  // See comment in ImpalaService.thrift.
-  68: optional TKuduReadMode kudu_read_mode = TKuduReadMode.DEFAULT;
-
-  // Allow reading of erasure coded files in HDFS.
-  69: optional bool allow_erasure_coded_files = false;
-
-  // See comment in ImpalaService.thrift.
-  70: optional string timezone = ""
-
-  // See comment in ImpalaService.thrift.
-  71: optional i64 scan_bytes_limit = 0;
-
-  // See comment in ImpalaService.thrift.
-  72: optional i64 cpu_limit_s = 0;
-
-  // See comment in ImpalaService.thrift
-  // The default value is set to 512MB based on empirical data
-  73: optional i64 topn_bytes_limit = 536870912;
-
-  // See comment in ImpalaService.thrift
-  74: optional string client_identifier;
-
-  75: optional double resource_trace_ratio = 0;
-
-  // See comment in ImpalaService.thrift.
-  // The default value is set to 3 as this is the default value of HDFS replicas.
-  76: optional i32 num_remote_executor_candidates = 3;
-
-  // See comment in ImpalaService.thrift.
-  77: optional i64 num_rows_produced_limit = 0;
-
-  // See comment in ImpalaService.thrift
-  78: optional bool planner_testcase_mode = false;
-
-  // See comment in ImpalaService.thrift.
-  79: optional CatalogObjects.THdfsFileFormat default_file_format =
-      CatalogObjects.THdfsFileFormat.TEXT;
-
-  // See comment in ImpalaService.thrift.
-  80: optional TParquetTimestampType parquet_timestamp_type =
-      TParquetTimestampType.INT96_NANOS;
-
-  // See comment in ImpalaService.thrift.
-  81: optional bool parquet_read_page_index = true;
-
-  // See comment in ImpalaService.thrift.
-  82: optional bool parquet_write_page_index = true;
-
-  // See comment in ImpalaService.thrift.
-  83: optional i32 parquet_page_row_count_limit;
-
-  // Disable the attempt to compute an estimated number of rows in an
-  // hdfs table.
-  84: optional bool disable_hdfs_num_rows_estimate = false;
-
-  // See comment in ImpalaService.thrift.
-  85: optional string default_hints_insert_statement;
-
-  // See comment in ImpalaService.thrift
-  86: optional bool spool_query_results = true;
-
-  // See comment in ImpalaService.thrift
-  87: optional TTransactionalType default_transactional_type = TTransactionalType.NONE;
-
-  // See comment in ImpalaService.thrift.
-  // The default of 250,000 is set to a high value to avoid impacting existing users, but
-  // testing indicates a statement with this number of expressions can run.
-  88: optional i32 statement_expression_limit = 250000
-
-  // See comment in ImpalaService.thrift
-  // The default is set to 16MB. It is likely that a statement of this size would exceed
-  // the statement expression limit. Setting a limit on the total statement size avoids
-  // the cost of parsing and analyzing the statement, which is required to enforce the
-  // statement expression limit.
-  89: optional i32 max_statement_length_bytes = 16777216
-
-  // If true, skip using the data cache for this query session.
-  90: optional bool disable_data_cache = false;
-
-  // See comment in ImpalaService.thrift
-  91: optional i64 max_result_spooling_mem = 104857600;
-
-  // See comment in ImpalaService.thrift
-  92: optional i64 max_spilled_result_spooling_mem = 1073741824;
-
-  // See comment in ImpalaService.thrift
-  93: optional bool disable_hbase_num_rows_estimate = false;
-
-  // See comment in ImpalaService.thrift
-  94: optional i64 fetch_rows_timeout_ms = 10000;
-
-  // For testing purposes
-  95: optional string now_string = "";
-
-  // See comment in ImpalaService.thrift
-  96: optional i64 parquet_object_store_split_size = 268435456;
-
-  // See comment in ImpalaService.thrift
-  97: optional i64 mem_limit_executors = 0;
-
-  // See comment in ImpalaService.thrift
-  // The default value is set to 32 GB
-  98: optional i64 broadcast_bytes_limit = 34359738368;
-
-  // See comment in ImpalaService.thrift
-  99: optional i64 preagg_bytes_limit = -1;
-
-  // See comment in ImpalaService.thrift
-  100: optional bool enable_cnf_rewrites = true;
-
-  // See comment in ImpalaService.thrift
-  101: optional i32 max_cnf_exprs = 200;
-
-  // See comment in ImpalaService.thrift
-  102: optional i64 kudu_snapshot_read_timestamp_micros = 0;
-
-  // See comment in ImpalaService.thrift
-  103: optional bool retry_failed_queries = false;
-
-  // See comment in ImpalaService.thrift
-  104: optional PlanNodes.TEnabledRuntimeFilterTypes enabled_runtime_filter_types =
-      PlanNodes.TEnabledRuntimeFilterTypes.ALL;
-
-  // See comment in ImpalaService.thrift
-  105: optional bool async_codegen = false;
-
-  // See comment in ImpalaService.thrift
-  106: optional bool enable_distinct_semi_join_optimization = true;
-
-  // See comment in ImpalaService.thrift
-  107: optional i64 sort_run_bytes_limit = -1;
-
-  // See comment in ImpalaService.thrift
-  108: optional i32 max_fs_writers = 0;
-
-  // See comment in ImpalaService.thrift
-  109: optional bool refresh_updated_hms_partitions = false;
-
-  // See comment in ImpalaService.thrift
-  110: optional bool spool_all_results_for_retries = true;
-
-  // See comment in ImpalaService.thrift
-  111: optional double runtime_filter_error_rate;
-
-  // See comment in ImpalaService.thrift
-  112: optional bool use_local_tz_for_unix_timestamp_conversions = false;
-
-  // See comment in ImpalaService.thrift
-  113: optional bool convert_legacy_hive_parquet_utc_timestamps = false;
-
-  // See comment in ImpalaService.thrift
-  114: optional bool enable_outer_join_to_inner_transformation = false;
-
-  // Initialized with -1 to indicate it is unspecified.
-  // See comment in ImpalaService.thrift
-  115: optional i64 targeted_kudu_scan_range_length = -1;
-
-  // See comment in ImpalaService.thrift
-  116: optional double report_skew_limit = 1.0;
-
-  // See comment in ImpalaService.thrift
-  117: optional bool optimize_simple_limit = false;
-
-  // See comment in ImpalaService.thrift
-  118: optional bool use_dop_for_costing = true;
-
-  // See comment in ImpalaService.thrift
-  119: optional double broadcast_to_partition_factor = 1.0;
-
-  // See comment in ImpalaService.thrift
-  120: optional i64 join_rows_produced_limit = 0;
-
-  // See comment in ImpalaService.thrift
-  121: optional bool utf8_mode = false;
-
-  // See comment in ImpalaService.thrift
-  122: optional i64 analytic_rank_pushdown_threshold = 1000;
-
-  // See comment in ImpalaService.thrift
-  123: optional double minmax_filter_threshold = 0.0;
-
-  // See comment in ImpalaService.thrift
-  124: optional PlanNodes.TMinmaxFilteringLevel minmax_filtering_level =
-      PlanNodes.TMinmaxFilteringLevel.ROW_GROUP;
-}
-
-// Impala currently has two types of sessions: Beeswax and HiveServer2
-enum TSessionType {
-  BEESWAX = 0
-  HIVESERVER2 = 1
-}
-
-// Per-client session state
-struct TSessionState {
-  // A unique identifier for this session
-  3: required Types.TUniqueId session_id
-
-  // Session Type (Beeswax or HiveServer2)
-  5: required TSessionType session_type
-
-  // The default database for the session
-  1: required string database
-
-  // The user to whom this session belongs
-  2: required string connected_user
-
-  // If set, the user we are delegating for the current session
-  6: optional string delegated_user;
-
-  // Client network address
-  4: required Types.TNetworkAddress network_address
-
-  // If set, the latest Kudu timestamp observed within this session.
-  7: optional i64 kudu_latest_observed_ts;
-}
-
-// Client request including stmt to execute and query options.
-struct TClientRequest {
-  // SQL stmt to be executed
-  1: required string stmt
-
-  // query options
-  2: required TQueryOptions query_options
-
-  // Redacted SQL stmt
-  3: optional string redacted_stmt
-}
-
 // Debug options: perform some action in a particular phase of a particular node
 // TODO: find a better name
 struct TDebugOptions {
@@ -546,128 +51,6 @@ struct TDebugOptions {
   4: optional string action_param
 }
 
-// Context of this query, including the client request, session state and
-// global query parameters needed for consistent expr evaluation (e.g., now()).
-//
-// TODO: Separate into FE/BE initialized vars.
-struct TQueryCtx {
-  // Client request containing stmt to execute and query options.
-  1: required TClientRequest client_request
-
-  // A globally unique id assigned to the entire query in the BE.
-  // The bottom 4 bytes are 0 (for details see be/src/util/uid-util.h).
-  2: required Types.TUniqueId query_id
-
-  // Session state including user.
-  3: required TSessionState session
-
-  // String containing a timestamp (in local timezone) set as the query submission time.
-  4: required string now_string
-
-  // Process ID of the impalad to which the user is connected.
-  5: required i32 pid
-
-  // The coordinator's hostname.
-  // TODO: determine whether we can get this somehow via the Thrift rpc mechanism.
-  6: optional string coord_hostname
-
-  // The initiating coordinator's address of its KRPC based ImpalaInternalService.
-  7: optional Types.TNetworkAddress coord_ip_address
-
-  // List of tables missing relevant table and/or column stats. Used for
-  // populating query-profile fields consumed by CM as well as warning messages.
-  8: optional list<CatalogObjects.TTableName> tables_missing_stats
-
-  // Internal flag to disable spilling. Used as a guard against potentially
-  // disastrous query plans. The rationale is that cancelling queries, e.g.,
-  // with a huge join build is preferable over spilling "forever".
-  9: optional bool disable_spilling
-
-  // Set if this is a child query (e.g. a child of a COMPUTE STATS request)
-  10: optional Types.TUniqueId parent_query_id
-
-  // List of tables suspected to have corrupt stats
-  11: optional list<CatalogObjects.TTableName> tables_with_corrupt_stats
-
-  // The snapshot timestamp as of which to execute the query
-  // When the backing storage engine supports snapshot timestamps (such as Kudu) this
-  // allows to select a snapshot timestamp on which to perform the scan, making sure that
-  // results returned from multiple scan nodes are consistent.
-  // This defaults to -1 when no timestamp is specified.
-  12: optional i64 snapshot_timestamp = -1;
-
-  // Optional for frontend tests.
-  // The descriptor table can be included in one of two forms:
-  //  - TDescriptorTable - standard Thrift object
-  //  - TDescriptorTableSerialized - binary blob with a serialized TDescriptorTable
-  // Normal end-to-end query execution uses the serialized form to avoid copying a large
-  // number of objects when sending RPCs. For this case, desc_tbl_serialized is set and
-  // desc_tbl_testonly is not set. See IMPALA-8732.
-  // Frontend tests cannot use the serialized form, because some frontend tests deal with
-  // incomplete structures (e.g. THdfsTable without the required nullPartitionKeyValue
-  // field) that cannot be serialized. In this case, desc_tbl_testonly is set and
-  // desc_tbl_serialized is not set. See Frontend.PlanCtx.serializeDescTbl_.
-  13: optional Descriptors.TDescriptorTable desc_tbl_testonly
-  24: optional Descriptors.TDescriptorTableSerialized desc_tbl_serialized
-
-  // Milliseconds since UNIX epoch at the start of query execution.
-  14: required i64 start_unix_millis
-
-  // Hint to disable codegen. Set by planner for single-node optimization or by the
-  // backend in NativeEvalExprsWithoutRow() in FESupport. This flag is only advisory to
-  // avoid the overhead of codegen and can be ignored if codegen is needed functionally.
-  15: optional bool disable_codegen_hint = false;
-
-  // List of tables with scan ranges that map to blocks with missing disk IDs.
-  16: optional list<CatalogObjects.TTableName> tables_missing_diskids
-
-  // The resolved admission control pool to which this request will be submitted. May be
-  // unset for statements that aren't subjected to admission control (e.g. USE, SET).
-  17: optional string request_pool
-
-  // String containing a timestamp (in UTC) set as the query submission time. It
-  // represents the same point in time as now_string
-  18: required string utc_timestamp_string
-
-  // String containing name of the local timezone.
-  // It is guaranteed to be a valid timezone on the coordinator (but not necessarily on
-  // the executor, since in theory the executor could have a different timezone db).
-  // TODO(Csaba): adding timezone as a query option made this property redundant. It
-  //   still has an effect if TimezoneDatabase::LocalZoneName() cannot find the
-  //   system's local timezone and falls back to UTC. This logic will be removed in
-  //   IMPALA-7359, which will make this member completely obsolete.
-  19: required string local_time_zone
-
-  // Disables the code that estimates HBase scan cardinality from key ranges.
-  // When disabled, scan cardinality is estimated from HMS table row count
-  // stats and key column predicate selectivity. Generally only disabled
-  // for testing.
-  20: optional bool disable_hbase_num_rows_estimate = false;
-
-  // Flag to enable tracing of resource usage consumption for all fragment instances of a
-  // query. Set in ImpalaServer::PrepareQueryContext().
-  21: required bool trace_resource_usage = false
-
-  // Taken from the flags of the same name. The coordinator uses these to decide how long
-  // to wait for a report before cancelling a backend, so we want to ensure that the
-  // coordinator and executors for a given query always agree this value.
-  22: optional i32 status_report_interval_ms
-  23: optional i32 status_report_max_retry_s
-
-  // Stores the transaction id if the query is transactional.
-  25: optional i64 transaction_id
-
-  // If mt_dop was overridden by admission control's max mt_dop setting, then this
-  // is set to the original value. If mt_dop was not overridden, then this is not set.
-  26: optional i32 overridden_mt_dop_value
-
-  // The initiating coordinator's backend_id.
-  27: optional Types.TUniqueId coord_backend_id
-
-  // True if the new runtime profile format added by IMPALA-9382 should be generated
-  // by this query.
-  28: optional bool gen_aggregated_profile
-}
 
 // Descriptor that indicates that a runtime filter is produced by a plan node.
 struct TRuntimeFilterSource {
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 0a48f80..c403055 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -24,6 +24,7 @@ include "Types.thrift"
 include "beeswax.thrift"
 include "TCLIService.thrift"
 include "RuntimeProfile.thrift"
+include "Frontend.thrift"
 
 // ImpalaService accepts query execution options through beeswax.Query.configuration in
 // key:value form. For example, the list of strings could be:
@@ -696,6 +697,9 @@ struct TPingImpalaHS2ServiceResp {
 
   // The Impalad's webserver address.
   3: optional string webserver_address
+
+  // The Impalad's local monotonic time
+  4: optional i64 timestamp
 }
 
 // CloseImpalaOperation()
@@ -806,6 +810,20 @@ struct TGetRuntimeProfileResp {
   5: optional list<RuntimeProfile.TRuntimeProfileTree> failed_thrift_profiles
 }
 
+// ExecutePlannedStatement()
+//
+// Execute a statement where the ExecRequest has been externally supplied.
+// The returned OperationHandle can be used to check on the
+// status of the plan, and to fetch results once the
+// plan has finished executing.
+struct TExecutePlannedStatementReq {
+  1: required TCLIService.TExecuteStatementReq statementReq
+
+  // The plan to be executed
+  2: required Frontend.TExecRequest plan
+}
+
+
 service ImpalaHiveServer2Service extends TCLIService.TCLIService {
   // Returns the exec summary for the given query. The exec summary is only valid for
   // queries that execute with Impala's backend, i.e. QUERY, DML and COMPUTE_STATS
@@ -822,4 +840,7 @@ service ImpalaHiveServer2Service extends TCLIService.TCLIService {
 
   // Same as HS2 CloseOperation but can return additional information.
   TCloseImpalaOperationResp CloseImpalaOperation(1:TCloseImpalaOperationReq req);
+  // Execute statement with supplied ExecRequest
+  TCLIService.TExecuteStatementResp ExecutePlannedStatement(
+      1:TExecutePlannedStatementReq req);
 }
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/Query.thrift
similarity index 79%
copy from common/thrift/ImpalaInternalService.thrift
copy to common/thrift/Query.thrift
index 668d183..31c0e13 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/Query.thrift
@@ -15,32 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//
-// This file contains the details of the protocol between coordinators and backends.
-
 namespace cpp impala
 namespace java org.apache.impala.thrift
 
-include "Status.thrift"
-include "ErrorCodes.thrift"
 include "Types.thrift"
-include "Exprs.thrift"
-include "CatalogObjects.thrift"
-include "Descriptors.thrift"
 include "PlanNodes.thrift"
 include "Planner.thrift"
-include "DataSinks.thrift"
+include "Descriptors.thrift"
 include "Results.thrift"
-include "RuntimeProfile.thrift"
-include "ImpalaService.thrift"
-include "Data.thrift"
-
-// constants for TQueryOptions.num_nodes
-const i32 NUM_NODES_ALL = 0
-const i32 NUM_NODES_ALL_RACKS = -1
-
-// constants for TPlanNodeId
-const i32 INVALID_PLAN_NODE_ID = -1
+include "CatalogObjects.thrift"
+include "LineageGraph.thrift"
 
 enum TParquetFallbackSchemaResolution {
   POSITION = 0
@@ -49,17 +33,10 @@ enum TParquetFallbackSchemaResolution {
   FIELD_ID = 2
 }
 
-// The order of the enum values needs to be kept in sync with
-// ParquetMetadataUtils::ORDERED_ARRAY_ENCODINGS in parquet-metadata-utils.cc.
-enum TParquetArrayResolution {
-  THREE_LEVEL = 0
-  TWO_LEVEL = 1
-  TWO_LEVEL_THEN_THREE_LEVEL = 2
-}
-
-enum TJoinDistributionMode {
-  BROADCAST = 0
-  SHUFFLE = 1
+// A table's Hive ACID type.
+enum TTransactionalType {
+  NONE,
+  INSERT_ONLY
 }
 
 // Consistency level options for Kudu scans.
@@ -69,6 +46,19 @@ enum TKuduReadMode {
   READ_AT_SNAPSHOT = 2
 }
 
+enum TJoinDistributionMode {
+  BROADCAST = 0
+  SHUFFLE = 1
+}
+
+// The order of the enum values needs to be kept in sync with
+// ParquetMetadataUtils::ORDERED_ARRAY_ENCODINGS in parquet-metadata-utils.cc.
+enum TParquetArrayResolution {
+  THREE_LEVEL = 0
+  TWO_LEVEL = 1
+  TWO_LEVEL_THEN_THREE_LEVEL = 2
+}
+
 // Physical type and unit used when writing timestamps in Parquet.
 enum TParquetTimestampType {
   INT96_NANOS,
@@ -77,11 +67,9 @@ enum TParquetTimestampType {
   INT64_NANOS
 }
 
-// A table's Hive ACID type.
-enum TTransactionalType {
-  NONE,
-  INSERT_ONLY
-}
+// constants for TQueryOptions.num_nodes
+const i32 NUM_NODES_ALL = 0
+const i32 NUM_NODES_ALL_RACKS = -1
 
 // Query options that correspond to ImpalaService.ImpalaQueryOptions, with their
 // respective defaults. Query options can be set in the following ways:
@@ -498,6 +486,18 @@ enum TSessionType {
   HIVESERVER2 = 1
 }
 
+// Client request including stmt to execute and query options.
+struct TClientRequest {
+  // SQL stmt to be executed
+  1: required string stmt
+
+  // query options
+  2: required TQueryOptions query_options
+
+  // Redacted SQL stmt
+  3: optional string redacted_stmt
+}
+
 // Per-client session state
 struct TSessionState {
   // A unique identifier for this session
@@ -522,30 +522,6 @@ struct TSessionState {
   7: optional i64 kudu_latest_observed_ts;
 }
 
-// Client request including stmt to execute and query options.
-struct TClientRequest {
-  // SQL stmt to be executed
-  1: required string stmt
-
-  // query options
-  2: required TQueryOptions query_options
-
-  // Redacted SQL stmt
-  3: optional string redacted_stmt
-}
-
-// Debug options: perform some action in a particular phase of a particular node
-// TODO: find a better name
-struct TDebugOptions {
-  // The plan node that this action should be applied to. If -1 it is applied to all plan
-  // nodes.
-  1: optional Types.TPlanNodeId node_id
-  2: optional PlanNodes.TExecNodePhase phase
-  3: optional PlanNodes.TDebugAction action
-  // Optional parameter that goes along with the action.
-  4: optional string action_param
-}
-
 // Context of this query, including the client request, session state and
 // global query parameters needed for consistent expr evaluation (e.g., now()).
 //
@@ -608,7 +584,6 @@ struct TQueryCtx {
   // field) that cannot be serialized. In this case, desc_tbl_testonly is set and
   // desc_tbl_serialized is not set. See Frontend.PlanCtx.serializeDescTbl_.
   13: optional Descriptors.TDescriptorTable desc_tbl_testonly
-  24: optional Descriptors.TDescriptorTableSerialized desc_tbl_serialized
 
   // Milliseconds since UNIX epoch at the start of query execution.
   14: required i64 start_unix_millis
@@ -652,8 +627,11 @@ struct TQueryCtx {
   // to wait for a report before cancelling a backend, so we want to ensure that the
   // coordinator and executors for a given query always agree this value.
   22: optional i32 status_report_interval_ms
+
   23: optional i32 status_report_max_retry_s
 
+  24: optional Descriptors.TDescriptorTableSerialized desc_tbl_serialized
+
   // Stores the transaction id if the query is transactional.
   25: optional i64 transaction_id
 
@@ -669,157 +647,98 @@ struct TQueryCtx {
   28: optional bool gen_aggregated_profile
 }
 
-// Descriptor that indicates that a runtime filter is produced by a plan node.
-struct TRuntimeFilterSource {
-  1: required Types.TPlanNodeId src_node_id
-  2: required i32 filter_id
-}
 
-// The Thrift portion of the execution parameters of a single fragment instance. Every
-// fragment instance will also have a corresponding PlanFragmentInstanceCtxPB with the
-// same fragment_idx.
-// TODO: convert the rest of this struct to protobuf
-struct TPlanFragmentInstanceCtx {
-  // TPlanFragment.idx
-  1: required Types.TFragmentIdx fragment_idx
-
-  // The globally unique fragment instance id.
-  // Format: query id + query-wide fragment instance index
-  // The query-wide fragment instance index enumerates all fragment instances of a
-  // particular query. It starts at 0, so that the query id and the id of the first
-  // fragment instance are identical.
-  // If there is a coordinator instance, it is the first one, with index 0.
-  // Range: [0, TExecQueryFInstancesParams.fragment_instance_ctxs.size()-1]
-  2: required Types.TUniqueId fragment_instance_id
-
-  // Index of this fragment instance across all instances of its parent fragment
-  // (TPlanFragment with idx = TPlanFragmentInstanceCtx.fragment_idx).
-  // Range: [0, <# of instances of parent fragment> - 1]
-  3: required i32 per_fragment_instance_idx
-
-  // Number of senders for ExchangeNodes contained in TPlanFragment.plan_tree;
-  // needed to create a DataStreamRecvr
-  // TODO for per-query exec rpc: move these to PlanFragmentCtxPB
-  5: required map<Types.TPlanNodeId, i32> per_exch_num_senders
-
-  // Id of this instance in its role as a sender.
-  6: optional i32 sender_id
-
-  7: optional TDebugOptions debug_options
-
-  // List of runtime filters produced by nodes in the finstance.
-  8: optional list<TRuntimeFilterSource> filters_produced
-
-  // If this is a join build fragment, the number of fragment instances that consume the
-  // join build. -1 = invalid.
-  10: optional i32 num_join_build_outputs
-
-  // Number of backends executing the same fragment plan. Can be used by executors to do
-  // some estimations.
-  11: optional i32 num_backends;
+// Execution parameters for a single plan; component of TQueryExecRequest
+struct TPlanExecInfo {
+  // fragments[i] may consume the output of fragments[j > i];
+  // fragments[0] is the root fragment and also the coordinator fragment, if
+  // it is unpartitioned.
+  1: required list<Planner.TPlanFragment> fragments
+
+  // A map from scan node ids to a scan range specification.
+  // The node ids refer to scan nodes in fragments[].plan
+  2: optional map<Types.TPlanNodeId, Planner.TScanRangeSpec>
+      per_node_scan_ranges
 }
 
+// Metadata required to finalize a query - that is, to clean up after the query is done.
+// Only relevant for INSERT queries.
+struct TFinalizeParams {
+  // True if the INSERT query was OVERWRITE, rather than INTO
+  1: required bool is_overwrite
 
-// Service Protocol Details
+  // The base directory in hdfs of the table targeted by this INSERT
+  2: required string hdfs_base_dir
 
-enum ImpalaInternalServiceVersion {
-  V1 = 0
-}
+  // The target table name
+  3: required string table_name
 
-// The following contains the per-rpc structs for the parameters and the result.
+  // The target table database
+  4: required string table_db
 
-// Contains info about plan fragment execution needed for the ExecQueryFInstances rpc.
-// Rather than fully coverting this to protobuf, which would be a large change, for now we
-// serialize it ourselves and send it with ExecQueryFInstances as a sidecar.
-// TODO: investigate if it's worth converting this fully to protobuf
-struct TExecPlanFragmentInfo {
-  1: optional list<Planner.TPlanFragment> fragments
+  // The full path in HDFS of a directory under which temporary files may be written
+  // during an INSERT. For a query with id a:b, files are written to <staging_dir>/.a_b/,
+  // and that entire directory is removed after the INSERT completes.
+  5: optional string staging_dir
 
-  // the order corresponds to the order of fragments in 'fragments'
-  2: optional list<TPlanFragmentInstanceCtx> fragment_instance_ctxs
-}
+  // Identifier for the target table in the query-wide descriptor table (see
+  // TDescriptorTable and TTableDescriptor).
+  6: optional i64 table_id;
 
-// Parameters for RequestPoolService.resolveRequestPool()
-// TODO: why is this here?
-struct TResolveRequestPoolParams {
-  // User to resolve to a pool via the allocation placement policy and
-  // authorize for pool access.
-  1: required string user
+  // Stores the ACID transaction id of the target table for transactional INSERTs.
+  7: optional i64 transaction_id;
 
-  // Pool name specified by the user. The allocation placement policy may
-  // return a different pool.
-  2: required string requested_pool
+  // Stores the ACID write id of the target table for transactional INSERTs.
+  8: optional i64 write_id;
+
+  // Stores the Iceberg spec id of the partition spec used for this INSERT.
+  9: optional i32 spec_id;
 }
 
-// Returned by RequestPoolService.resolveRequestPool()
-struct TResolveRequestPoolResult {
-  // Actual pool to use, as determined by the pool allocation policy. Not set
-  // if no pool was resolved.
-  1: optional string resolved_pool
+// Result of call to ImpalaPlanService/JniFrontend.CreateQueryRequest()
+struct TQueryExecRequest {
+  // exec info for all plans; the first one materializes the query result
+  1: optional list<TPlanExecInfo> plan_exec_info
 
-  // True if the user has access to submit requests to the resolved_pool. Not set
-  // if no pool was resolved.
-  2: optional bool has_access
+  // Metadata of the query result set (only for select)
+  2: optional Results.TResultSetMetadata result_set_metadata
 
-  3: optional Status.TStatus status
-}
+  // Set if the query needs finalization after it executes
+  3: optional TFinalizeParams finalize_params
 
-// Parameters for RequestPoolService.getPoolConfig()
-// TODO: why is this here?
-struct TPoolConfigParams {
-  // Pool name
-  1: required string pool
-}
+  4: required TQueryCtx query_ctx
 
-// Returned by RequestPoolService.getPoolConfig()
-struct TPoolConfig {
-  // Maximum number of placed requests before incoming requests are queued.
-  // A value of 0 effectively disables the pool. -1 indicates no limit.
-  1: required i64 max_requests
-
-  // Maximum number of queued requests before incoming requests are rejected.
-  // Any non-positive number (<= 0) disables queuing, i.e. requests are rejected instead
-  // of queued.
-  2: required i64 max_queued
-
-  // Maximum memory resources of the pool in bytes.
-  // A value of 0 effectively disables the pool. -1 indicates no limit.
-  3: required i64 max_mem_resources
-
-  // Maximum amount of time (in milliseconds) that a request will wait to be admitted
-  // before timing out. Optional, if not set then the process default (set via gflags) is
-  // used.
-  4: optional i64 queue_timeout_ms;
-
-  // Default query options that are applied to requests mapped to this pool.
-  5: required string default_query_options;
-
-  // Maximum amount of memory that can be assigned to a query (in bytes).
-  // 0 indicates no limit. If both max_query_mem_limit and min_query_mem_limit are zero
-  // then the admission controller will fall back on old behavior, which is to not set
-  // any backend mem limit if mem_limit is not set in the query options.
-  6: required i64 max_query_mem_limit = 0;
-
-  // Minimum amount of memory that can be assigned to a query (in bytes).
-  // 0 indicates no limit.
-  7: required i64 min_query_mem_limit = 0;
-
-  // If false, the mem_limit query option will not be bounded by the max/min query mem
-  // limits specified for the pool. Default is true.
-  8: required bool clamp_mem_limit_query_option = true;
-
-  // Maximum value for the mt_dop query option. If the mt_dop is set and exceeds this
-  // maximum, the mt_dop setting is reduced to the maximum. If the max_mt_dop is
-  // negative, no limit is enforced.
-  9: required i64 max_mt_dop = -1;
-}
+  // The same as the output of 'explain <query>'
+  5: optional string query_plan
+
+  // The statement type governs when the coordinator can judge a query to be finished.
+  // DML queries are complete after Wait(), SELECTs may not be. Generally matches
+  // the stmt_type of the parent TExecRequest, but in some cases (such as CREATE TABLE
+  // AS SELECT), these may differ.
+  6: required Types.TStmtType stmt_type
+
+  // List of replica hosts.  Used by the host_idx field of TScanRangeLocation.
+  7: required list<Types.TNetworkAddress> host_list
 
-struct TParseDateStringResult {
-  // True iff date string was successfully parsed
-  1: required bool valid
-  // Number of days since 1970-01-01. Used only if 'valid' is true.
-  2: optional i32 days_since_epoch
-  // Canonical date string (formed as 'yyyy-MM-dd'). Used only if 'valid' is true and the
-  // parsed date string was not in a canonical form.
-  3: optional string canonical_date_string
+  // Column lineage graph
+  8: optional LineageGraph.TLineageGraph lineage_graph
+
+  // Estimated per-host peak memory consumption in bytes. Used by admission control.
+  // TODO: Remove when AC doesn't rely on this any more.
+  9: optional i64 per_host_mem_estimate
+
+  // Maximum possible (in the case all fragments are scheduled on all hosts with
+  // max DOP) minimum memory reservation required per host, in bytes.
+  10: optional i64 max_per_host_min_mem_reservation;
+
+  // Maximum possible (in the case all fragments are scheduled on all hosts with
+  // max DOP) required threads per host, i.e. the number of threads that this query
+  // needs to execute successfully. Does not include "optional" threads.
+  11: optional i64 max_per_host_thread_reservation;
+
+  // Estimated coordinator's memory consumption in bytes assuming that the coordinator
+  // fragment will run on a dedicated coordinator. Set by the planner and used by
+  // admission control.
+  12: optional i64 dedicated_coord_mem_estimate;
 }
+
diff --git a/fe/pom.xml b/fe/pom.xml
index 70579e3..e100ff3 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -430,6 +430,11 @@ under the License.
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hive</groupId>
+      <artifactId>hive-classification</artifactId>
+      <version>${hive.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.hive</groupId>
diff --git a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
index a4633c8..1432d02 100644
--- a/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
+++ b/fe/src/main/java/org/apache/impala/analysis/Analyzer.java
@@ -1077,7 +1077,7 @@ public class Analyzer {
         TableName tblName = candidateTbls.get(tblNameIdx);
         FeTable tbl = null;
         try {
-          tbl = getTable(tblName.getDb(), tblName.getTbl());
+          tbl = getTable(tblName.getDb(), tblName.getTbl(), /* must_exist */ false);
         } catch (AnalysisException e) {
           // Ignore to allow path resolution to continue.
         }
@@ -2847,11 +2847,14 @@ public class Analyzer {
    * Throws a TableLoadingException if the registered table failed to load.
    * Does not register authorization requests or access events.
    */
-  public FeTable getTable(String dbName, String tableName)
+  public FeTable getTable(String dbName, String tableName, boolean mustExist)
       throws AnalysisException, TableLoadingException {
     TableName tblName = new TableName(dbName, tableName);
     FeTable table = globalState_.stmtTableCache.tables.get(tblName);
     if (table == null) {
+      if (!mustExist) {
+        return null;
+      }
       if (!globalState_.stmtTableCache.dbs.contains(tblName.getDb())) {
         throw new AnalysisException(DB_DOES_NOT_EXIST_ERROR_MSG + tblName.getDb());
       } else {
@@ -2972,7 +2975,7 @@ public class Analyzer {
       }
     }
     // Propagate the AnalysisException if the table/db does not exist.
-    table = getTable(fqTableName.getDb(), fqTableName.getTbl());
+    table = getTable(fqTableName.getDb(), fqTableName.getTbl(), /* must_exist */ true);
     Preconditions.checkNotNull(table);
     if (addAccessEvent) {
       // Add an audit event for this access
diff --git a/fe/src/main/java/org/apache/impala/analysis/PrivilegeSpec.java b/fe/src/main/java/org/apache/impala/analysis/PrivilegeSpec.java
index 41a96a0..af578c7 100644
--- a/fe/src/main/java/org/apache/impala/analysis/PrivilegeSpec.java
+++ b/fe/src/main/java/org/apache/impala/analysis/PrivilegeSpec.java
@@ -280,7 +280,7 @@ public class PrivilegeSpec extends StmtNode {
     try {
       dbName_ = analyzer.getTargetDbName(tableName_);
       Preconditions.checkNotNull(dbName_);
-      table = analyzer.getTable(dbName_, tableName_.getTbl());
+      table = analyzer.getTable(dbName_, tableName_.getTbl(), /* must_exist */ true);
     } catch (TableLoadingException e) {
       throw new AnalysisException(e.getMessage(), e);
     } catch (AnalysisException e) {
diff --git a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
index 13dd361..c892a2e 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
@@ -157,7 +157,8 @@ public class ResetMetadataStmt extends StatementBase {
           if (partitionSpec_ != null) {
             try {
               // Get local table info without reaching out to HMS
-              FeTable table = analyzer.getTable(dbName, tableName_.getTbl());
+              FeTable table = analyzer.getTable(dbName, tableName_.getTbl(),
+                  /* must_exist */ true);
               if (AcidUtils.isTransactionalTable(
                       table.getMetaStoreTable().getParameters())) {
                 throw new AnalysisException("Refreshing a partition is not allowed on " +
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
index e9dcdae..8472d27 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTestBase.java
@@ -47,6 +47,7 @@ import org.apache.impala.testutil.TestFileParser.TestCase;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.testutil.TestUtils.ResultFilter;
 import org.apache.impala.thrift.ImpalaInternalServiceConstants;
+import org.apache.impala.thrift.QueryConstants;
 import org.apache.impala.thrift.TDescriptorTable;
 import org.apache.impala.thrift.TExecRequest;
 import org.apache.impala.thrift.TExplainLevel;
@@ -492,7 +493,7 @@ public class PlannerTestBase extends FrontendTestBase {
     } else {
       // for distributed and parallel execution we want to run on all available nodes
       queryOptions.setNum_nodes(
-          ImpalaInternalServiceConstants.NUM_NODES_ALL);
+          QueryConstants.NUM_NODES_ALL);
     }
     if (section == Section.PARALLELPLANS
         && (!queryOptions.isSetMt_dop() || queryOptions.getMt_dop() == 0)) {