You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2019/06/11 23:03:36 UTC

[impala] branch master updated: IMPALA-8605: clean up HS2/beeswax session management

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ab908d5  IMPALA-8605: clean up HS2/beeswax session management
ab908d5 is described below

commit ab908d54c22861967f693428ec7d9f6d7008607f
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Wed May 15 16:34:23 2019 -0700

    IMPALA-8605: clean up HS2/beeswax session management
    
    Session/operation secrets are part of the HS2 handle
    but we haven't made use of them up until now. This
    patch checks the value and treats it as part of the
    session key, as originally intended. I.e. if the
    secret is missing, the session lookup fails.
    
    The operation secret is the same as the session secret
    to save having to generate and store extra secrets
    (there's no real benefit).
    
    A secret is added to each Beeswax session. This secret is
    internal to the server and not exposed. Adds validation
    that client requests accessed via Beeswax belong to the
    same user as the session.
    
    We switch uuid_generator_ to use boost::random_device, which
    uses /dev/urandom as its source of randomness to be more robust -
    otherwise it's hard to be sure that we won't have collisions,
    although it doesn't seem to be a problem in practice.
    
    For requests that provide both a session and query ID -
    GetRuntimeProfile() and GetExecSummary() - the code already
    checks that the session's user matches the query.
    
    An exception to the validation mechanisms above is added
    for Close() and Cancel() beeswax operations, because impala-shell
    and some administrative tools allow cancellation of
    queries on different threads and from different tools.
    
    We skip validating the session secret when cancelling queries
    from the web UI, since web UI users don't have the secret.
    
    Testing:
    * Ran exhaustive tests.
    * Add tests for all HS2 RPCs that provide invalid session secrets.
    * Add tests for HS2 RPCs that provide both session and query
      ID to ensure that query belongs to the session.
    * Add basic beeswax test that accesses a query from
      different connections.
    
    Change-Id: I4c014d1a32e273275a773f842b9ed9793dbdba6b
    Reviewed-on: http://gerrit.cloudera.org:8080/13585
    Reviewed-by: Lars Volker <lv...@cloudera.com>
    Reviewed-by: Thomas Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 CMakeLists.txt                          |   2 +-
 be/src/service/child-query.cc           |   3 +-
 be/src/service/impala-beeswax-server.cc |  85 +++++++++++-----
 be/src/service/impala-hs2-server.cc     | 173 +++++++++++++++++++-------------
 be/src/service/impala-http-handler.cc   |   7 +-
 be/src/service/impala-server.cc         |  89 ++++++++++++++--
 be/src/service/impala-server.h          | 155 ++++++++++++++++++++++++----
 tests/beeswax/impala_beeswax.py         |   5 +-
 tests/common/impala_connection.py       |   6 +-
 tests/hs2/hs2_test_suite.py             |  45 +++++++++
 tests/hs2/test_fetch.py                 |  36 ++++++-
 tests/hs2/test_hs2.py                   | 119 +++++++++++++++++++++-
 tests/query_test/test_beeswax.py        |  98 ++++++++++++++++++
 13 files changed, 689 insertions(+), 134 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index cda8a8a..69454b9 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -164,7 +164,7 @@ function(IMPALA_ADD_THIRDPARTY_LIB NAME HEADER STATIC_LIB SHARED_LIB)
 endfunction()
 
 
-find_package(Boost REQUIRED COMPONENTS thread regex filesystem system date_time)
+find_package(Boost REQUIRED COMPONENTS thread regex filesystem system date_time random)
 # Mark Boost as a system header to avoid compile warnings.
 include_directories(SYSTEM ${Boost_INCLUDE_DIRS})
 message(STATUS "Boost include dir: " ${Boost_INCLUDE_DIRS})
diff --git a/be/src/service/child-query.cc b/be/src/service/child-query.cc
index d690d4e..2c6134d 100644
--- a/be/src/service/child-query.cc
+++ b/be/src/service/child-query.cc
@@ -35,13 +35,14 @@ const string ChildQuery::PARENT_QUERY_OPT = "impala.parent_query_id";
 // particular the parent query's lock_) while invoking HS2 functions to avoid deadlock.
 Status ChildQuery::ExecAndFetch() {
   const TUniqueId& session_id = parent_request_state_->session_id();
+  const TUniqueId& session_secret = parent_request_state_->session()->secret;
   VLOG_QUERY << "Executing child query: " << query_ << " in session "
              << PrintId(session_id);
 
   // Create HS2 request and response structs.
   TExecuteStatementResp exec_stmt_resp;
   TExecuteStatementReq exec_stmt_req;
-  ImpalaServer::TUniqueIdToTHandleIdentifier(session_id, session_id,
+  ImpalaServer::TUniqueIdToTHandleIdentifier(session_id, session_secret,
       &exec_stmt_req.sessionHandle.sessionId);
   exec_stmt_req.__set_statement(query_);
   SetQueryOptions(parent_request_state_->exec_request().query_options, &exec_stmt_req);
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 51eef3a0..b897dc5 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -161,8 +161,9 @@ void ImpalaServer::explain(QueryExplanation& query_explanation, const Query& que
 void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle,
     const bool start_over, const int32_t fetch_size) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
-      SQLSTATE_GENERAL_ERROR);
+  shared_ptr<SessionState> session;
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(
+        ThriftServer::GetThreadConnectionId(), &session), SQLSTATE_GENERAL_ERROR);
 
   if (start_over) {
     // We can't start over. Raise "Optional feature not implemented"
@@ -174,7 +175,17 @@ void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle
   QueryHandleToTUniqueId(query_handle, &query_id);
   VLOG_ROW << "fetch(): query_id=" << PrintId(query_id) << " fetch_size=" << fetch_size;
 
-  Status status = FetchInternal(query_id, start_over, fetch_size, &query_results);
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
+  if (UNLIKELY(request_state == nullptr)) {
+    string err_msg = Substitute("Invalid query handle: $0", PrintId(query_id));
+    VLOG(1) << err_msg;
+    RaiseBeeswaxException(err_msg, SQLSTATE_GENERAL_ERROR);
+  }
+  // Validate that query can be accessed by user.
+  RAISE_IF_ERROR(CheckClientRequestSession(session.get(), request_state->effective_user(),
+      query_id), SQLSTATE_GENERAL_ERROR);
+  Status status =
+      FetchInternal(request_state.get(), start_over, fetch_size, &query_results);
   VLOG_ROW << "fetch result: #results=" << query_results.data.size()
            << " has_more=" << (query_results.has_more ? "true" : "false");
   if (!status.ok()) {
@@ -187,8 +198,9 @@ void ImpalaServer::fetch(Results& query_results, const QueryHandle& query_handle
 void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
     const QueryHandle& handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
-      SQLSTATE_GENERAL_ERROR);
+  shared_ptr<SessionState> session;
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(
+      ThriftServer::GetThreadConnectionId(), &session), SQLSTATE_GENERAL_ERROR);
 
   // Convert QueryHandle to TUniqueId and get the query exec state.
   TUniqueId query_id;
@@ -199,6 +211,9 @@ void ImpalaServer::get_results_metadata(ResultsMetadata& results_metadata,
     RaiseBeeswaxException(Substitute("Invalid query handle: $0", PrintId(query_id)),
       SQLSTATE_GENERAL_ERROR);
   }
+  // Validate that query can be accessed by user.
+  RAISE_IF_ERROR(CheckClientRequestSession(session.get(), request_state->effective_user(),
+      query_id), SQLSTATE_GENERAL_ERROR);
 
   {
     lock_guard<mutex> l(*request_state->lock());
@@ -235,6 +250,13 @@ void ImpalaServer::close(const QueryHandle& handle) {
       SQLSTATE_GENERAL_ERROR);
   TUniqueId query_id;
   QueryHandleToTUniqueId(handle, &query_id);
+
+  // Impala-shell and administrative tools can call this from a different connection,
+  // e.g. to allow an admin to force-terminate queries. We should allow the operation to
+  // proceed without validating the session/query relation so that workflows don't
+  // get broken. In future we could check that the users match OR that the user has
+  // admin privileges on the server.
+
   VLOG_QUERY << "close(): query_id=" << PrintId(query_id);
   // TODO: do we need to raise an exception if the query state is EXCEPTION?
   // TODO: use timeout to get rid of unwanted request_state.
@@ -243,8 +265,9 @@ void ImpalaServer::close(const QueryHandle& handle) {
 
 beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
-      SQLSTATE_GENERAL_ERROR);
+  shared_ptr<SessionState> session;
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(
+      ThriftServer::GetThreadConnectionId(), &session), SQLSTATE_GENERAL_ERROR);
   TUniqueId query_id;
   QueryHandleToTUniqueId(handle, &query_id);
   VLOG_ROW << "get_state(): query_id=" << PrintId(query_id);
@@ -255,6 +278,9 @@ beeswax::QueryState::type ImpalaServer::get_state(const QueryHandle& handle) {
     RaiseBeeswaxException(Substitute("Invalid query handle: $0", PrintId(query_id)),
       SQLSTATE_GENERAL_ERROR);
   }
+  // Validate that query can be accessed by user.
+  RAISE_IF_ERROR(CheckClientRequestSession(session.get(), request_state->effective_user(),
+      query_id), SQLSTATE_GENERAL_ERROR);
   // Take the lock to ensure that if the client sees a query_state == EXCEPTION, it is
   // guaranteed to see the error query_status.
   lock_guard<mutex> l(*request_state->lock());
@@ -275,9 +301,10 @@ void ImpalaServer::clean(const LogContextId& log_context) {
 }
 
 void ImpalaServer::get_log(string& log, const LogContextId& context) {
+  shared_ptr<SessionState> session;
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
-      SQLSTATE_GENERAL_ERROR);
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(
+      ThriftServer::GetThreadConnectionId(), &session), SQLSTATE_GENERAL_ERROR);
   // LogContextId is the same as QueryHandle.id
   QueryHandle handle;
   handle.__set_id(context);
@@ -291,6 +318,9 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
     LOG(ERROR) << str.str();
     return;
   }
