Posted to commits@impala.apache.org by as...@apache.org on 2021/03/18 16:29:49 UTC

[impala] 03/05: IMPALA-10518: Add ImpalaServer interface to retrieve executor membership.

This is an automated email from the ASF dual-hosted git repository.

asherman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit decb79d0328f6b4daa74ca4228291571e7b4b668
Author: Aman Sinha <am...@cloudera.com>
AuthorDate: Sat Mar 13 12:41:52 2021 -0800

    IMPALA-10518: Add ImpalaServer interface to retrieve executor membership.
    
    This patch adds an interface to ImpalaServer to retrieve the current
    executor membership snapshot from impalad for use by an external
    frontend. This involves sending a thrift request to impalad and
    receiving a thrift response. It also refactors some code from exec-env
    into a separate function in the impala namespace, which makes it easier
    to populate the needed information for an external frontend.
    
    Testing:
     - Ran selected tests as a sanity check (no impact is expected
       since this only adds a new interface):
        - Frontend tests (PlannerTest, CardinalityTest)
        - Backend tests under custom_cluster/test_executor_groups.py
     - Manually tested with an external frontend to ensure it gets
       the executor membership snapshot.
    
    Change-Id: Ie89b71f4555c368869ee7b9d6341756c60af12b5
    Reviewed-on: http://gerrit.cloudera.org:8080/17181
    Reviewed-by: Thomas Tauber-Marshall <tm...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/hs2-http-test.cc                 |  2 ++
 be/src/runtime/exec-env.cc                  | 43 +++-----------------------
 be/src/scheduling/cluster-membership-mgr.cc | 47 ++++++++++++++++++++++++++++-
 be/src/scheduling/cluster-membership-mgr.h  |  5 +++
 be/src/service/impala-hs2-server.cc         | 39 ++++++++++++++++++++++++
 be/src/service/impala-server.h              |  4 +++
 common/thrift/ImpalaService.thrift          | 14 +++++++++
 tests/hs2/test_hs2.py                       |  9 ++++++
 8 files changed, 124 insertions(+), 39 deletions(-)
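
For orientation, an external frontend consumes the new RPC through the plain
HS2 thrift client, mirroring the test added at the bottom of this patch. A
minimal client-side sketch follows; the service and request types come from
this patch, while the host, port, and transport setup are assumptions (the
call only succeeds against the "external fe" service, see the check in
impala-hs2-server.cc below):

    # Hypothetical client sketch; host/port and transport are assumptions.
    from thrift.transport import TSocket, TTransport
    from thrift.protocol import TBinaryProtocol
    from ImpalaService import ImpalaHiveServer2Service
    from TCLIService import TCLIService

    socket = TSocket.TSocket('impalad-host', 21150)  # assumed external fe port
    transport = TTransport.TBufferedTransport(socket)
    client = ImpalaHiveServer2Service.Client(
        TBinaryProtocol.TBinaryProtocol(transport))
    transport.open()

    # The request carries an HS2 session handle from a prior OpenSession().
    session = client.OpenSession(TCLIService.TOpenSessionReq()).sessionHandle
    req = ImpalaHiveServer2Service.TGetExecutorMembershipReq(
        sessionHandle=session)
    resp = client.GetExecutorMembership(req)
    print(resp.executor_membership.num_executors)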

diff --git a/be/src/rpc/hs2-http-test.cc b/be/src/rpc/hs2-http-test.cc
index 0c61df9..5dde309 100644
--- a/be/src/rpc/hs2-http-test.cc
+++ b/be/src/rpc/hs2-http-test.cc
@@ -52,6 +52,8 @@ class TestHS2Service : public ImpalaHiveServer2ServiceIf {
       TExecuteStatementResp& _return, const TExecutePlannedStatementReq& req) {}
   virtual void GetBackendConfig(TGetBackendConfigResp& _return,
       const TGetBackendConfigReq& req) {}
+  virtual void GetExecutorMembership(
+      TGetExecutorMembershipResp& _return, const TGetExecutorMembershipReq& req) {}
   virtual void GetTypeInfo(TGetTypeInfoResp& _return, const TGetTypeInfoReq& req) {}
   virtual void GetCatalogs(TGetCatalogsResp& _return, const TGetCatalogsReq& req) {}
   virtual void GetSchemas(TGetSchemasResp& _return, const TGetSchemasReq& req) {}
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index c3c8f06..9726c08 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -173,47 +173,14 @@ const static int COORDINATOR_CONCURRENCY_MULTIPLIER = 8;
 namespace {
 using namespace impala;
 /// Helper method to forward cluster membership updates to the frontend.
-///
-/// The frontend uses cluster membership information to determine whether it expects the
-/// scheduler to assign local or remote reads. It also uses the number of executors to
-/// determine the join type (partitioned vs broadcast). For the default executor group, we
-/// assume that local reads are preferred and will include the hostnames and IP addresses
-/// in the update to the frontend. For non-default executor groups, we assume that we will
-/// read data remotely and will only send the number of executors in the largest healthy
-/// group.
+/// For additional details see comments for PopulateExecutorMembershipRequest()
+/// in cluster-membership-mgr.cc
 void SendClusterMembershipToFrontend(
     ClusterMembershipMgr::SnapshotPtr& snapshot, Frontend* frontend) {
   TUpdateExecutorMembershipRequest update_req;
-  const ExecutorGroup* group = nullptr;
-  bool is_default_group = false;
-  auto default_it =
-      snapshot->executor_groups.find(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
-  if (default_it != snapshot->executor_groups.end()) {
-    is_default_group = true;
-    group = &(default_it->second);
-  } else {
-    int max_num_executors = 0;
-    // Find largest healthy group.
-    for (const auto& it : snapshot->executor_groups) {
-      if (!it.second.IsHealthy()) continue;
-      int num_executors = it.second.NumExecutors();
-      if (num_executors > max_num_executors) {
-        max_num_executors = num_executors;
-        group = &(it.second);
-      }
-    }
-  }
-  if (group) {
-    for (const auto& backend : group->GetAllExecutorDescriptors()) {
-      if (backend.is_executor()) {
-        if (is_default_group) {
-          update_req.hostnames.insert(backend.address().hostname());
-          update_req.ip_addresses.insert(backend.ip_address());
-        }
-        update_req.num_executors++;
-      }
-    }
-  }
+
+  PopulateExecutorMembershipRequest(snapshot, update_req);
+
   Status status = frontend->UpdateExecutorMembership(update_req);
   if (!status.ok()) {
     LOG(WARNING) << "Error updating frontend membership snapshot: " << status.GetDetail();
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index 1016fcb..91bae23 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -19,6 +19,8 @@
 
 #include "common/logging.h"
 #include "common/names.h"
+#include "gen-cpp/Frontend_types.h"
+#include "service/impala-server.h"
 #include "util/metrics.h"
 #include "util/test-info.h"
 
@@ -40,7 +42,6 @@ ExecutorGroup* FindOrInsertExecutorGroup(const ExecutorGroupDescPB& group,
   DCHECK(inserted);
   return &it->second;
 }
-
 }
 
 namespace impala {
@@ -567,4 +568,48 @@ bool ClusterMembershipMgr::IsBackendInExecutorGroups(
   return false;
 }
 
+/// Helper method to populate a thrift request object for cluster membership
+/// using a supplied snapshot from the cluster membership manager.
+///
+/// The frontend uses cluster membership information to determine whether it expects the
+/// scheduler to assign local or remote reads. It also uses the number of executors to
+/// determine the join type (partitioned vs broadcast). For the default executor group, we
+/// assume that local reads are preferred and will include the hostnames and IP addresses
+/// in the update to the frontend. For non-default executor groups, we assume that we will
+/// read data remotely and will only send the number of executors in the largest healthy
+/// group.
+void PopulateExecutorMembershipRequest(ClusterMembershipMgr::SnapshotPtr& snapshot,
+    TUpdateExecutorMembershipRequest& update_req) {
+  const ExecutorGroup* group = nullptr;
+  bool is_default_group = false;
+  auto default_it =
+      snapshot->executor_groups.find(ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME);
+  if (default_it != snapshot->executor_groups.end()) {
+    is_default_group = true;
+    group = &(default_it->second);
+  } else {
+    int max_num_executors = 0;
+    // Find largest healthy group.
+    for (const auto& it : snapshot->executor_groups) {
+      if (!it.second.IsHealthy()) continue;
+      int num_executors = it.second.NumExecutors();
+      if (num_executors > max_num_executors) {
+        max_num_executors = num_executors;
+        group = &(it.second);
+      }
+    }
+  }
+  if (group) {
+    for (const auto& backend : group->GetAllExecutorDescriptors()) {
+      if (backend.is_executor()) {
+        if (is_default_group) {
+          update_req.hostnames.insert(backend.address().hostname());
+          update_req.ip_addresses.insert(backend.ip_address());
+        }
+        update_req.num_executors++;
+      }
+    }
+  }
+}
+
 } // end namespace impala
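
The doc comment above reduces to a simple rule: prefer the default executor
group; otherwise pick the largest healthy group, and only ship hostnames and
IP addresses when the default group was chosen. A minimal Python sketch of
that rule, assuming illustrative group objects (is_healthy() and executors()
are assumed shapes, not the real C++ API):

    # Illustrative only: 'groups' maps group name to an assumed object.
    def pick_group(groups, default_name):
        if default_name in groups:
            # Default group: local reads preferred, so hostnames/IPs are sent.
            return groups[default_name], True
        healthy = [g for g in groups.values() if g.is_healthy()]
        if not healthy:
            return None, False
        # Non-default groups: reads are assumed remote, so only the executor
        # count of the largest healthy group is reported.
        return max(healthy, key=lambda g: len(g.executors())), False
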
diff --git a/be/src/scheduling/cluster-membership-mgr.h b/be/src/scheduling/cluster-membership-mgr.h
index 5898537..e7e218b 100644
--- a/be/src/scheduling/cluster-membership-mgr.h
+++ b/be/src/scheduling/cluster-membership-mgr.h
@@ -32,6 +32,8 @@
 
 namespace impala {
 
+class TUpdateExecutorMembershipRequest;
+
 namespace test {
 class SchedulerWrapper;
 }
@@ -257,4 +259,7 @@ class ClusterMembershipMgr {
   friend class impala::test::SchedulerWrapper;
 };
 
+void PopulateExecutorMembershipRequest(ClusterMembershipMgr::SnapshotPtr& snapshot,
+    TUpdateExecutorMembershipRequest& update_req);
+
 } // end namespace impala
diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 76a1c35..2f6858b 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -1197,6 +1197,45 @@ void ImpalaServer::GetBackendConfig(TGetBackendConfigResp& return_val,
   VLOG_RPC << "GetBackendConfig(): return_val=" << ThriftDebugString(return_val);
 }
 
+void ImpalaServer::GetExecutorMembership(
+    TGetExecutorMembershipResp& return_val, const TGetExecutorMembershipReq& req) {
+  VLOG_QUERY << "GetExecutorMembership(): req=" << ThriftDebugString(req);
+  const ThriftServer::ConnectionContext* connection_context =
+      ThriftServer::GetThreadConnectionContext();
+  if (connection_context->server_name != EXTERNAL_FRONTEND_SERVER_NAME) {
+    HS2_RETURN_ERROR(
+        return_val, "Unsupported operation", SQLSTATE_OPTIONAL_FEATURE_NOT_IMPLEMENTED);
+  }
+  TUniqueId session_id;
+  TUniqueId secret;
+  HS2_RETURN_IF_ERROR(return_val,
+      THandleIdentifierToTUniqueId(req.sessionHandle.sessionId, &session_id, &secret),
+      SQLSTATE_GENERAL_ERROR);
+  ScopedSessionState session_handle(this);
+  shared_ptr<SessionState> session;
+  HS2_RETURN_IF_ERROR(return_val,
+      session_handle.WithSession(session_id, SecretArg::Session(secret), &session),
+      SQLSTATE_GENERAL_ERROR);
+  if (session == NULL) {
+    HS2_RETURN_ERROR(return_val,
+        Substitute("Invalid session id: $0", PrintId(session_id)),
+        SQLSTATE_GENERAL_ERROR);
+  }
+
+  ClusterMembershipMgr* cluster_membership_mgr =
+      DCHECK_NOTNULL(ExecEnv::GetInstance()->cluster_membership_mgr());
+  ClusterMembershipMgr::SnapshotPtr membership_snapshot =
+      cluster_membership_mgr->GetSnapshot();
+  DCHECK_NOTNULL(membership_snapshot.get());
+
+  // Populate an instance of TUpdateExecutorMembershipRequest
+  // with the field values retrieved from membership_snapshot
+  PopulateExecutorMembershipRequest(membership_snapshot, return_val.executor_membership);
+
+  return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
+  VLOG_RPC << "GetExecutorMembership(): return_val=" << ThriftDebugString(return_val);
+}
+
 void ImpalaServer::CancelDelegationToken(TCancelDelegationTokenResp& return_val,
     const TCancelDelegationTokenReq& req) {
   return_val.status.__set_statusCode(thrift::TStatusCode::ERROR_STATUS);
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index cf63fa8..a98c2b4 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -357,6 +357,10 @@ class ImpalaServer : public ImpalaServiceIf,
   virtual void GetBackendConfig(TGetBackendConfigResp& return_val,
       const TGetBackendConfigReq& request);
 
+  /// Retrieves the current executor membership snapshot.
+  virtual void GetExecutorMembership(
+      TGetExecutorMembershipResp& return_val, const TGetExecutorMembershipReq& request);
+
   /// Closes an Impala operation and returns additional information about the closed
   /// operation.
   virtual void CloseImpalaOperation(
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 44cc8a2..80eb80e 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -834,6 +834,16 @@ struct TGetBackendConfigResp {
   2: required BackendGflags.TBackendGflags backend_config
 }
 
+struct TGetExecutorMembershipReq {
+  1: required TCLIService.TSessionHandle sessionHandle
+}
+
+struct TGetExecutorMembershipResp {
+  1: required TCLIService.TStatus status
+
+  2: required Frontend.TUpdateExecutorMembershipRequest executor_membership
+}
+
 service ImpalaHiveServer2Service extends TCLIService.TCLIService {
   // Returns the exec summary for the given query. The exec summary is only valid for
   // queries that execute with Impala's backend, i.e. QUERY, DML and COMPUTE_STATS
@@ -856,4 +866,8 @@ service ImpalaHiveServer2Service extends TCLIService.TCLIService {
 
   // Returns the current TBackendGflags. Only supported for the "external fe" service.
   TGetBackendConfigResp GetBackendConfig(1:TGetBackendConfigReq req);
+
+  // Returns the executor membership information. Only supported for the "external fe"
+  // service.
+  TGetExecutorMembershipResp GetExecutorMembership(1:TGetExecutorMembershipReq req);
 }
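
Since the response embeds Frontend.TUpdateExecutorMembershipRequest, a caller
unpacks the same fields the coordinator pushes to its own frontend. A short
sketch of consuming the response (field names come from the structs above;
everything else is illustrative):

    # resp is a TGetExecutorMembershipResp; error handling simplified.
    def summarize_membership(resp):
        if resp.status.statusCode != TCLIService.TStatusCode.SUCCESS_STATUS:
            raise RuntimeError(resp.status.errorMessage)
        m = resp.executor_membership
        return m.num_executors, sorted(m.hostnames), sorted(m.ip_addresses)
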
diff --git a/tests/hs2/test_hs2.py b/tests/hs2/test_hs2.py
index b72e221..4bdf449 100644
--- a/tests/hs2/test_hs2.py
+++ b/tests/hs2/test_hs2.py
@@ -740,6 +740,15 @@ class TestHS2(HS2TestSuite):
         TCLIService.TStatusCode.ERROR_STATUS, "Unsupported operation")
 
   @needs_session()
+  def test_get_executor_membership(self):
+    get_executor_membership_req = ImpalaHiveServer2Service.TGetExecutorMembershipReq()
+    get_executor_membership_req.sessionHandle = self.session_handle
+    get_executor_membership_resp = self.hs2_client.GetExecutorMembership(
+      get_executor_membership_req)
+    TestHS2.check_response(get_executor_membership_resp,
+        TCLIService.TStatusCode.ERROR_STATUS, "Unsupported operation")
+
+  @needs_session()
   def test_get_profile(self):
     statement = "SELECT COUNT(2) FROM functional.alltypes"
     execute_statement_resp = self.execute_statement(statement)
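
Note that the new test asserts the error path: this suite connects on the
regular HS2 port, not the "external fe" service, so the server_name check in
GetExecutorMembership() rejects the call with "Unsupported operation". A
success-path check would need a session on the external frontend service; a
hypothetical sketch (ext_fe_client and its setup are assumptions):

    # Hypothetical: assumes a client and session on the external fe service.
    def check_membership_success(ext_fe_client, session_handle):
        req = ImpalaHiveServer2Service.TGetExecutorMembershipReq()
        req.sessionHandle = session_handle
        resp = ext_fe_client.GetExecutorMembership(req)
        assert resp.status.statusCode == TCLIService.TStatusCode.SUCCESS_STATUS
        assert resp.executor_membership.num_executors >= 0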