You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by wz...@apache.org on 2023/02/04 17:47:55 UTC

[impala] 02/03: IMPALA-11891: Remove empty executor groups

This is an automated email from the ASF dual-hosted git repository.

wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 253423f99c5b0dee14fa279f0a916f5ed1dc2f03
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Thu Feb 2 09:52:49 2023 +0800

    IMPALA-11891: Remove empty executor groups
    
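    Previously, an executor group stayed in the cluster membership, with its
    original configuration, even after its last executor was removed. This
    patch removes executor groups from cluster membership once they no longer
    have any executors, so that executor groups' configurations can be
    updated without restarting all impalads in the cluster.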
    
    Testing:
    - Added an e2e test to verify the new functionality.
    
    Change-Id: I480b84b26a780d345216004f1a4657c7b95dda45
    Reviewed-on: http://gerrit.cloudera.org:8080/19468
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Wenzhe Zhou <wz...@cloudera.com>
---
 be/src/scheduling/cluster-membership-mgr-test.cc |  7 +++-
 be/src/scheduling/cluster-membership-mgr.cc      | 26 ++++++++++----
 tests/custom_cluster/test_executor_groups.py     | 44 ++++++++++++++++++++++++
 3 files changed, 69 insertions(+), 8 deletions(-)

diff --git a/be/src/scheduling/cluster-membership-mgr-test.cc b/be/src/scheduling/cluster-membership-mgr-test.cc
index 1e54b427a..5773411d8 100644
--- a/be/src/scheduling/cluster-membership-mgr-test.cc
+++ b/be/src/scheduling/cluster-membership-mgr-test.cc
@@ -63,9 +63,15 @@ class ClusterMembershipMgrTest : public testing::Test {
  protected:
   ClusterMembershipMgrTest() {}
 
-  /// Returns the size of the default executor group of the current membership in 'cmm'.
+  /// Returns the number of executors in the default executor group of the current
+  /// membership snapshot of 'cmm', or 0 if that group does not exist (empty executor
+  /// groups are removed from the membership).
   int GetDefaultGroupSize(const ClusterMembershipMgr& cmm) const {
     const string& group_name = ImpalaServer::DEFAULT_EXECUTOR_GROUP_NAME;
-    return cmm.GetSnapshot()->executor_groups.find(group_name)->second.NumExecutors();
+    // Look the group up once, in a single snapshot, so check and use see the same state.
+    const auto snapshot = cmm.GetSnapshot();
+    const auto it = snapshot->executor_groups.find(group_name);
+    if (it == snapshot->executor_groups.end()) return 0;
+    return it->second.NumExecutors();
   }
 
diff --git a/be/src/scheduling/cluster-membership-mgr.cc b/be/src/scheduling/cluster-membership-mgr.cc
index ff66317f0..34e154273 100644
--- a/be/src/scheduling/cluster-membership-mgr.cc
+++ b/be/src/scheduling/cluster-membership-mgr.cc
@@ -50,6 +50,28 @@ ExecutorGroup* FindOrInsertExecutorGroup(const ExecutorGroupDescPB& group,
   DCHECK(inserted);
   return &it->second;
 }
+
+/// Removes the executor 'be_desc' from the executor group described by 'group' and,
+/// if the group has no executors left afterwards, erases the group from
+/// 'executor_groups'. Dropping empty groups lets the group be created again later
+/// with a different configuration (e.g. a new minimum size).
+/// The group is expected to be present in 'executor_groups'. This is DCHECK'ed; in
+/// release builds a missing group is ignored rather than dereferencing end().
+void RemoveExecutorAndGroup(const BackendDescriptorPB& be_desc,
+    const ExecutorGroupDescPB& group,
+    ClusterMembershipMgr::ExecutorGroups* executor_groups) {
+  auto it = executor_groups->find(group.name());
+  if (it == executor_groups->end()) {
+    DCHECK(false) << "Executor group not found: " << group.DebugString();
+    return;
+  }
+  DCHECK_EQ(group.name(), it->second.name());
+  it->second.RemoveExecutor(be_desc);
+  if (it->second.NumExecutors() == 0) {
+    VLOG(1) << "Removing empty group " << group.DebugString();
+    executor_groups->erase(it);
+  }
+}
 }
 
 namespace impala {
@@ -243,8 +258,7 @@ void ClusterMembershipMgr::UpdateMembership(
           for (const auto& group : be_desc.executor_groups()) {
             VLOG(1) << "Removing backend " << item.key << " from group "
                     << group.DebugString() << " (deleted)";
-            FindOrInsertExecutorGroup(
-                group, new_executor_groups)->RemoveExecutor(be_desc);
+            RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
           }
         }
         new_backend_map->erase(item.key);
@@ -310,8 +324,7 @@ void ClusterMembershipMgr::UpdateMembership(
           for (const auto& group : be_desc.executor_groups()) {
             VLOG(1) << "Removing backend " << item.key << " from group "
                     << group.DebugString() << " (quiescing)";
-            FindOrInsertExecutorGroup(group, new_executor_groups)
-                ->RemoveExecutor(be_desc);
+            RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
           }
         }
       }
@@ -355,8 +368,7 @@ void ClusterMembershipMgr::UpdateMembership(
     for (const auto& group : local_be_desc->executor_groups()) {
       if (local_be_desc->is_quiescing()) {
         VLOG(1) << "Removing local backend from group " << group.DebugString();
-        FindOrInsertExecutorGroup(
-            group, new_executor_groups)->RemoveExecutor(*local_be_desc);
+        RemoveExecutorAndGroup(*local_be_desc, group, new_executor_groups);
       } else if (local_be_desc->is_executor()) {
         VLOG(1) << "Adding local backend to group " << group.DebugString();
         FindOrInsertExecutorGroup(
@@ -445,7 +457,7 @@ void ClusterMembershipMgr::BlacklistExecutor(
   for (const auto& group : be_desc.executor_groups()) {
     VLOG(1) << "Removing backend " << be_desc.address() << " from group "
             << group.DebugString() << " (blacklisted)";
-    FindOrInsertExecutorGroup(group, new_executor_groups)->RemoveExecutor(be_desc);
+    RemoveExecutorAndGroup(be_desc, group, new_executor_groups);
   }
 
   ExecutorBlacklist* new_blacklist = &(new_state->executor_blacklist);
diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py
index e66146b38..7ab770986 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -84,6 +84,39 @@ class TestExecutorGroups(CustomClusterTestSuite):
                                add_executors=True,
                                expected_num_impalads=self.num_impalads)
 
+  def _add_executors(self, name_suffix, min_size, num_executors=0,
+                    extra_args=None, resource_pool=DEFAULT_RESOURCE_POOL,
+                    expected_num_impalads=0):
+    """Starts 'num_executors' new executors and adds them to the cluster.
+
+    The executor group name is built from 'resource_pool' and 'name_suffix'. If
+    'name_suffix' is empty, no executor group is specified for the new backends and
+    they end up in the default group. 'min_size' specifies the minimum size for the
+    group to be considered healthy. If 'extra_args' is set, it is passed to the new
+    impalads via an additional --impalad_args option. Does nothing if
+    'num_executors' is 0.
+
+    'expected_num_impalads' is forwarded to _start_impala_cluster() as-is instead of
+    being derived from 'self.num_impalads', e.g. so that callers can account for
+    impalads that were killed earlier in a test. 'self.num_impalads' is incremented
+    by 'num_executors' afterwards."""
+    if num_executors == 0:
+      return
+    name = self._group_name(resource_pool, name_suffix)
+    LOG.info("Adding %s executors to group %s with minimum size %s",
+             num_executors, name, min_size)
+    cluster_args = []
+    if name_suffix:
+      cluster_args.append("--impalad_args=-executor_groups=%s:%s" % (name, min_size))
+    if extra_args:
+      cluster_args.append("--impalad_args=%s" % extra_args)
+    self._start_impala_cluster(options=cluster_args,
+                               cluster_size=num_executors,
+                               num_coordinators=0,
+                               add_executors=True,
+                               expected_num_impalads=expected_num_impalads)
+    self.num_impalads += num_executors
+
   def _restart_coordinators(self, num_coordinators, extra_args=None):
     """Restarts the coordinator spawned in setup_method and enables the caller to start
     more than one coordinator by specifying 'num_coordinators'"""
@@ -216,6 +240,26 @@ class TestExecutorGroups(CustomClusterTestSuite):
     self.execute_query_expect_success(client, TEST_QUERY)
     self._wait_for_num_executor_groups(1, only_healthy=True)
 
+  @pytest.mark.execute_serially
+  def test_executor_group_min_size_update(self):
+    """Tests that an executor group's min size can be updated without restarting
+    coordinators, since a group is removed once its last executor is gone."""
+    # Start a group with a single executor and min size 1.
+    self._add_executor_group("group1", min_size=1, num_executors=1)
+    self._wait_for_num_executor_groups(1, only_healthy=True)
+    client = self.client
+    # Kill the executor and wait until only one backend is left.
+    executor = self.cluster.impalads[1]
+    executor.kill()
+    self.coordinator.service.wait_for_metric_value("cluster-membership.backends.total", 1,
+                                                   timeout=20)
+    assert self._get_num_executor_groups(only_healthy=True) == 0
+    # Add two executors to group1 with group min size 2 and wait for it to be healthy.
+    self._add_executors("group1", min_size=2, num_executors=2, expected_num_impalads=3)
+    self._wait_for_num_executor_groups(1, only_healthy=True)
+    # Run the query and observe success.
+    self.execute_query_expect_success(client, TEST_QUERY)
+
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(impalad_args="-default_pool_max_requests=1")
   def test_executor_group_shutdown(self):