+  // Validate that query can be accessed by user.
+  RAISE_IF_ERROR(CheckClientRequestSession(session.get(), request_state->effective_user(),
+      query_id), SQLSTATE_GENERAL_ERROR);
   stringstream error_log_ss;
 
   {
@@ -318,7 +348,7 @@ void ImpalaServer::get_log(string& log, const LogContextId& context) {
   log = error_log_ss.str();
 }
 
-void ImpalaServer::get_default_configuration(vector<ConfigVariable> &configurations,
+void ImpalaServer::get_default_configuration(vector<ConfigVariable>& configurations,
     const bool include_hadoop) {
   ScopedSessionState session_handle(this);
   RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
@@ -342,6 +372,12 @@ void ImpalaServer::Cancel(impala::TStatus& tstatus,
   // Convert QueryHandle to TUniqueId and get the query exec state.
   TUniqueId query_id;
   QueryHandleToTUniqueId(query_handle, &query_id);
+
+  // Impala-shell and administrative tools can call this from a different connection,
+  // e.g. to allow an admin to force-terminate queries. We should allow the operation to
+  // proceed without validating the session/query relation so that workflows don't
+  // get broken. In future we could check that the users match OR that the user has
+  // admin privileges on the server.
   RAISE_IF_ERROR(CancelInternal(query_id, true), SQLSTATE_GENERAL_ERROR);
   tstatus.status_code = TErrorCode::OK;
 }
@@ -349,13 +385,15 @@ void ImpalaServer::Cancel(impala::TStatus& tstatus,
 void ImpalaServer::CloseInsert(TInsertResult& insert_result,
     const QueryHandle& query_handle) {
   ScopedSessionState session_handle(this);
-  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(ThriftServer::GetThreadConnectionId()),
-      SQLSTATE_GENERAL_ERROR);
+  shared_ptr<SessionState> session;
+  RAISE_IF_ERROR(session_handle.WithBeeswaxSession(
+      ThriftServer::GetThreadConnectionId(), &session), SQLSTATE_GENERAL_ERROR);
   TUniqueId query_id;
   QueryHandleToTUniqueId(query_handle, &query_id);
   VLOG_QUERY << "CloseInsert(): query_id=" << PrintId(query_id);
 
-  Status status = CloseInsertInternal(query_id, &insert_result);
+  // CloseInsertInternal() will validate that 'session' has access to 'query_id'.
+  Status status = CloseInsertInternal(session.get(), query_id, &insert_result);
   if (!status.ok()) {
     RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
   }
@@ -377,6 +415,7 @@ void ImpalaServer::GetRuntimeProfile(string& profile_output, const QueryHandle&
   }
   TUniqueId query_id;
   QueryHandleToTUniqueId(handle, &query_id);
+  // GetRuntimeProfile() will validate that the user has access to 'query_id'.
   VLOG_RPC << "GetRuntimeProfile(): query_id=" << PrintId(query_id);
   Status status = GetRuntimeProfileOutput(
       query_id, GetEffectiveUser(*session), TRuntimeProfileFormat::STRING, &ss, nullptr);
@@ -402,6 +441,7 @@ void ImpalaServer::GetExecSummary(impala::TExecSummary& result,
   TUniqueId query_id;
   QueryHandleToTUniqueId(handle, &query_id);
   VLOG_RPC << "GetExecSummary(): query_id=" << PrintId(query_id);
+  // GetExecSummary() will validate that the user has access to 'query_id'.
   Status status = GetExecSummary(query_id, GetEffectiveUser(*session), &result);
   if (!status.ok()) RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
 }
@@ -433,7 +473,10 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query,
   {
     shared_ptr<SessionState> session;
     const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
-    RETURN_IF_ERROR(GetSessionState(session_id, &session));
+    // OK to skip secret validation since 'session_id' comes from connection
+    // and is trusted.
+    RETURN_IF_ERROR(GetSessionState(session_id, SecretArg::SkipSecretCheck(), &session,
+        /* mark_active= */ false));
     DCHECK(session != nullptr);
     {
       // The session is created when the client connects. Depending on the underlying
@@ -486,15 +529,8 @@ inline void ImpalaServer::QueryHandleToTUniqueId(const QueryHandle& handle,
   throw exc;
 }
 
-Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
+Status ImpalaServer::FetchInternal(ClientRequestState* request_state,
     const bool start_over, const int32_t fetch_size, beeswax::Results* query_results) {
-  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
-  if (UNLIKELY(request_state == nullptr)) {
-    string err_msg = Substitute("Invalid query handle: $0", PrintId(query_id));
-    VLOG(1) << err_msg;
-    return Status::Expected(err_msg);
-  }
-
   // Make sure ClientRequestState::Wait() has completed before fetching rows. Wait()
   // ensures that rows are ready to be fetched (e.g., Wait() opens
   // ClientRequestState::output_exprs_, which are evaluated in
@@ -549,7 +585,7 @@ Status ImpalaServer::FetchInternal(const TUniqueId& query_id,
   return fetch_rows_status;
 }
 
-Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
+Status ImpalaServer::CloseInsertInternal(SessionState* session, const TUniqueId& query_id,
     TInsertResult* insert_result) {
   shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
   if (UNLIKELY(request_state == nullptr)) {
@@ -558,6 +594,9 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
     return Status::Expected(err_msg);
   }
 
+  RETURN_IF_ERROR(
+      CheckClientRequestSession(session, request_state->effective_user(), query_id));
+
   Status query_status;
   {
     lock_guard<mutex> l(*request_state->lock());
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 8b7896b..82be134 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -109,7 +109,8 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
   }
   ScopedSessionState scoped_session(this);
   shared_ptr<SessionState> session;
-  Status get_session_status = scoped_session.WithSession(session_id, &session);
+  Status get_session_status =
+      scoped_session.WithSession(session_id, SecretArg::Session(secret), &session);
   if (!get_session_status.ok()) {
     status->__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
     status->__set_errorMessage(get_session_status.GetDetail());
@@ -170,28 +171,15 @@ void ImpalaServer::ExecuteMetadataOp(const THandleIdentifier& session_handle,
     return;
   }
   handle->__set_hasResultSet(true);
-  // TODO: create secret for operationId
+  // Secret is inherited from session.
   TUniqueId operation_id = request_state->query_id();
-  TUniqueIdToTHandleIdentifier(operation_id, operation_id, &(handle->operationId));
+  TUniqueIdToTHandleIdentifier(operation_id, secret, &(handle->operationId));
   status->__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
 }
 
-Status ImpalaServer::FetchInternal(const TUniqueId& query_id, int32_t fetch_size,
-    bool fetch_first, TFetchResultsResp* fetch_results) {
-  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
-  if (UNLIKELY(request_state == nullptr)) {
-    string err_msg = Substitute("Invalid query handle: $0", PrintId(query_id));
-    VLOG(1) << err_msg;
-    return Status::Expected(err_msg);
-  }
-
-  // FetchResults doesn't have an associated session handle, so we presume that this
-  // request should keep alive the same session that orignated the query.
-  ScopedSessionState session_handle(this);
-  const TUniqueId session_id = request_state->session_id();
-  shared_ptr<SessionState> session;
-  RETURN_IF_ERROR(session_handle.WithSession(session_id, &session));
-
+Status ImpalaServer::FetchInternal(ClientRequestState* request_state,
+    SessionState* session, int32_t fetch_size, bool fetch_first,
+    TFetchResultsResp* fetch_results) {
   // Make sure ClientRequestState::Wait() has completed before fetching rows. Wait()
   // ensures that rows are ready to be fetched (e.g., Wait() opens
   // ClientRequestState::output_exprs_, which are evaluated in
@@ -238,7 +226,8 @@ Status ImpalaServer::TExecuteStatementReqToTQueryContext(
     RETURN_IF_ERROR(THandleIdentifierToTUniqueId(execute_request.sessionHandle.sessionId,
         &session_id, &secret));
 
-    RETURN_IF_ERROR(GetSessionState(session_id, &session_state));
+    RETURN_IF_ERROR(
+        GetSessionState(session_id, SecretArg::Session(secret), &session_state));
     session_state->ToThrift(session_id, &query_ctx->session);
     lock_guard<mutex> l(session_state->lock);
     query_ctx->client_request.query_options = session_state->QueryOptions();
@@ -277,19 +266,23 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
   HS2_RETURN_IF_ERROR(return_val, CheckNotShuttingDown(), SQLSTATE_GENERAL_ERROR);
 
   // Generate session ID and the secret
-  TUniqueId session_id;
+  uuid secret_uuid;
+  uuid session_uuid;
   {
     lock_guard<mutex> l(uuid_lock_);
-    uuid secret = uuid_generator_();
-    uuid session_uuid = uuid_generator_();
-    return_val.sessionHandle.sessionId.guid.assign(
-        session_uuid.begin(), session_uuid.end());
-    return_val.sessionHandle.sessionId.secret.assign(secret.begin(), secret.end());
-    DCHECK_EQ(return_val.sessionHandle.sessionId.guid.size(), 16);
-    DCHECK_EQ(return_val.sessionHandle.sessionId.secret.size(), 16);
-    return_val.__isset.sessionHandle = true;
-    UUIDToTUniqueId(session_uuid, &session_id);
-  }
+    secret_uuid = crypto_uuid_generator_();
+    session_uuid = crypto_uuid_generator_();
+  }
+  return_val.sessionHandle.sessionId.guid.assign(
+      session_uuid.begin(), session_uuid.end());
+  return_val.sessionHandle.sessionId.secret.assign(
+      secret_uuid.begin(), secret_uuid.end());
+  DCHECK_EQ(return_val.sessionHandle.sessionId.guid.size(), 16);
+  DCHECK_EQ(return_val.sessionHandle.sessionId.secret.size(), 16);
+  return_val.__isset.sessionHandle = true;
+  TUniqueId session_id, secret;
+  UUIDToTUniqueId(session_uuid, &session_id);
+  UUIDToTUniqueId(secret_uuid, &secret);
 
   // DO NOT log this Thrift struct in its entirety, in case a bad client sets the
   // password.
@@ -298,9 +291,8 @@ void ImpalaServer::OpenSession(TOpenSessionResp& return_val,
 
   // create a session state: initialize start time, session type, database and default
   // query options.
-  // TODO: put secret in session state map and check it
   // TODO: Fix duplication of code between here and ConnectionStart().
-  shared_ptr<SessionState> state = make_shared<SessionState>(this);
+  shared_ptr<SessionState> state = make_shared<SessionState>(this, session_id, secret);
   state->closed = false;
   state->start_time_ms = UnixMillis();
   state->session_type = TSessionType::HIVESERVER2;
@@ -394,7 +386,9 @@ void ImpalaServer::CloseSession(TCloseSessionResp& return_val,
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
       request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
   HS2_RETURN_IF_ERROR(return_val,
-      CloseSessionInternal(session_id, false), SQLSTATE_GENERAL_ERROR);
+      CloseSessionInternal(
+          session_id, SecretArg::Session(secret), /* ignore_if_absent= */ false),
+      SQLSTATE_GENERAL_ERROR);
   return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
 }
 
@@ -409,7 +403,8 @@ void ImpalaServer::GetInfo(TGetInfoResp& return_val,
       request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
-  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(session_id, SecretArg::Session(secret), &session),
       SQLSTATE_GENERAL_ERROR);
 
   switch (request.infoType) {
@@ -444,7 +439,8 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
       request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
-  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(session_id, SecretArg::Session(secret), &session),
       SQLSTATE_GENERAL_ERROR);
   if (session == NULL) {
     string err_msg = Substitute("Invalid session id: $0", PrintId(session_id));
@@ -492,8 +488,8 @@ void ImpalaServer::ExecuteStatement(TExecuteStatementResp& return_val,
   return_val.__isset.operationHandle = true;
   return_val.operationHandle.__set_operationType(TOperationType::EXECUTE_STATEMENT);
   return_val.operationHandle.__set_hasResultSet(request_state->returns_result_set());
-  // TODO: create secret for operationId and store the secret in request_state
-  TUniqueIdToTHandleIdentifier(request_state->query_id(), request_state->query_id(),
+  // Secret is inherited from session.
+  TUniqueIdToTHandleIdentifier(request_state->query_id(), secret,
                                &return_val.operationHandle.operationId);
   return_val.status.__set_statusCode(
       apache::hive::service::cli::thrift::TStatusCode::SUCCESS_STATUS);
@@ -651,11 +647,12 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
     return;
   }
 
-  // TODO: check secret
+  // Secret is inherited from session.
   TUniqueId query_id;
-  TUniqueId secret;
+  TUniqueId op_secret;
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
-      request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
+      request.operationHandle.operationId, &query_id, &op_secret),
+      SQLSTATE_GENERAL_ERROR);
   VLOG_ROW << "GetOperationStatus(): query_id=" << PrintId(query_id);
 
   shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
@@ -668,7 +665,9 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
   ScopedSessionState session_handle(this);
   const TUniqueId session_id = request_state->session_id();
   shared_ptr<SessionState> session;
-  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(
+          session_id, SecretArg::Operation(op_secret, query_id), &session),
       SQLSTATE_GENERAL_ERROR);
 
   {
@@ -688,9 +687,10 @@ void ImpalaServer::GetOperationStatus(TGetOperationStatusResp& return_val,
 void ImpalaServer::CancelOperation(TCancelOperationResp& return_val,
     const TCancelOperationReq& request) {
   TUniqueId query_id;
-  TUniqueId secret;
+  TUniqueId op_secret;
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
-      request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
+      request.operationHandle.operationId, &query_id, &op_secret),
+      SQLSTATE_GENERAL_ERROR);
   VLOG_QUERY << "CancelOperation(): query_id=" << PrintId(query_id);
 
   shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
@@ -701,7 +701,8 @@ void ImpalaServer::CancelOperation(TCancelOperationResp& return_val,
   }
   ScopedSessionState session_handle(this);
   const TUniqueId session_id = request_state->session_id();
-  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(session_id, SecretArg::Operation(op_secret, query_id)),
       SQLSTATE_GENERAL_ERROR);
   HS2_RETURN_IF_ERROR(return_val, CancelInternal(query_id, true), SQLSTATE_GENERAL_ERROR);
   return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
@@ -710,9 +711,10 @@ void ImpalaServer::CancelOperation(TCancelOperationResp& return_val,
 void ImpalaServer::CloseOperation(TCloseOperationResp& return_val,
     const TCloseOperationReq& request) {
   TUniqueId query_id;
-  TUniqueId secret;
+  TUniqueId op_secret;
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
-      request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
+      request.operationHandle.operationId, &query_id, &op_secret),
+      SQLSTATE_GENERAL_ERROR);
   VLOG_QUERY << "CloseOperation(): query_id=" << PrintId(query_id);
 
   shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
@@ -723,8 +725,10 @@ void ImpalaServer::CloseOperation(TCloseOperationResp& return_val,
   }
   ScopedSessionState session_handle(this);
   const TUniqueId session_id = request_state->session_id();
-  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(session_id, SecretArg::Operation(op_secret, query_id)),
       SQLSTATE_GENERAL_ERROR);
+
   // TODO: use timeout to get rid of unwanted request_state.
   HS2_RETURN_IF_ERROR(return_val, UnregisterQuery(query_id, true),
       SQLSTATE_GENERAL_ERROR);
@@ -734,11 +738,11 @@ void ImpalaServer::CloseOperation(TCloseOperationResp& return_val,
 void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val,
     const TGetResultSetMetadataReq& request) {
   // Convert Operation id to TUniqueId and get the query exec state.
-  // TODO: check secret
   TUniqueId query_id;
-  TUniqueId secret;
+  TUniqueId op_secret;
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
-      request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
+      request.operationHandle.operationId, &query_id, &op_secret),
+      SQLSTATE_GENERAL_ERROR);
   VLOG_QUERY << "GetResultSetMetadata(): query_id=" << PrintId(query_id);
 
   shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
@@ -750,7 +754,8 @@ void ImpalaServer::GetResultSetMetadata(TGetResultSetMetadataResp& return_val,
   }
   ScopedSessionState session_handle(this);
   const TUniqueId session_id = request_state->session_id();
-  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(session_id, SecretArg::Operation(op_secret, query_id)),
       SQLSTATE_GENERAL_ERROR);
   {
     lock_guard<mutex> l(*request_state->lock());
@@ -786,16 +791,32 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
   bool fetch_first = request.orientation == TFetchOrientation::FETCH_FIRST;
 
   // Convert Operation id to TUniqueId and get the query exec state.
-  // TODO: check secret
   TUniqueId query_id;
-  TUniqueId secret;
+  TUniqueId op_secret;
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
-      request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
+      request.operationHandle.operationId, &query_id, &op_secret),
+      SQLSTATE_GENERAL_ERROR);
   VLOG_ROW << "FetchResults(): query_id=" << PrintId(query_id)
            << " fetch_size=" << request.maxRows;
 
-  // FetchInternal takes care of extending the session
-  Status status = FetchInternal(query_id, request.maxRows, fetch_first, &return_val);
+  shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
+  if (UNLIKELY(request_state == nullptr)) {
+    string err_msg = Substitute("Invalid query handle: $0", PrintId(query_id));
+    VLOG(1) << err_msg;
+    HS2_RETURN_ERROR(return_val, err_msg, SQLSTATE_GENERAL_ERROR);
+  }
+
+  // Validate the secret and keep the session that originated the query alive.
+  ScopedSessionState session_handle(this);
+  const TUniqueId session_id = request_state->session_id();
+  shared_ptr<SessionState> session;
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(
+          session_id, SecretArg::Operation(op_secret, query_id), &session),
+      SQLSTATE_GENERAL_ERROR);
+
+  Status status = FetchInternal(
+      request_state.get(), session.get(), request.maxRows, fetch_first, &return_val);
   VLOG_ROW << "FetchResults(): #results=" << return_val.results.rows.size()
            << " has_more=" << (return_val.hasMoreRows ? "true" : "false");
   if (!status.ok()) {
@@ -816,9 +837,10 @@ void ImpalaServer::FetchResults(TFetchResultsResp& return_val,
 
 void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
   TUniqueId query_id;
-  TUniqueId secret;
+  TUniqueId op_secret;
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
-      request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
+      request.operationHandle.operationId, &query_id, &op_secret),
+      SQLSTATE_GENERAL_ERROR);
 
   shared_ptr<ClientRequestState> request_state = GetClientRequestState(query_id);
   if (UNLIKELY(request_state.get() == nullptr)) {
@@ -831,8 +853,9 @@ void ImpalaServer::GetLog(TGetLogResp& return_val, const TGetLogReq& request) {
   // should keep alive the same session that orignated the query.
   ScopedSessionState session_handle(this);
   const TUniqueId session_id = request_state->session_id();
-  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id),
-                      SQLSTATE_GENERAL_ERROR);
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(session_id, SecretArg::Operation(op_secret, query_id)),
+      SQLSTATE_GENERAL_ERROR);
 
   stringstream ss;
   Coordinator* coord = request_state->GetCoordinator();
@@ -870,19 +893,24 @@ void ImpalaServer::GetExecSummary(TGetExecSummaryResp& return_val,
   TUniqueId session_id;
   TUniqueId secret;
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
-      request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
+      request.sessionHandle.sessionId, &session_id, &secret),
+      SQLSTATE_GENERAL_ERROR);
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
-  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(
+          session_id, SecretArg::Session(secret), &session),
       SQLSTATE_GENERAL_ERROR);
   if (session == NULL) {
     HS2_RETURN_ERROR(return_val, Substitute("Invalid session id: $0",
         PrintId(session_id)), SQLSTATE_GENERAL_ERROR);
   }
 
-  TUniqueId query_id;
-  HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
-      request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
+  TUniqueId query_id, op_secret;
+  HS2_RETURN_IF_ERROR(return_val,
+      THandleIdentifierToTUniqueId(
+          request.operationHandle.operationId, &query_id, &op_secret),
+      SQLSTATE_GENERAL_ERROR);
 
   TExecSummary summary;
   Status status = GetExecSummary(query_id, GetEffectiveUser(*session), &summary);
@@ -891,24 +919,27 @@ void ImpalaServer::GetExecSummary(TGetExecSummaryResp& return_val,
   return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
 }
 
-void ImpalaServer::GetRuntimeProfile(TGetRuntimeProfileResp& return_val,
-    const TGetRuntimeProfileReq& request) {
+void ImpalaServer::GetRuntimeProfile(
+    TGetRuntimeProfileResp& return_val, const TGetRuntimeProfileReq& request) {
   TUniqueId session_id;
   TUniqueId secret;
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
       request.sessionHandle.sessionId, &session_id, &secret), SQLSTATE_GENERAL_ERROR);
   ScopedSessionState session_handle(this);
   shared_ptr<SessionState> session;
-  HS2_RETURN_IF_ERROR(return_val, session_handle.WithSession(session_id, &session),
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(
+          session_id, SecretArg::Session(secret), &session),
       SQLSTATE_GENERAL_ERROR);
   if (session == NULL) {
     HS2_RETURN_ERROR(return_val, Substitute("Invalid session id: $0",
         PrintId(session_id)), SQLSTATE_GENERAL_ERROR);
   }
 
-  TUniqueId query_id;
+  TUniqueId query_id, op_secret;
   HS2_RETURN_IF_ERROR(return_val, THandleIdentifierToTUniqueId(
-      request.operationHandle.operationId, &query_id, &secret), SQLSTATE_GENERAL_ERROR);
+      request.operationHandle.operationId, &query_id, &op_secret),
+      SQLSTATE_GENERAL_ERROR);
 
   stringstream ss;
   TRuntimeProfileTree thrift_profile;
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 23b86a3..61e6f69 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -186,6 +186,8 @@ void ImpalaHttpHandler::CancelQueryHandler(const Webserver::WebRequest& req,
     return;
   }
   Status cause("Cancelled from Impala's debug web interface");
