You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/09/20 17:58:13 UTC

[helix] branch ApplicationClusterManager updated: Exclude on-operation instance from computing min active replica in WAGED. (#2621)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch ApplicationClusterManager
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/ApplicationClusterManager by this push:
     new e7b4c5386 Exclude on-operation instance from computing min active replica in WAGED. (#2621)
e7b4c5386 is described below

commit e7b4c5386af43d24cb243538228f73aa6b40ab71
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Sep 20 10:58:08 2023 -0700

    Exclude on-operation instance from computing min active replica in WAGED. (#2621)
    
    Exclude on-operation instance from computing min active replica in WAGED.
---
 .../rebalancer/waged/WagedRebalancer.java          | 15 ++++++--
 .../rebalancer/TestInstanceOperation.java          | 42 ++++++++++++++++------
 2 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 6c1c4d74d..e717aa996 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -47,6 +47,7 @@ import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
@@ -395,7 +396,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       Map<String, ResourceAssignment> currentResourceAssignment,
       RebalanceAlgorithm algorithm) throws HelixRebalanceException {
     // the "real" live nodes at the time
-    final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    // TODO: this is a hacky way to filter out on-operation instances. We should consider redesigning `getEnabledLiveInstances()`.
+    final Set<String> enabledLiveInstances = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances());
     if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
       // no need for additional process, return the current resource assignment
       return currentResourceAssignment;
@@ -424,6 +426,14 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     }
   }
 
+  /**
+   * Returns the subset of {@code nodes} whose instance operation is not listed in
+   * {@code DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT}.
+   * A node that has no entry in {@code instanceConfigMap} has no known operation and is kept,
+   * rather than aborting the whole computation with a NullPointerException.
+   *
+   * @param instanceConfigMap instance name to its InstanceConfig; entries may be missing for some nodes
+   * @param nodes candidate instance names
+   * @return a newly collected set containing the nodes that pass the filter
+   */
+  private static Set<String> filterOutOnOperationInstances(Map<String, InstanceConfig> instanceConfigMap,
+      Set<String> nodes) {
+    return nodes.stream().filter(instance -> {
+      // Look the config up once; a missing config must not be dereferenced.
+      InstanceConfig config = instanceConfigMap.get(instance);
+      return config == null
+          || !DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(config.getInstanceOperation());
+    }).collect(Collectors.toSet());
+  }
+
   /**
    * Emergency rebalance is scheduled to quickly handle urgent cases like reassigning partitions from inactive nodes
    * and addressing for partitions failing to meet minActiveReplicas.
@@ -608,7 +618,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
       String resourceName = resourceAssignment.getResourceName();
       IdealState currentIdealState = clusterData.getIdealState(resourceName);
-      Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+      Set<String> enabledLiveInstances =
+          filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances());
       int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
       int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
           .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 6c51d58bb..c9c9c7597 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -15,6 +15,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixRollbackException;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.constants.InstanceConstants;
@@ -89,9 +90,11 @@ public class TestInstanceOperation extends ZkTestBase {
 
     ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
     clusterConfig.stateTransitionCancelEnabled(true);
+    clusterConfig.setDelayRebalaceEnabled(true);
+    clusterConfig.setRebalanceDelayTime(1800000L);
     _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
 
-    createTestDBs(200);
+    createTestDBs(1800000L);
 
     setUpWagedBaseline();
 
@@ -199,7 +202,7 @@ public class TestInstanceOperation extends ZkTestBase {
   public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
     // add a resource where downward state transition is slow
     createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
-        REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName());
+        REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName());
     _allDBs.add("TEST_DB3_DELAYED_CRUSHED");
     // add a resource where downward state transition is slow
     createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave",
@@ -338,21 +341,38 @@ public class TestInstanceOperation extends ZkTestBase {
 
   @Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
   public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
+    _participants.get(1).syncStop(); // stopped here and started again at the end of this test (same for participant 2)
     _participants.get(2).syncStop();
-    _participants.get(3).syncStop();
-    // wait for converge, and set evacuate on instance 0
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    String evacuateInstanceName =  _participants.get(0).getInstanceName();
+    String evacuateInstanceName =  _participants.get(_participants.size()-2).getInstanceName(); // second-to-last participant
     _gSetupTool.getClusterManagementTool()
         .setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, InstanceConstants.InstanceOperation.EVACUATE);
 
-    Map<String, IdealState> assignment;
-    List<String> currentActiveInstances =
-        _participantNames.stream().filter(n -> (!n.equals(evacuateInstanceName) && !n.equals(_participants.get(3).getInstanceName()))).collect(Collectors.toList());
-    TestHelper.verify( ()-> {return verifyIS(evacuateInstanceName);}, TestHelper.WAIT_DURATION);
+    Map<String, ExternalView> assignment;
+    // EV should contain all participants, check resources one by one (getEV() is read once here; there is no polling)
+    assignment = getEV();
+    for (String resource : _allDBs) {
+      ExternalView ev = assignment.get(resource);
+      for (String partition : ev.getPartitionSet()) {
+        AtomicInteger activeReplicaCount = new AtomicInteger();
+        ev.getStateMap(partition)
+            .values()
+            .stream()
+            .filter(
+                v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals(
+                    "STANDBY"))
+            .forEach(v -> activeReplicaCount.getAndIncrement());
+        Assert.assertTrue(activeReplicaCount.get() >= REPLICA-1); // at least REPLICA-1 replicas in one of the states counted above
+        Assert.assertFalse(ev.getStateMap(partition).containsKey(evacuateInstanceName) && ev.getStateMap(partition)
+            .get(evacuateInstanceName)
+            .equals("MASTER") && ev.getStateMap(partition)
+            .get(evacuateInstanceName)
+            .equals("LEADER"));
+        // NOTE(review): the assertion above can never fail -- one state cannot equal both "MASTER" and "LEADER"; "||" between the two equals() checks was presumably intended. Confirm.
+      }
+    }
 
-    _participants.get(3).syncStart();
+    _participants.get(1).syncStart();
     _participants.get(2).syncStart();
   }