You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@helix.apache.org by xy...@apache.org on 2023/09/21 05:45:50 UTC

[helix] branch master updated (045deb60f -> 79cefa3eb)

This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


    from 045deb60f Support persist instance info collected by CloudInstanceInformationProcessor during auto-reg (#2622)
     new 56f230e9c Exclude on-operation instance from computing min active replica in WAGED. (#2621)
     new 79cefa3eb Refine TestInstanceOperation by using verifier (#2625)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../rebalancer/waged/WagedRebalancer.java          | 15 ++++++-
 .../rebalancer/TestInstanceOperation.java          | 51 +++++++++++++++++-----
 2 files changed, 52 insertions(+), 14 deletions(-)


[helix] 01/02: Exclude on-operation instance from computing min active replica in WAGED. (#2621)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 56f230e9ceda72ba00fc1517ee360c926310a1e8
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Sep 20 10:58:08 2023 -0700

    Exclude on-operation instance from computing min active replica in WAGED. (#2621)
    
    Exclude on-operation instance from computing min active replica in WAGED.
---
 .../rebalancer/waged/WagedRebalancer.java          | 15 ++++++--
 .../rebalancer/TestInstanceOperation.java          | 42 ++++++++++++++++------
 2 files changed, 44 insertions(+), 13 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 6c1c4d74d..e717aa996 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -47,6 +47,7 @@ import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
@@ -395,7 +396,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
       Map<String, ResourceAssignment> currentResourceAssignment,
       RebalanceAlgorithm algorithm) throws HelixRebalanceException {
     // the "real" live nodes at the time
-    final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+    // TODO: this is a hacky way to filter our on operation instance. We should consider redesign `getEnabledLiveInstances()`.
+    final Set<String> enabledLiveInstances = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances());
     if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
       // no need for additional process, return the current resource assignment
       return currentResourceAssignment;
@@ -424,6 +426,14 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     }
   }
 
+  private static Set<String> filterOutOnOperationInstances(Map<String, InstanceConfig> instanceConfigMap,
+      Set<String> nodes) {
+    return nodes.stream()
+        .filter(
+            instance -> !DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
+        .collect(Collectors.toSet());
+  }
+
   /**
    * Emergency rebalance is scheduled to quickly handle urgent cases like reassigning partitions from inactive nodes
    * and addressing for partitions failing to meet minActiveReplicas.
@@ -608,7 +618,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
       String resourceName = resourceAssignment.getResourceName();
       IdealState currentIdealState = clusterData.getIdealState(resourceName);
-      Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
+      Set<String> enabledLiveInstances =
+          filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances());
       int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
       int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
           .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index 6c51d58bb..c9c9c7597 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -15,6 +15,7 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixRollbackException;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.TestHelper;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.constants.InstanceConstants;
@@ -89,9 +90,11 @@ public class TestInstanceOperation extends ZkTestBase {
 
     ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
     clusterConfig.stateTransitionCancelEnabled(true);
+    clusterConfig.setDelayRebalaceEnabled(true);
+    clusterConfig.setRebalanceDelayTime(1800000L);
     _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
 
-    createTestDBs(200);
+    createTestDBs(1800000L);
 
     setUpWagedBaseline();
 
@@ -199,7 +202,7 @@ public class TestInstanceOperation extends ZkTestBase {
   public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
     // add a resource where downward state transition is slow
     createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
-        REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName());
+        REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName());
     _allDBs.add("TEST_DB3_DELAYED_CRUSHED");
     // add a resource where downward state transition is slow
     createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave",
@@ -338,21 +341,38 @@ public class TestInstanceOperation extends ZkTestBase {
 
   @Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
   public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
+    _participants.get(1).syncStop();
     _participants.get(2).syncStop();
-    _participants.get(3).syncStop();
-    // wait for converge, and set evacuate on instance 0
-    Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    String evacuateInstanceName =  _participants.get(0).getInstanceName();
+    String evacuateInstanceName =  _participants.get(_participants.size()-2).getInstanceName();
     _gSetupTool.getClusterManagementTool()
         .setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, InstanceConstants.InstanceOperation.EVACUATE);
 
-    Map<String, IdealState> assignment;
-    List<String> currentActiveInstances =
-        _participantNames.stream().filter(n -> (!n.equals(evacuateInstanceName) && !n.equals(_participants.get(3).getInstanceName()))).collect(Collectors.toList());
-    TestHelper.verify( ()-> {return verifyIS(evacuateInstanceName);}, TestHelper.WAIT_DURATION);
+    Map<String, ExternalView> assignment;
+    // EV should contain all participants, check resources one by one
+    assignment = getEV();
+    for (String resource : _allDBs) {
+      ExternalView ev = assignment.get(resource);
+      for (String partition : ev.getPartitionSet()) {
+        AtomicInteger activeReplicaCount = new AtomicInteger();
+        ev.getStateMap(partition)
+            .values()
+            .stream()
+            .filter(
+                v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals(
+                    "STANDBY"))
+            .forEach(v -> activeReplicaCount.getAndIncrement());
+        Assert.assertTrue(activeReplicaCount.get() >= REPLICA-1);
+        Assert.assertFalse(ev.getStateMap(partition).containsKey(evacuateInstanceName) && ev.getStateMap(partition)
+            .get(evacuateInstanceName)
+            .equals("MASTER") && ev.getStateMap(partition)
+            .get(evacuateInstanceName)
+            .equals("LEADER"));
+
+      }
+    }
 
-    _participants.get(3).syncStart();
+    _participants.get(1).syncStart();
     _participants.get(2).syncStart();
   }
 


[helix] 02/02: Refine TestInstanceOperation by using verifier (#2625)

Posted by xy...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xyuanlu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 79cefa3ebe4059fce238256e0a12a364e290bd88
Author: xyuanlu <xy...@gmail.com>
AuthorDate: Wed Sep 20 20:08:40 2023 -0700

    Refine TestInstanceOperation by using verifier (#2625)
---
 .../rebalancer/TestInstanceOperation.java          | 45 +++++++++++++---------
 1 file changed, 26 insertions(+), 19 deletions(-)

diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
index c9c9c7597..10cd662cb 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestInstanceOperation.java
@@ -103,6 +103,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
   @Test
   public void testEvacuate() throws Exception {
+    System.out.println("START TestInstanceOperation.testEvacuate() at " + new Date(System.currentTimeMillis()));
     // EV should contain all participants, check resources one by one
     Map<String, ExternalView> assignment = getEV();
     for (String resource : _allDBs) {
@@ -133,7 +134,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
   @Test(dependsOnMethods = "testEvacuate")
   public void testRevertEvacuation() throws Exception {
-
+    System.out.println("START TestInstanceOperation.testRevertEvacuation() at " + new Date(System.currentTimeMillis()));
     // revert an evacuate instance
     String instanceToEvacuate = _participants.get(0).getInstanceName();
     _gSetupTool.getClusterManagementTool()
@@ -151,6 +152,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
   @Test(dependsOnMethods = "testRevertEvacuation")
   public void testAddingNodeWithEvacuationTag() throws Exception {
+    System.out.println("START TestInstanceOperation.testAddingNodeWithEvacuationTag() at " + new Date(System.currentTimeMillis()));
     // first disable and instance, and wait for all replicas to be moved out
     String mockNewInstance = _participants.get(0).getInstanceName();
     _gSetupTool.getClusterManagementTool()
@@ -200,6 +202,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
   @Test(dependsOnMethods = "testAddingNodeWithEvacuationTag")
   public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
+    System.out.println("START TestInstanceOperation.testEvacuateAndCancelBeforeBootstrapFinish() at " + new Date(System.currentTimeMillis()));
     // add a resource where downward state transition is slow
     createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
         REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName());
@@ -261,6 +264,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
   @Test(dependsOnMethods = "testEvacuateAndCancelBeforeBootstrapFinish")
   public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
+    System.out.println("START TestInstanceOperation.testEvacuateAndCancelBeforeDropFinish() at " + new Date(System.currentTimeMillis()));
 
     // set DROP ST delay to a large number
     _stateModelDelay = 10000L;
@@ -297,6 +301,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
   @Test(dependsOnMethods = "testEvacuateAndCancelBeforeDropFinish")
   public void testMarkEvacuationAfterEMM() throws Exception {
+    System.out.println("START TestInstanceOperation.testMarkEvacuationAfterEMM() at " + new Date(System.currentTimeMillis()));
     _stateModelDelay = 1000L;
     Assert.assertFalse(_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME));
     _gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME, true, null,
@@ -341,6 +346,7 @@ public class TestInstanceOperation extends ZkTestBase {
 
   @Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
   public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
+    System.out.println("START TestInstanceOperation.testEvacuationWithOfflineInstancesInCluster() at " + new Date(System.currentTimeMillis()));
     _participants.get(1).syncStop();
     _participants.get(2).syncStop();
 
@@ -352,24 +358,25 @@ public class TestInstanceOperation extends ZkTestBase {
     // EV should contain all participants, check resources one by one
     assignment = getEV();
     for (String resource : _allDBs) {
-      ExternalView ev = assignment.get(resource);
-      for (String partition : ev.getPartitionSet()) {
-        AtomicInteger activeReplicaCount = new AtomicInteger();
-        ev.getStateMap(partition)
-            .values()
-            .stream()
-            .filter(
-                v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals(
-                    "STANDBY"))
-            .forEach(v -> activeReplicaCount.getAndIncrement());
-        Assert.assertTrue(activeReplicaCount.get() >= REPLICA-1);
-        Assert.assertFalse(ev.getStateMap(partition).containsKey(evacuateInstanceName) && ev.getStateMap(partition)
-            .get(evacuateInstanceName)
-            .equals("MASTER") && ev.getStateMap(partition)
-            .get(evacuateInstanceName)
-            .equals("LEADER"));
-
-      }
+      TestHelper.verify(() -> {
+        ExternalView ev = assignment.get(resource);
+        for (String partition : ev.getPartitionSet()) {
+          AtomicInteger activeReplicaCount = new AtomicInteger();
+          ev.getStateMap(partition)
+              .values()
+              .stream()
+              .filter(v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER")
+                  || v.equals("STANDBY"))
+              .forEach(v -> activeReplicaCount.getAndIncrement());
+          if (activeReplicaCount.get() < REPLICA - 1 || (ev.getStateMap(partition).containsKey(evacuateInstanceName)
+              && ev.getStateMap(partition).get(evacuateInstanceName).equals("MASTER") && ev.getStateMap(partition)
+              .get(evacuateInstanceName)
+              .equals("LEADER"))) {
+            return false;
+          }
+        }
+        return true;
+      }, 30000);
     }
 
     _participants.get(1).syncStart();