+  // Web UI doesn't have access to secret so we can't validate it. We assume that
+  // web UI is allowed to close queries.
   status = server_->UnregisterQuery(unique_id, true, &cause);
   if (!status.ok()) {
     Value error(status.GetDetail().c_str(), document->GetAllocator());
@@ -206,7 +208,10 @@ void ImpalaHttpHandler::CloseSessionHandler(const Webserver::WebRequest& req,
     return;
   }
   Status cause("Session closed from Impala's debug web interface");
-  status = server_->CloseSessionInternal(unique_id, true);
+  // Web UI doesn't have access to secret so we can't validate it. We assume that
+  // web UI is allowed to close sessions.
+  status = server_->CloseSessionInternal(unique_id,
+      ImpalaServer::SecretArg::SkipSecretCheck(), /* ignore_if_absent= */ false);
   if (!status.ok()) {
     Value error(status.GetDetail().c_str(), document->GetAllocator());
     document->AddMember("error", error, document->GetAllocator());
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 9fb58fe..1c20fe6 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -42,6 +42,7 @@
 
 #include "catalog/catalog-server.h"
 #include "catalog/catalog-util.h"
+#include "common/compiler-util.h"
 #include "common/logging.h"
 #include "common/thread-debug-info.h"
 #include "common/version.h"
@@ -294,6 +295,13 @@ const char* ImpalaServer::SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED = "HYC00";
 // Interval between checks for query expiration.
 const int64_t EXPIRATION_CHECK_INTERVAL_MS = 1000L;
 
+// Template to return error messages for client requests that could not be found, belonged
+// to the wrong session, or had a mismatched secret. We need to use this particular string
+// in some places because the shell has a regex for it.
+// TODO: Make consistent with "Invalid query handle: $0" template used elsewhere.
+// TODO: this should be turned into a proper error code and used throughout ImpalaServer.
+static const char* LEGACY_INVALID_QUERY_HANDLE_TEMPLATE = "Query id $0 not found.";
+
 ThreadSafeRandom ImpalaServer::rng_(GetRandomSeed32());
 
 ImpalaServer::ImpalaServer(ExecEnv* exec_env)
@@ -773,7 +781,8 @@ Status ImpalaServer::GetRuntimeProfileOutput(const TUniqueId& query_id,
     QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
     if (query_record == query_log_index_.end()) {
       // Common error, so logging explicitly and eliding Status's stack trace.
-      string err = strings::Substitute("Query id $0 not found.", PrintId(query_id));
+      string err =
+          strings::Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, PrintId(query_id));
       VLOG(1) << err;
       return Status::Expected(err);
     }
@@ -848,7 +857,8 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
     }
     if (is_query_missing) {
       // Common error, so logging explicitly and eliding Status's stack trace.
-      string err = strings::Substitute("Query id $0 not found.", PrintId(query_id));
+      string err =
+          strings::Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, PrintId(query_id));
       VLOG(1) << err;
       return Status::Expected(err);
     }
@@ -1105,6 +1115,9 @@ void ImpalaServer::PrepareQueryContext(const TNetworkAddress& backend_addr,
   // benchmarks show it to be slightly cheaper than contending for a
   // single generator under a lock (since random_generator is not
   // thread-safe).
+  // TODO: as cleanup we should consolidate this with uuid_generator_ - there's no reason
+  // to have two different methods to achieve the same end. To address the scalability
+  // concern we could shard the RNG or similar.
   query_ctx->query_id = UuidToQueryId(random_generator()());
   GetThreadDebugInfo()->SetQueryId(query_ctx->query_id);
 
@@ -1341,7 +1354,8 @@ Status ImpalaServer::CancelInternal(const TUniqueId& query_id, bool check_inflig
 }
 
 Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
-    bool ignore_if_absent) {
+    const SecretArg& secret, bool ignore_if_absent) {
+  DCHECK(secret.is_session_secret());
   VLOG_QUERY << "Closing session: " << PrintId(session_id);
 
   // Find the session_state and remove it from the map.
@@ -1349,10 +1363,15 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
   {
     lock_guard<mutex> l(session_state_map_lock_);
     SessionStateMap::iterator entry = session_state_map_.find(session_id);
-    if (entry == session_state_map_.end()) {
+    if (entry == session_state_map_.end() || !secret.Validate(entry->second->secret)) {
       if (ignore_if_absent) {
         return Status::OK();
       } else {
+        if (entry != session_state_map_.end()) {
+          // Log invalid attempts to connect. Be careful not to log secret.
+          VLOG(1) << "Client tried to connect to session " << PrintId(session_id)
+                  << " with invalid secret.";
+        }
         string err_msg = Substitute("Invalid session id: $0", PrintId(session_id));
         VLOG(1) << "CloseSessionInternal(): " << err_msg;
         return Status::Expected(err_msg);
@@ -1388,13 +1407,24 @@ Status ImpalaServer::CloseSessionInternal(const TUniqueId& session_id,
   return Status::OK();
 }
 
-Status ImpalaServer::GetSessionState(const TUniqueId& session_id,
+Status ImpalaServer::GetSessionState(const TUniqueId& session_id, const SecretArg& secret,
     shared_ptr<SessionState>* session_state, bool mark_active) {
   lock_guard<mutex> l(session_state_map_lock_);
   SessionStateMap::iterator i = session_state_map_.find(session_id);
-  if (i == session_state_map_.end()) {
+  // TODO: consider factoring out the lookup and secret validation into a separate method.
+  // This would require rethinking the locking protocol for 'session_state_map_lock_' -
+  // it probably does not need to be held for the full duration of this function.
+  if (i == session_state_map_.end() || !secret.Validate(i->second->secret)) {
+    if (i != session_state_map_.end()) {
+      // Log invalid attempts to connect. Be careful not to log secret.
+      VLOG(1) << "Client tried to connect to session " << PrintId(session_id)
+              << " with invalid "
+              << (secret.is_session_secret() ? "session" : "operation") << " secret.";
+    }
     *session_state = std::shared_ptr<SessionState>();
-    string err_msg = Substitute("Invalid session id: $0", PrintId(session_id));
+    string err_msg = secret.is_session_secret() ?
+        Substitute("Invalid session id: $0", PrintId(session_id)) :
+        Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, PrintId(secret.query_id()));
     VLOG(1) << "GetSessionState(): " << err_msg;
     return Status::Expected(err_msg);
   } else {
@@ -1970,7 +2000,17 @@ void ImpalaServer::ConnectionStart(
     // Beeswax only allows for one session per connection, so we can share the session ID
     // with the connection ID
     const TUniqueId& session_id = connection_context.connection_id;
-    shared_ptr<SessionState> session_state = make_shared<SessionState>(this);
+    // Generate a secret per Beeswax session so that the HS2 secret validation mechanism
+    // prevents access to Beeswax sessions from HS2.
+    uuid secret_uuid;
+    {
+      lock_guard<mutex> l(uuid_lock_);
+      secret_uuid = crypto_uuid_generator_();
+    }
+    TUniqueId secret;
+    UUIDToTUniqueId(secret_uuid, &secret);
+    shared_ptr<SessionState> session_state =
+        make_shared<SessionState>(this, session_id, secret);
     session_state->closed = false;
     session_state->start_time_ms = UnixMillis();
     session_state->last_accessed_ms = UnixMillis();
@@ -2030,7 +2070,8 @@ void ImpalaServer::ConnectionEnd(
 
   if (close) {
     for (const TUniqueId& session_id : disconnected_sessions) {
-      Status status = CloseSessionInternal(session_id, true);
+      Status status = CloseSessionInternal(session_id, SecretArg::SkipSecretCheck(),
+          /* ignore_if_absent= */ true);
       if (!status.ok()) {
         LOG(WARNING) << "Error closing session " << PrintId(session_id) << ": "
                      << status.GetDetail();
@@ -2041,7 +2082,7 @@ void ImpalaServer::ConnectionEnd(
         || connection_context.server_name == HS2_HTTP_SERVER_NAME);
     for (const TUniqueId& session_id : disconnected_sessions) {
       shared_ptr<SessionState> state;
-      Status status = GetSessionState(session_id, &state);
+      Status status = GetSessionState(session_id, SecretArg::SkipSecretCheck(), &state);
       // The session may not exist if it was explicitly closed.
       if (!status.ok()) continue;
       lock_guard<mutex> state_lock(state->lock);
@@ -2548,6 +2589,24 @@ shared_ptr<ClientRequestState> ImpalaServer::GetClientRequestState(
   }
 }
 
+Status ImpalaServer::CheckClientRequestSession(
+    SessionState* session, const std::string& client_request_effective_user,
+    const TUniqueId& query_id) {
+  const string& session_user = GetEffectiveUser(*session);
+  // Empty session users only occur for unauthenticated sessions where no user was
+  // specified by the client, e.g. unauthenticated beeswax sessions. Skip the
+  // check in this case because no security is enabled. Some tests rely on
+  // this behaviour, e.g. to poll query status from a new beeswax connection.
+  if (!session_user.empty() && session_user != client_request_effective_user) {
+    Status err = Status::Expected(
+        Substitute(LEGACY_INVALID_QUERY_HANDLE_TEMPLATE, PrintId(query_id)));
+    VLOG(1) << err << " caused by user mismatch: '" << session_user << "' vs '"
+            << client_request_effective_user << "'";
+    return err;
+  }
+  return Status::OK();
+}
+
 void ImpalaServer::UpdateFilter(TUpdateFilterResult& result,
     const TUpdateFilterParams& params) {
   DCHECK(params.__isset.query_id);
@@ -2657,4 +2716,14 @@ Status ImpalaServer::StartShutdown(
   LOG(INFO) << "Shutdown complete, going down.";
   exit(0);
 }
+
+// This should never be inlined to prevent it potentially being optimized, e.g.
+// by short-circuiting the comparisons.
+__attribute__((noinline)) int ImpalaServer::SecretArg::ConstantTimeCompare(
+    const TUniqueId& other) const {
+  // Compiles to two integer comparisons and an addition with no branches.
+  // TODO: consider replacing with CRYPTO_memcmp() once our minimum supported OpenSSL
+  // version has it.
+  return (secret_.hi != other.hi) + (secret_.lo != other.lo);
+}
 }
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index fdbdb7f..c0971ba 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -19,6 +19,7 @@
 #define IMPALA_SERVICE_IMPALA_SERVER_H
 
 #include <atomic>
+#include <boost/random/random_device.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -138,6 +139,29 @@ class QuerySchedule;
 /// run the shutdown function again to set a shorter deadline. The deadline can't be
 /// increased after shutdown is started.
 ///
+/// Secrets
+/// -------
+/// The HS2 protocol has a concept of a 'secret' associated with each session and
+/// client request that is returned to the client, then must be passed back along with
+/// each RPC that interacts with the session or operation. RPCs with non-matching secrets
+/// should fail. Beeswax does not have this concept - rather sessions are bound to a
+/// connection, and that session and associated requests should (generally) only be
+/// accessed from the session's connection.
+///
+/// All RPC handler functions are responsible for validating secrets when looking up
+/// client sessions or requests, with some exceptions:
+/// * Beeswax sessions are tied to a connection, so it is always valid to access the
+///   session from the original connection.
+/// * Cancel() and close() Beeswax operations can cancel any query if the ID is known.
+///   Existing tools (impala-shell and administrative tools) depend on this behaviour.
+///
+/// HS2 RPC handlers should pass the client-provided secret to WithSession() before
+/// taking any action on behalf of the client. Beeswax RPC handlers should call
+/// CheckClientRequestSession() to ensure that the client request is accessible from
+/// the current session. All other functions assume that the validation was already done.
+///
+/// HTTP handlers do not have access to client secrets, so do not validate them.
+///
 /// Locking
 /// -------
 /// This class is partially thread-safe. To ensure freedom from deadlock, if multiple
@@ -443,7 +467,9 @@ class ImpalaServer : public ImpalaServiceIf,
     /// run as children of Beeswax sessions) get results back in the expected format -
     /// child queries inherit the HS2 version from their parents, and a Beeswax session
     /// will never update the HS2 version from the default.
-    SessionState(ImpalaServer* impala_server) : impala_server(impala_server),
+    SessionState(ImpalaServer* impala_server, TUniqueId session_id, TUniqueId secret)
+      : impala_server(impala_server), session_id(std::move(session_id)),
+        secret(std::move(secret)),
         closed(false), expired(false), hs2_version(apache::hive::service::cli::thrift::
         TProtocolVersion::HIVE_CLI_SERVICE_PROTOCOL_V1), total_queries(0), ref_count(0) {
       DCHECK(this->impala_server != nullptr);
@@ -452,6 +478,14 @@ class ImpalaServer : public ImpalaServiceIf,
     /// Pointer to the Impala server of this session
     ImpalaServer* impala_server;
 
+    /// The unique session ID. This is also the key for session_state_map_, but
+    /// we redundantly store it here for convenience.
+    const TUniqueId session_id;
+
+    /// The session secret that client RPCs must pass back in to access the session.
+    /// This must not be printed to logs or exposed in any other way.
+    const TUniqueId secret;
+
     TSessionType::type session_type;
 
     /// Time the session was created, in ms since epoch (UTC).
@@ -547,6 +581,7 @@ class ImpalaServer : public ImpalaServiceIf,
 
  private:
   struct ExpirationEvent;
+  class SecretArg;
   friend class ChildQuery;
   friend class ControlService;
   friend class ImpalaHttpHandler;
@@ -565,6 +600,16 @@ class ImpalaServer : public ImpalaServiceIf,
   std::shared_ptr<ClientRequestState> GetClientRequestState(
       const TUniqueId& query_id);
 
+  /// Used in situations where the client provides a session ID and a query ID and the
+  /// caller needs to validate that the query can be accessed from the session. The two
+  /// arguments are the session obtained by looking up the session ID provided by the
+  /// RPC client and the effective user from the query. If the username doesn't match,
+  /// return an error indicating that the query was not found (since we prefer not to
+  /// leak information about the existence of queries that the user doesn't have
+  /// access to).
+  Status CheckClientRequestSession(SessionState* session,
+      const std::string& client_request_effective_user, const TUniqueId& query_id);
+
   /// Updates a set of Impalad catalog metrics including number tables/databases and
   /// some cache metrics applicable to local catalog mode (if configured). Called at
   /// the end of statestore update callback (to account for metadata object changes)
@@ -625,8 +670,8 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Caller should not hold any locks when calling this function.
   /// If ignore_if_absent is true, returns OK even if a session with the supplied ID does
   /// not exist.
-  Status CloseSessionInternal(const TUniqueId& session_id, bool ignore_if_absent)
-      WARN_UNUSED_RESULT;
+  Status CloseSessionInternal(const TUniqueId& session_id, const SecretArg& secret,
+      bool ignore_if_absent) WARN_UNUSED_RESULT;
 
   /// Gets the runtime profile string for a given query_id and writes it to the output
   /// stream. First searches for the query id in the map of in-flight queries. If no
@@ -827,13 +872,15 @@ class ImpalaServer : public ImpalaServiceIf,
   [[noreturn]] void RaiseBeeswaxException(const std::string& msg, const char* sql_state);
 
   /// Executes the fetch logic. Doesn't clean up the exec state if an error occurs.
-  Status FetchInternal(const TUniqueId& query_id, bool start_over,
+  Status FetchInternal(ClientRequestState* request_state, bool start_over,
       int32_t fetch_size, beeswax::Results* query_results) WARN_UNUSED_RESULT;
 
   /// Populate insert_result and clean up exec state. If the query
   /// status is an error, insert_result is not populated and the status is returned.
-  Status CloseInsertInternal(const TUniqueId& query_id, TInsertResult* insert_result)
-      WARN_UNUSED_RESULT;
+  /// 'session' is RPC client's session, used to check whether the insert can
+  /// be closed via that session.
+  Status CloseInsertInternal(SessionState* session, const TUniqueId& query_id,
+      TInsertResult* insert_result) WARN_UNUSED_RESULT;
 
   /// HiveServer2 private methods (implemented in impala-hs2-server.cc)
 
@@ -852,8 +899,9 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// Executes the fetch logic for HiveServer2 FetchResults. If fetch_first is true, then
   /// the query's state should be reset to fetch from the beginning of the result set.
-  /// Doesn't clean up the exec state if an error occurs.
-  Status FetchInternal(const TUniqueId& query_id, int32_t fetch_size, bool fetch_first,
+  /// Doesn't clean up 'request_state' if an error occurs.
+  Status FetchInternal(ClientRequestState* request_state, SessionState* session,
+      int32_t fetch_size, bool fetch_first,
       apache::hive::service::cli::thrift::TFetchResultsResp* fetch_results)
       WARN_UNUSED_RESULT;
 
@@ -1003,6 +1051,64 @@ class ImpalaServer : public ImpalaServiceIf,
   TQueryOptions default_query_options_;
   std::vector<beeswax::ConfigVariable> default_configs_;
 
+  // Container for a secret passed into functions for validation.
+  class SecretArg {
+   public:
+    // This should only be used if the client has not provided a secret but the caller
+    // knows that the client in fact is allowed to access the session or operation that
+    // they are accessing.
+    static SecretArg SkipSecretCheck() {
+      return SecretArg(true, true, TUniqueId(), TUniqueId());
+    }
+
+    /// Pass a session secret for validation.
+    static SecretArg Session(const TUniqueId& secret) {
+      return SecretArg(false, true, secret, TUniqueId());
+    }
+
+    /// Pass a query secret for validation. The error message will include the 'query_id',
+    /// so that must also be passed.
+    static SecretArg Operation(const TUniqueId& secret, const TUniqueId& query_id) {
+      return SecretArg(false, false, secret, query_id);
+    }
+
+    /// Return true iff the check should be skipped or the secret matches 'other'.
+    /// All validation should be done via this function to avoid subtle errors.
+    bool Validate(const TUniqueId& other) const {
+      return skip_validation_ || ConstantTimeCompare(other) == 0;
+    }
+
+    bool is_session_secret() const { return is_session_secret_; }
+    TUniqueId query_id() const {
+      DCHECK(!is_session_secret_);
+      return query_id_;
+    }
+
+   private:
+    SecretArg(bool skip_validation, bool is_session_secret, TUniqueId secret,
+        TUniqueId query_id)
+      : skip_validation_(skip_validation),
+        is_session_secret_(is_session_secret),
+        secret_(std::move(secret)),
+        query_id_(std::move(query_id)) {}
+
+    /// A comparison function for unique IDs that executes in an amount of time unrelated
+    /// to the input values. Returns how many words of 'secret_' differ from 'other'.
+    int ConstantTimeCompare(const TUniqueId& other) const;
+
+    /// True if the caller wants to skip validation of the session.
+    const bool skip_validation_;
+
+    /// True if this is a session secret, false if this is an operation secret.
+    const bool is_session_secret_;
+
+    /// The secret to validate.
+    const TUniqueId secret_;
+
+    /// The query id, only provided if this is an operation secret.
+    const TUniqueId query_id_;
+  };
+
   /// Class that allows users of SessionState to mark a session as in-use, and therefore
   /// immune to expiration. The marking is done in WithSession() and undone in the
   /// destructor, so this class can be used to 'check-out' a session for the duration of a
@@ -1013,11 +1119,13 @@ class ImpalaServer : public ImpalaServiceIf,
 
     /// Marks a session as in-use, and saves it so that it can be unmarked when this
     /// object goes out of scope. Returns OK unless there is an error in GetSessionState.
-    /// Must only be called once per ScopedSessionState.
-    Status WithSession(const TUniqueId& session_id,
+    /// 'secret' must be provided and is validated against the stored secret for the
+    /// session. Must only be called once per ScopedSessionState.
+    Status WithSession(const TUniqueId& session_id, const SecretArg& secret,
         std::shared_ptr<SessionState>* session = NULL) WARN_UNUSED_RESULT {
       DCHECK(session_.get() == NULL);
-      RETURN_IF_ERROR(impala_->GetSessionState(session_id, &session_, true));
+      RETURN_IF_ERROR(impala_->GetSessionState(
+            session_id, secret, &session_, /* mark_active= */ true));
       if (session != NULL) (*session) = session_;
 
       // We won't have a connection context in the case of ChildQuery, which calls into
@@ -1039,12 +1147,17 @@ class ImpalaServer : public ImpalaServiceIf,
       return Status::OK();
     }
 
-    /// Same as WithSession(), except does not update the session/connection mapping, as
-    /// beeswax sessions can only be used over a single connection.
-    Status WithBeeswaxSession(const TUniqueId& session_id,
-        std::shared_ptr<SessionState>* session = NULL) WARN_UNUSED_RESULT {
+    /// Same as WithSession(), except:
+    /// * It should only be called from beeswax with a 'connection_id' obtained from
+    ///   ThriftServer::GetThreadConnectionId().
+    /// * It does not update the session/connection mapping, as beeswax sessions can
+    ///   only be used over a single connection.
+    Status WithBeeswaxSession(const TUniqueId& connection_id,
+        std::shared_ptr<SessionState>* session = NULL) {
       DCHECK(session_.get() == NULL);
-      RETURN_IF_ERROR(impala_->GetSessionState(session_id, &session_, true));
+      RETURN_IF_ERROR(
+          impala_->GetSessionState(connection_id, SecretArg::SkipSecretCheck(), &session_,
+              /* mark_active= */ true));
       if (session != NULL) (*session) = session_;
       return Status::OK();
     }
@@ -1087,10 +1200,11 @@ class ImpalaServer : public ImpalaServiceIf,
   ConnectionToSessionMap connection_to_sessions_map_;
 
   /// Returns session state for given session_id.
-  /// If not found, session_state will be NULL and an error status will be returned.
+  /// If not found or validation of 'secret' against the stored secret in the
+  /// SessionState fails, session_state will be NULL and an error status will be returned.
   /// If mark_active is true, also checks if the session is expired or closed and
   /// increments the session's reference counter if it is still alive.
-  Status GetSessionState(const TUniqueId& session_id,
+  Status GetSessionState(const TUniqueId& session_id, const SecretArg& secret,
       std::shared_ptr<SessionState>* session_state, bool mark_active = false)
       WARN_UNUSED_RESULT;
 
@@ -1121,8 +1235,9 @@ class ImpalaServer : public ImpalaServiceIf,
   std::shared_ptr<const TBackendDescriptor> local_backend_descriptor_;
   boost::mutex local_backend_descriptor_lock_;
 
-  /// Generate unique session id for HiveServer2 session
-  boost::uuids::random_generator uuid_generator_;
+  /// UUID generator for session IDs and secrets. Uses system random device to get
+  /// cryptographically secure random numbers.
+  boost::uuids::basic_random_generator<boost::random_device> crypto_uuid_generator_;
 
   /// Lock to protect uuid_generator
   boost::mutex uuid_lock_;
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index e8d43db..b788a38 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -470,9 +470,12 @@ class ImpalaBeeswaxClient(object):
     exec_result.summary = 'Returned %d rows' % (len(result_rows))
     return exec_result
 
+  def close_dml(self, handle):
+    return self.__do_rpc(lambda: self.imp_service.CloseInsert(handle))
+
   def __fetch_insert_results(self, handle):
     """Executes an insert query"""
-    result = self.__do_rpc(lambda: self.imp_service.CloseInsert(handle))
+    result = self.close_dml(handle)
     # The insert was successful
     num_rows = sum(map(int, result.rows_modified.values()))
     data = ["%s: %s" % row for row in result.rows_modified.iteritems()]
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index 5242090..acc7159 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -175,6 +175,10 @@ class BeeswaxConnection(ImpalaConnection):
     LOG.info("-- closing query for operation handle: %s" % operation_handle)
     self.__beeswax_client.close_query(operation_handle.get_handle())
 
+  def close_dml(self, operation_handle):
+    LOG.info("-- closing DML query for operation handle: %s" % operation_handle)
+    self.__beeswax_client.close_dml(operation_handle.get_handle())
+
   def execute(self, sql_stmt, user=None):
     LOG.info("-- executing against %s\n%s;\n" % (self.__host_port, sql_stmt))
     return self.__beeswax_client.execute(sql_stmt, user=user)
@@ -220,7 +224,7 @@ class BeeswaxConnection(ImpalaConnection):
 
   def get_log(self, operation_handle):
     LOG.info("-- getting log for operation: %s" % operation_handle)
-    return self.__beeswax_client.get_log(operation_handle.get_handle())
+    return self.__beeswax_client.get_log(operation_handle.get_handle().log_context)
 
   def fetch(self, sql_stmt, operation_handle, max_rows = -1):
     LOG.info("-- fetching results from: %s" % operation_handle)
diff --git a/tests/hs2/hs2_test_suite.py b/tests/hs2/hs2_test_suite.py
index 83b2ff4..d627316 100644
--- a/tests/hs2/hs2_test_suite.py
+++ b/tests/hs2/hs2_test_suite.py
@@ -63,6 +63,22 @@ def operation_id_to_query_id(operation_id):
   hi = ''.join(['%0.2X' % ord(c) for c in hi[::-1]])
   return "%s:%s" % (lo, hi)
 
+
+def create_session_handle_without_secret(session_handle):
+  """Create a HS2 session handle with the same session ID as 'session_handle' but a
+  bogus secret of the right length, i.e. 16 bytes."""
+  return TCLIService.TSessionHandle(TCLIService.THandleIdentifier(
+      session_handle.sessionId.guid, r"xxxxxxxxxxxxxxxx"))
+
+
+def create_op_handle_without_secret(op_handle):
+  """Create a HS2 operation handle with same parameters as 'op_handle' but with a bogus
+  secret of the right length, i.e. 16 bytes."""
+  op_id = TCLIService.THandleIdentifier(op_handle.operationId.guid, r"xxxxxxxxxxxxxxxx")
+  return TCLIService.TOperationHandle(
+      op_id, op_handle.operationType, op_handle.hasResultSet)
+
+
 class HS2TestSuite(ImpalaTestSuite):
   HS2_V6_COLUMN_TYPES = ['boolVal', 'stringVal', 'byteVal', 'i16Val', 'i32Val', 'i64Val',
                          'doubleVal', 'binaryVal']
@@ -88,6 +104,35 @@ class HS2TestSuite(ImpalaTestSuite):
        and expected_error_prefix is not None:
       assert response.status.errorMessage.startswith(expected_error_prefix)
 
+  @staticmethod
+  def check_invalid_session(response):
+    """Asserts that 'response' is the error HS2 returns for an invalid session, i.e. one
+    that doesn't exist or whose secret doesn't match."""
+    error_status = TCLIService.TStatusCode.ERROR_STATUS
+    HS2TestSuite.check_response(response, error_status, "Invalid session id:")
+
+  @staticmethod
+  def check_invalid_query(response, expect_legacy_err=False):
+    """Asserts that 'response' is the error HS2 returns for an invalid query, i.e. one
+    that doesn't exist, doesn't belong to the session provided, or whose secret doesn't
+    match.
+
+    If 'expect_legacy_err' is true, the non-standard "Query id ... not found" style of
+    message that some operations still return is expected instead of the message that
+    we should standardise on."""
+    # Only the prefix of the error message is compared.
+    err_prefix = "Query id" if expect_legacy_err else "Invalid query handle:"
+    error_status = TCLIService.TStatusCode.ERROR_STATUS
+    HS2TestSuite.check_response(response, error_status, err_prefix)
+
+  @staticmethod
+  def check_profile_access_denied(response, user):
+    """Asserts that 'response' is the error HS2 returns when 'user' is not authorised to
+    access the query's runtime profile or execution summary."""
+    err = ("User {0} is not authorized to access the runtime "
+           "profile or execution summary").format(user)
+    HS2TestSuite.check_response(response, TCLIService.TStatusCode.ERROR_STATUS, err)
+
   def close(self, op_handle):
     close_op_req = TCLIService.TCloseOperationReq()
     close_op_req.operationHandle = op_handle
diff --git a/tests/hs2/test_fetch.py b/tests/hs2/test_fetch.py
index e0f3f18..eb416d0 100644
--- a/tests/hs2/test_fetch.py
+++ b/tests/hs2/test_fetch.py
@@ -18,7 +18,8 @@
 
 import pytest
 import re
-from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session
+from tests.hs2.hs2_test_suite import (HS2TestSuite, needs_session,
+    create_op_handle_without_secret)
 from TCLIService import TCLIService, constants
 from TCLIService.ttypes import TTypeId
 
@@ -133,6 +134,7 @@ class TestFetch(HS2TestSuite):
     execute_statement_resp = self.hs2_client.ExecuteStatement(execute_statement_req)
     HS2TestSuite.check_response(execute_statement_resp)
 
+    # Do the actual fetch with a valid request.
     fetch_results_req = TCLIService.TFetchResultsReq()
     fetch_results_req.operationHandle = execute_statement_resp.operationHandle
     fetch_results_req.maxRows = 1024
@@ -257,3 +259,35 @@ class TestFetch(HS2TestSuite):
   def test_compute_stats(self):
     """Exercise the child query path"""
     self.__query_and_fetch("compute stats functional.alltypes")
+
+  @needs_session()
+  def test_invalid_secret(self):
+    """Verify that FetchResults, GetResultSetMetadata and CloseOperation reject an
+    operation handle with a wrong secret without affecting the real operation."""
+    execute_resp = self.hs2_client.ExecuteStatement(TCLIService.TExecuteStatementReq(
+        self.session_handle, "select 'something something'"))
+    HS2TestSuite.check_response(execute_resp)
+    good_handle = execute_resp.operationHandle
+    bad_handle = create_op_handle_without_secret(good_handle)
+
+    # Calls with the bogus handle must fail and be no-ops for the real operation.
+    bad_fetch_resp = self.hs2_client.FetchResults(
+        TCLIService.TFetchResultsReq(operationHandle=bad_handle, maxRows=1024))
+    HS2TestSuite.check_invalid_query(bad_fetch_resp, expect_legacy_err=True)
+    bad_metadata_resp = self.hs2_client.GetResultSetMetadata(
+        TCLIService.TGetResultSetMetadataReq(operationHandle=bad_handle))
+    HS2TestSuite.check_invalid_query(bad_metadata_resp, expect_legacy_err=True)
+    bad_close_resp = self.hs2_client.CloseOperation(
+        TCLIService.TCloseOperationReq(operationHandle=bad_handle))
+    HS2TestSuite.check_invalid_query(bad_close_resp, expect_legacy_err=True)
+
+    # The good handle must have remained valid: fetch, get metadata, then close.
+    good_fetch_resp = self.hs2_client.FetchResults(
+        TCLIService.TFetchResultsReq(operationHandle=good_handle, maxRows=1024))
+    HS2TestSuite.check_response(good_fetch_resp)
+    good_metadata_resp = self.hs2_client.GetResultSetMetadata(
+        TCLIService.TGetResultSetMetadataReq(operationHandle=good_handle))
+    HS2TestSuite.check_response(good_metadata_resp)
+    good_close_resp = self.hs2_client.CloseOperation(
+        TCLIService.TCloseOperationReq(operationHandle=good_handle))
+    HS2TestSuite.check_response(good_close_resp)
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index 9fa92af..d50773f 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -17,7 +17,9 @@
 #
 # Client tests for Impala's HiveServer2 interface
 
+from getpass import getuser
 import json
+import logging
 import pytest
 import time
 
@@ -26,19 +28,34 @@ from urllib2 import urlopen
 from ImpalaService import ImpalaHiveServer2Service
 from tests.common.environ import IMPALA_TEST_CLUSTER_PROPERTIES
 from tests.common.skip import SkipIfDockerizedCluster
-from tests.hs2.hs2_test_suite import HS2TestSuite, needs_session, operation_id_to_query_id
+from tests.hs2.hs2_test_suite import (HS2TestSuite, needs_session,
+    operation_id_to_query_id, create_session_handle_without_secret,
+    create_op_handle_without_secret)
 from TCLIService import TCLIService
 
+LOG = logging.getLogger('test_hs2')
 
 SQLSTATE_GENERAL_ERROR = "HY000"
 
 class TestHS2(HS2TestSuite):
+  def setup_method(self, method):
+    # Keep track of extra session handles opened by _open_extra_session.
+    self.__extra_sessions = []
+
+  def teardown_method(self, method):
+    """Best-effort close of every session recorded in self.__extra_sessions."""
+    for session in self.__extra_sessions:
+      try:
+        self.hs2_client.CloseSession(TCLIService.TCloseSessionReq(session))
+      except Exception:
+        LOG.exception("Error closing session.")
+
   def test_open_session(self):
     """Check that a session can be opened"""
     open_session_req = TCLIService.TOpenSessionReq()
     TestHS2.check_response(self.hs2_client.OpenSession(open_session_req))
 
-  def test_open_sesssion_query_options(self):
+  def test_open_session_query_options(self):
     """Check that OpenSession sets query options"""
     open_session_req = TCLIService.TOpenSessionReq()
     open_session_req.configuration = {'MAX_ERRORS': '45678',
@@ -167,6 +184,13 @@ class TestHS2(HS2TestSuite):
     resp = self.hs2_client.OpenSession(open_session_req)
     TestHS2.check_response(resp)
 
+    # Check that CloseSession validates session secret and acts as if the session didn't
+    # exist.
+    invalid_close_session_req = TCLIService.TCloseSessionReq()
+    invalid_close_session_req.sessionHandle = create_session_handle_without_secret(
+        resp.sessionHandle)
+    TestHS2.check_invalid_session(self.hs2_client.CloseSession(invalid_close_session_req))
+
     close_session_req = TCLIService.TCloseSessionReq()
     close_session_req.sessionHandle = resp.sessionHandle
     TestHS2.check_response(self.hs2_client.CloseSession(close_session_req))
@@ -182,8 +206,7 @@ class TestHS2(HS2TestSuite):
     TestHS2.check_response(self.hs2_client.CloseSession(close_session_req))
 
     # Double close should be an error
-    TestHS2.check_response(self.hs2_client.CloseSession(close_session_req),
-                           TCLIService.TStatusCode.ERROR_STATUS)
+    TestHS2.check_invalid_session(self.hs2_client.CloseSession(close_session_req))
 
   # This test verifies the number of open and expired sessions so avoid running
   # concurrently with other sessions.
@@ -279,6 +302,11 @@ class TestHS2(HS2TestSuite):
     assert get_operation_status_resp.operationState == \
         TCLIService.TOperationState.FINISHED_STATE
 
+    # Validate that the operation secret is checked.
+    TestHS2.check_invalid_query(self.get_operation_status(
+        create_op_handle_without_secret(execute_statement_resp.operationHandle)),
+        expect_legacy_err=True)
+
     close_operation_req = TCLIService.TCloseOperationReq()
     close_operation_req.operationHandle = execute_statement_resp.operationHandle
     TestHS2.check_response(self.hs2_client.CloseOperation(close_operation_req))
@@ -378,6 +406,15 @@ class TestHS2(HS2TestSuite):
         "impala-server.num-open-hiveserver2-sessions", num_sessions)
 
   @needs_session()
+  def test_get_info(self):
+    # Negative test: GetInfo() must reject a handle with an invalid session secret.
+    bogus_session = create_session_handle_without_secret(self.session_handle)
+    bad_req = TCLIService.TGetInfoReq(
+        bogus_session, TCLIService.TGetInfoType.CLI_DBMS_NAME)
+    TestHS2.check_invalid_session(self.hs2_client.GetInfo(bad_req))
+    # TODO: it would be useful to add positive tests for GetInfo().
+
+  @needs_session()
   def test_get_schemas(self):
     get_schemas_req = TCLIService.TGetSchemasReq()
     get_schemas_req.sessionHandle = self.session_handle
@@ -395,6 +432,11 @@ class TestHS2(HS2TestSuite):
     assert "Sql Statement: GET_SCHEMAS" in profile_page
     assert "Query Type: DDL" in profile_page
 
+    # Test that session secret is validated by this API.
+    get_schemas_req.sessionHandle = create_session_handle_without_secret(
+        self.session_handle)
+    TestHS2.check_invalid_session(self.hs2_client.GetSchemas(get_schemas_req))
+
   @pytest.mark.execute_serially
   @needs_session()
   def test_get_tables(self):
@@ -439,6 +481,14 @@ class TestHS2(HS2TestSuite):
           assert table_remarks == "table comment"
         # Ensure the table is loaded for the second iteration.
         self.execute_query("describe {0}".format(table))
+
+      # Test that session secret is validated by this API.
+      invalid_req = TCLIService.TGetTablesReq()
+      invalid_req.sessionHandle = create_session_handle_without_secret(
+          self.session_handle)
+      invalid_req.schemaName = "default"
+      invalid_req.tableName = table
+      TestHS2.check_invalid_session(self.hs2_client.GetTables(invalid_req))
     finally:
       self.execute_query("drop table {0}".format(table))
 
@@ -474,6 +524,13 @@ class TestHS2(HS2TestSuite):
       TestHS2.check_response(fetch_results_resp)
       has_more_results = fetch_results_resp.hasMoreRows
 
+    # Test that secret is validated.
+    invalid_get_log_req = TCLIService.TGetLogReq()
+    invalid_get_log_req.operationHandle = create_op_handle_without_secret(
+        execute_statement_resp.operationHandle)
+    TestHS2.check_invalid_query(self.hs2_client.GetLog(invalid_get_log_req),
+        expect_legacy_err=True)
+
     get_log_req = TCLIService.TGetLogReq()
     get_log_req.operationHandle = execute_statement_resp.operationHandle
     get_log_resp = self.hs2_client.GetLog(get_log_req)
@@ -510,11 +567,31 @@ class TestHS2(HS2TestSuite):
     TestHS2.check_response(exec_summary_resp)
     assert len(exec_summary_resp.summary.nodes) > 0
 
+    # Test that session secret is validated. Note that operation secret does not need to
+    # be validated in addition if the session secret is valid and the operation belongs
+    # to the session, because the user has full access to the session.
+    TestHS2.check_invalid_session(self.hs2_client.GetExecSummary(
+      ImpalaHiveServer2Service.TGetExecSummaryReq(execute_statement_resp.operationHandle,
+        create_session_handle_without_secret(self.session_handle))))
+
+    # Attempt to access query with different user should fail.
+    evil_user = getuser() + "_evil_twin"
+    session_handle2 = self._open_extra_session(evil_user)
+    TestHS2.check_profile_access_denied(self.hs2_client.GetExecSummary(
+      ImpalaHiveServer2Service.TGetExecSummaryReq(execute_statement_resp.operationHandle,
+        session_handle2)), user=evil_user)
+
     # Now close the query and verify the exec summary is available.
     close_operation_req = TCLIService.TCloseOperationReq()
     close_operation_req.operationHandle = execute_statement_resp.operationHandle
     TestHS2.check_response(self.hs2_client.CloseOperation(close_operation_req))
 
+    # Attempt to access query with different user from log should fail.
+    TestHS2.check_profile_access_denied(self.hs2_client.GetRuntimeProfile(
+      ImpalaHiveServer2Service.TGetRuntimeProfileReq(
+        execute_statement_resp.operationHandle, session_handle2)),
+      user=evil_user)
+
     exec_summary_resp = self.hs2_client.GetExecSummary(exec_summary_req)
     TestHS2.check_response(exec_summary_resp)
     assert len(exec_summary_resp.summary.nodes) > 0
@@ -547,10 +624,32 @@ class TestHS2(HS2TestSuite):
     # After fetching the results, we must be in state FINISHED.
     assert "Query State: FINISHED" in get_profile_resp.profile, get_profile_resp.profile
 
+    # Test that session secret is validated. Note that operation secret does not need to
+    # be validated in addition if the session secret is valid and the operation belongs
+    # to the session, because the user has full access to the session.
+    TestHS2.check_invalid_session(self.hs2_client.GetRuntimeProfile(
+      ImpalaHiveServer2Service.TGetRuntimeProfileReq(
+        execute_statement_resp.operationHandle,
+        create_session_handle_without_secret(self.session_handle))))
+
+    # Attempt to access query with different user should fail.
+    evil_user = getuser() + "_evil_twin"
+    session_handle2 = self._open_extra_session(evil_user)
+    TestHS2.check_profile_access_denied(self.hs2_client.GetRuntimeProfile(
+      ImpalaHiveServer2Service.TGetRuntimeProfileReq(
+        execute_statement_resp.operationHandle, session_handle2)),
+        user=evil_user)
+
     close_operation_req = TCLIService.TCloseOperationReq()
     close_operation_req.operationHandle = execute_statement_resp.operationHandle
     TestHS2.check_response(self.hs2_client.CloseOperation(close_operation_req))
 
+    # Attempt to access query with different user from log should fail.
+    TestHS2.check_profile_access_denied(self.hs2_client.GetRuntimeProfile(
+      ImpalaHiveServer2Service.TGetRuntimeProfileReq(
+        execute_statement_resp.operationHandle, session_handle2)),
+        user=evil_user)
+
     get_profile_resp = self.hs2_client.GetRuntimeProfile(get_profile_req)
     TestHS2.check_response(get_profile_resp)
     assert statement in get_profile_resp.profile
@@ -599,6 +698,11 @@ class TestHS2(HS2TestSuite):
     for colType in types:
       assert typed_col.values.count(colType) == 1
 
+    # Test that session secret is validated by this API.
+    invalid_req = TCLIService.TGetTypeInfoReq(
+        create_session_handle_without_secret(self.session_handle))
+    TestHS2.check_invalid_session(self.hs2_client.GetTypeInfo(invalid_req))
+
   def _fetch_results(self, operation_handle, max_rows):
     """Fetch results from 'operation_handle' with up to 'max_rows' rows using
     self.hs2_client, returning the TFetchResultsResp object."""
@@ -609,6 +713,13 @@ class TestHS2(HS2TestSuite):
     TestHS2.check_response(fetch_results_resp)
     return fetch_results_resp
 
+  def _open_extra_session(self, user_name):
+    """Open a session as 'user_name' that teardown_method() closes; return its handle."""
+    resp = self.hs2_client.OpenSession(TCLIService.TOpenSessionReq(username=user_name))
+    TestHS2.check_response(resp)
+    self.__extra_sessions.append(resp.sessionHandle)
+    return resp.sessionHandle
+
   def test_close_connection(self):
     """Tests that an hs2 session remains valid even after the connection is dropped."""
     open_session_req = TCLIService.TOpenSessionReq()
diff --git a/tests/query_test/test_beeswax.py b/tests/query_test/test_beeswax.py
new file mode 100644
index 0000000..037a4a3
--- /dev/null
+++ b/tests/query_test/test_beeswax.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+class TestBeeswax(ImpalaTestSuite):
+  """Tests of user validation for Beeswax operations issued across connections."""
+
+  def test_user_validation(self):
+    """Test various scenarios with cross-connection operations with the same
+    and different users."""
+    USER1, USER2 = "user1", "user2"
+    client1 = self.client
+    # Initialise up front so that 'finally' can close whichever clients were created.
+    same_user_client = different_user_client = unset_user_client = None
+    try:
+      same_user_client = self.create_impala_client(protocol='beeswax')
+      different_user_client = self.create_impala_client(protocol='beeswax')
+      unset_user_client = self.create_impala_client(protocol='beeswax')
+
+      # Unauthenticated Beeswax only sets user once the query is run.
+      result = client1.execute("select effective_user()", user=USER1)
+      assert result.data[0] == USER1
+      result = same_user_client.execute("select effective_user()", user=USER1)
+      assert result.data[0] == USER1
+      # The user shouldn't change once set.
+      result = client1.execute("select effective_user()", user=USER2)
+      assert result.data[0] == USER1
+      result = different_user_client.execute("select effective_user()", user=USER2)
+      assert result.data[0] == USER2
+
+      QUERY = "select * from functional.alltypes"
+      handle = client1.execute_async(QUERY)
+      client1.get_state(handle)
+      # Connections with an effective user should not be able to access other users'
+      # sessions.
+      self._assert_invalid_handle(lambda: different_user_client.fetch(QUERY, handle))
+      self._assert_invalid_handle(lambda: different_user_client.get_state(handle))
+      self._assert_invalid_handle(lambda: different_user_client.get_log(handle))
+      self._assert_profile_access_denied(
+          lambda: different_user_client.get_runtime_profile(handle))
+      self._assert_profile_access_denied(
+          lambda: different_user_client.get_exec_summary(handle))
+      self._assert_invalid_handle(lambda: different_user_client.close_dml(handle))
+
+      # Connections with the same user can always access the requests. A connection
+      # without an effective user (only possible with unauthenticated connections)
+      # is allowed to access any query. Some Impala tests depend on this.
+      for valid_client in [client1, same_user_client, unset_user_client]:
+        valid_client.get_state(handle)
+        valid_client.fetch(QUERY, handle)
+        valid_client.get_log(handle)
+        valid_client.get_runtime_profile(handle)
+        valid_client.get_exec_summary(handle)
+
+      # Validation of user should be skipped for closing and cancelling queries, to
+      # avoid breaking administrative tools that clean up queries via Beeswax RPCs.
+      different_user_client.cancel(handle)
+      different_user_client.close_query(handle)
+    finally:
+      # Close every client this test created (client1 is self.client and is left open).
+      for extra_client in (same_user_client, different_user_client, unset_user_client):
+        if extra_client is not None:
+          extra_client.close()
+
+  def _assert_invalid_handle(self, fn):
+    """Assert that invoking fn() raises an ImpalaBeeswaxException with an invalid query
+    handle error."""
+    try:
+      fn()
+      assert False, "Expected invalid handle"
+    except ImpalaBeeswaxException as e:
+      assert "Query id" in str(e) and "not found" in str(e), str(e)
+
+  def _assert_profile_access_denied(self, fn):
+    """Assert that invoking fn() raises an ImpalaBeeswaxException with the error
+    message indicating that profile access is denied."""
+    try:
+      fn()
+      assert False, "Expected profile access to be denied"
+    except ImpalaBeeswaxException as e:
+      assert "is not authorized to access the runtime profile" in str(e), str(e)