You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/02 20:58:33 UTC

[helix] branch wagedRebalancer updated (1b4457d -> 3ac3b22)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a change to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git.


    from 1b4457d  Add delayed rebalance and user-defined preference list features to the WAGED rebalancer. (#456)
     new aae2e93  Enable maintenance mode for the WAGED rebalancer.
     new 3ac3b22  Adjust the topology processing logic for instance to ensure backward compatibility.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../ConstraintBasedAlgorithmFactory.java           |  19 +-
 .../rebalancer/waged/model/AssignableNode.java     |  52 ++--
 .../stages/BestPossibleStateCalcStage.java         |  20 +-
 .../rebalancer/waged/model/TestAssignableNode.java |  15 +-
 .../TestPartitionMigrationBase.java                |  15 +-
 .../PartitionMigration/TestWagedExpandCluster.java |  65 +++++
 .../TestWagedRebalancerMigration.java              | 108 ++++++++
 .../rebalancer/WagedRebalancer/TestNodeSwap.java   | 291 +++++++++++++++++++++
 8 files changed, 528 insertions(+), 57 deletions(-)
 create mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java


[helix] 01/02: Enable maintenance mode for the WAGED rebalancer.

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit aae2e93ef10399283a9a3e59459817f958acde6a
Author: Jiajun Wang <jj...@linkedin.com>
AuthorDate: Sun Sep 29 15:50:25 2019 -0700

    Enable maintenance mode for the WAGED rebalancer.
    
    The maintenance-mode rebalance logic remains the same as in the previous implementation.
    Add more tests for partition migration and node swap scenarios that require maintenance mode.
---
 .../ConstraintBasedAlgorithmFactory.java           |  19 +-
 .../stages/BestPossibleStateCalcStage.java         |  20 +-
 .../TestPartitionMigrationBase.java                |  15 +-
 .../PartitionMigration/TestWagedExpandCluster.java |  65 +++++
 .../TestWagedRebalancerMigration.java              | 108 ++++++++
 .../rebalancer/WagedRebalancer/TestNodeSwap.java   | 291 +++++++++++++++++++++
 6 files changed, 500 insertions(+), 18 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
index 8568444..657fc82 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/constraints/ConstraintBasedAlgorithmFactory.java
@@ -22,32 +22,37 @@ package org.apache.helix.controller.rebalancer.waged.constraints;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
-import org.apache.helix.model.ClusterConfig;
-
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
+import org.apache.helix.model.ClusterConfig;
 
 /**
  * The factory class to create an instance of {@link ConstraintBasedAlgorithm}
  */
 public class ConstraintBasedAlgorithmFactory {
+  // Evenness constraints tend to score within a smaller range.
+  // In order to let their scores cause enough difference in the final evaluation result, we need to
+  // enlarge the overall weight of the evenness constraints compared with the movement constraint.
+  // TODO: Tune or make the following factor configurable.
+  private static final int EVENNESS_PREFERENCE_NORMALIZE_FACTOR = 50;
 
   public static RebalanceAlgorithm getInstance(
       Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
-    List<HardConstraint> hardConstraints =
-        ImmutableList.of(new FaultZoneAwareConstraint(), new NodeCapacityConstraint(),
+    List<HardConstraint> hardConstraints = ImmutableList
+        .of(new FaultZoneAwareConstraint(), new NodeCapacityConstraint(),
             new ReplicaActivateConstraint(), new NodeMaxPartitionLimitConstraint(),
             new ValidGroupTagConstraint(), new SamePartitionOnInstanceConstraint());
 
     int evennessPreference =
-        preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1);
+        preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, 1)
+            * EVENNESS_PREFERENCE_NORMALIZE_FACTOR;
     int movementPreference =
         preferences.getOrDefault(ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, 1);
     float evennessRatio = (float) evennessPreference / (evennessPreference + movementPreference);
     float movementRatio = (float) movementPreference / (evennessPreference + movementPreference);
 
-    Map<SoftConstraint, Float> softConstraints = ImmutableMap.<SoftConstraint, Float> builder()
+    Map<SoftConstraint, Float> softConstraints = ImmutableMap.<SoftConstraint, Float>builder()
         .put(new PartitionMovementConstraint(), movementRatio)
         .put(new InstancePartitionsCountConstraint(), 0.3f * evennessRatio)
         .put(new ResourcePartitionAntiAffinityConstraint(), 0.1f * evennessRatio)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
index 8ce60e7..30b4443 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/BestPossibleStateCalcStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
  */
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -235,6 +236,11 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       ResourceControllerDataProvider cache, CurrentStateOutput currentStateOutput,
       HelixManager helixManager, Map<String, Resource> resourceMap, BestPossibleStateOutput output,
       List<String> failureResources) {
+    if (cache.isMaintenanceModeEnabled()) {
+      // The WAGED rebalancer won't be used while maintenance mode is enabled.
+      return Collections.emptyMap();
+    }
+
     // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer
     Map<String, Resource> wagedRebalancedResourceMap =
         resourceMap.entrySet().stream().filter(resourceEntry -> {
@@ -394,10 +400,9 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
     }
   }
 
-  private Rebalancer<ResourceControllerDataProvider> getRebalancer(IdealState idealState,
-      String resourceName, boolean isMaintenanceModeEnabled) {
+  private Rebalancer<ResourceControllerDataProvider> getCustomizedRebalancer(
+      String rebalancerClassName, String resourceName) {
     Rebalancer<ResourceControllerDataProvider> customizedRebalancer = null;
-    String rebalancerClassName = idealState.getRebalancerClassName();
     if (rebalancerClassName != null) {
       if (logger.isDebugEnabled()) {
         LogUtil.logDebug(logger, _eventId,
@@ -411,13 +416,19 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
             "Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
       }
     }
+    return customizedRebalancer;
+  }
 
+  private Rebalancer<ResourceControllerDataProvider> getRebalancer(IdealState idealState,
+      String resourceName, boolean isMaintenanceModeEnabled) {
     Rebalancer<ResourceControllerDataProvider> rebalancer = null;
     switch (idealState.getRebalanceMode()) {
     case FULL_AUTO:
       if (isMaintenanceModeEnabled) {
         rebalancer = new MaintenanceRebalancer();
       } else {
+        Rebalancer<ResourceControllerDataProvider> customizedRebalancer =
+            getCustomizedRebalancer(idealState.getRebalancerClassName(), resourceName);
         if (customizedRebalancer != null) {
           rebalancer = customizedRebalancer;
         } else {
@@ -433,14 +444,13 @@ public class BestPossibleStateCalcStage extends AbstractBaseStage {
       break;
     case USER_DEFINED:
     case TASK:
-      rebalancer = customizedRebalancer;
+      rebalancer = getCustomizedRebalancer(idealState.getRebalancerClassName(), resourceName);
       break;
     default:
       LogUtil.logError(logger, _eventId,
           "Fail to find the rebalancer, invalid rebalance mode " + idealState.getRebalanceMode());
       break;
     }
-
     return rebalancer;
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
index 7a559a5..61d72a2 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestPartitionMigrationBase.java
@@ -57,14 +57,14 @@ public class TestPartitionMigrationBase extends ZkTestBase {
   protected ClusterControllerManager _controller;
 
   List<MockParticipantManager> _participants = new ArrayList<>();
-  int _replica = 3;
-  int _minActiveReplica = _replica - 1;
+  protected int _replica = 3;
+  protected int _minActiveReplica = _replica - 1;
   ZkHelixClusterVerifier _clusterVerifier;
-  List<String> _testDBs = new ArrayList<>();
+  protected List<String> _testDBs = new ArrayList<>();
 
   MigrationStateVerifier _migrationVerifier;
   HelixManager _manager;
-  ConfigAccessor _configAccessor;
+  protected ConfigAccessor _configAccessor;
 
 
   @BeforeClass
@@ -84,8 +84,7 @@ public class TestPartitionMigrationBase extends ZkTestBase {
     _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
     _controller.syncStart();
 
-    _clusterVerifier =
-        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    _clusterVerifier = getVerifier();
 
     enablePersistIntermediateAssignment(_gZkClient, CLUSTER_NAME, true);
 
@@ -95,6 +94,10 @@ public class TestPartitionMigrationBase extends ZkTestBase {
     _configAccessor = new ConfigAccessor(_gZkClient);
   }
 
+  protected ZkHelixClusterVerifier getVerifier() {
+    return new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+  }
+
   protected MockParticipantManager createAndStartParticipant(String instancename) {
     _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instancename);
 
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
new file mode 100644
index 0000000..d303e87
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedExpandCluster.java
@@ -0,0 +1,65 @@
+package org.apache.helix.integration.rebalancer.PartitionMigration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+
+public class TestWagedExpandCluster extends TestExpandCluster {
+  // TODO check the movements in between
+  @Override
+  protected ZkHelixClusterVerifier getVerifier() {
+    Set<String> resourceNames = new HashSet<>();
+    int index = 0;
+    for (String ignored : TestStateModels) {
+      resourceNames.add("Test-DB-" + index++);
+    }
+    return new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(resourceNames).build();
+  }
+
+  @Override
+  protected Map<String, IdealState> createTestDBs(long delayTime) {
+    int index = 0;
+    for (String stateModel : TestStateModels) {
+      String dbName = "Test-DB-" + index++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, dbName, stateModel, _PARTITIONS, _replica,
+          _minActiveReplica);
+      _testDBs.add(dbName);
+    }
+    Map<String, IdealState> idealStates = new HashMap<>();
+    for (String dbName : _testDBs) {
+      idealStates.put(dbName,
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName));
+    }
+    ClusterConfig config = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    config.setDelayRebalaceEnabled(true);
+    config.setRebalanceDelayTime(delayTime);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, config);
+    return idealStates;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
new file mode 100644
index 0000000..1a13496
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/PartitionMigration/TestWagedRebalancerMigration.java
@@ -0,0 +1,108 @@
+package org.apache.helix.integration.rebalancer.PartitionMigration;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class TestWagedRebalancerMigration extends TestPartitionMigrationBase {
+  // _configAccessor is inherited (protected) from TestPartitionMigrationBase; do not shadow it.
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    super.beforeClass();
+    _configAccessor = new ConfigAccessor(_gZkClient);
+  }
+
+  @DataProvider(name = "stateModels")
+  public static Object[][] stateModels() {
+    return new Object[][] { { BuiltInStateModelDefinitions.MasterSlave.name(), true },
+        { BuiltInStateModelDefinitions.OnlineOffline.name(), true },
+        { BuiltInStateModelDefinitions.LeaderStandby.name(), true },
+        { BuiltInStateModelDefinitions.MasterSlave.name(), false },
+        { BuiltInStateModelDefinitions.OnlineOffline.name(), false },
+        { BuiltInStateModelDefinitions.LeaderStandby.name(), false },
+    };
+  }
+
+  // TODO check the movements in between
+  @Test(dataProvider = "stateModels")
+  public void testMigrateToWagedRebalancerWhileExpandCluster(String stateModel,
+      boolean delayEnabled) throws Exception {
+    String db = "Test-DB-" + stateModel + "-" + delayEnabled; // unique per data-provider row
+    if (delayEnabled) {
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _replica - 1, 3000000, CrushRebalanceStrategy.class.getName());
+    } else {
+      createResourceWithDelayedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _replica, 0, CrushRebalanceStrategy.class.getName());
+    }
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+    ClusterConfig config = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    config.setDelayRebalaceEnabled(delayEnabled);
+    config.setRebalanceDelayTime(3000000);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, config);
+
+    // add new instance to the cluster
+    int numNodes = _participants.size();
+    for (int i = numNodes; i < numNodes + NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      MockParticipantManager participant = createAndStartParticipant(storageNodeName);
+      _participants.add(participant);
+      Thread.sleep(100);
+    }
+    Thread.sleep(2000);
+    ZkHelixClusterVerifier clusterVerifier =
+        new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME)
+            .setResources(Collections.singleton(db)).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(clusterVerifier.verifyByPolling());
+
+    _migrationVerifier =
+        new MigrationStateVerifier(Collections.singletonMap(db, idealState), _manager);
+    _migrationVerifier.reset();
+    _migrationVerifier.start();
+    try { // always stop the verifier, even when an assertion fails
+      IdealState currentIdealState =
+          _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, db);
+      currentIdealState.setRebalancerClassName(WagedRebalancer.class.getName());
+      _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, db, currentIdealState);
+      Thread.sleep(2000);
+      Assert.assertTrue(clusterVerifier.verifyByPolling());
+      Assert.assertFalse(_migrationVerifier.hasLessReplica());
+      Assert.assertFalse(_migrationVerifier.hasMoreReplica());
+    } finally {
+      _migrationVerifier.stop();
+    }
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java
new file mode 100644
index 0000000..360e495
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/WagedRebalancer/TestNodeSwap.java
@@ -0,0 +1,291 @@
+package org.apache.helix.integration.rebalancer.WagedRebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestNodeSwap extends ZkTestBase {
+  final int NUM_NODE = 6;
+  protected static final int START_PORT = 12918;
+  protected static final int _PARTITIONS = 20;
+
+  protected final String CLASS_NAME = getShortClassName();
+  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  protected ClusterControllerManager _controller;
+  protected HelixClusterVerifier _clusterVerifier;
+
+  List<MockParticipantManager> _participants = new ArrayList<>();
+  Set<String> _allDBs = new HashSet<>();
+  int _replica = 3;
+
+  String[] _testModels = { BuiltInStateModelDefinitions.OnlineOffline.name(),
+      BuiltInStateModelDefinitions.MasterSlave.name(),
+      BuiltInStateModelDefinitions.LeaderStandby.name()
+  };
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setTopology("/zone/instance");
+    clusterConfig.setFaultZoneType("zone");
+    clusterConfig.setDelayRebalaceEnabled(true);
+    // Set a long enough time to ensure delayed rebalance is activated
+    clusterConfig.setRebalanceDelayTime(3000000);
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    Set<String> nodes = new HashSet<>();
+    for (int i = 0; i < NUM_NODE; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+      String zone = "zone-" + i % 3;
+      String domain = String.format("zone=%s,instance=%s", zone, storageNodeName);
+
+      InstanceConfig instanceConfig =
+          configAccessor.getInstanceConfig(CLUSTER_NAME, storageNodeName);
+      instanceConfig.setDomain(domain);
+      _gSetupTool.getClusterManagementTool()
+          .setInstanceConfig(CLUSTER_NAME, storageNodeName, instanceConfig);
+      nodes.add(storageNodeName);
+    }
+
+    // start dummy participants
+    for (String node : nodes) {
+      MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, node);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true);
+    enableTopologyAwareRebalance(_gZkClient, CLUSTER_NAME, true);
+
+    int i = 0;
+    for (String stateModel : _testModels) {
+      String db = "Test-DB-" + i++;
+      createResourceWithWagedRebalance(CLUSTER_NAME, db, stateModel, _PARTITIONS, _replica,
+          _replica - 1);
+      _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, db, _replica);
+      _allDBs.add(db);
+    }
+    Thread.sleep(1000);
+
+    _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(_allDBs).build();
+    Assert.assertTrue(_clusterVerifier.verify(5000));
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _controller.syncStop();
+    for (MockParticipantManager p : _participants) {
+      p.syncStop();
+    }
+    deleteCluster(CLUSTER_NAME);
+  }
+
+  @Test
+  public void testNodeSwap() throws Exception {
+    Map<String, ExternalView> record = new HashMap<>();
+    for (String db : _allDBs) {
+      record.put(db,
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db));
+    }
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+
+    // 1. disable an old node
+    MockParticipantManager oldParticipant = _participants.get(0);
+    String oldParticipantName = oldParticipant.getInstanceName();
+    final InstanceConfig instanceConfig =
+        _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, oldParticipantName);
+    instanceConfig.setInstanceEnabled(false);
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceConfig(CLUSTER_NAME, oldParticipantName, instanceConfig);
+    Assert.assertTrue(_clusterVerifier.verify(10000));
+
+    // 2. then entering maintenance mode and remove it from topology
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, "NodeSwap", Collections.emptyMap());
+    oldParticipant.syncStop();
+    _participants.remove(oldParticipant);
+    Thread.sleep(2000);
+    _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig);
+
+    // 3. create new participant with same topology
+    String newParticipantName = "RandomParticipant_" + START_PORT;
+    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newParticipantName);
+    InstanceConfig newConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, newParticipantName);
+    String zone = instanceConfig.getDomainAsMap().get("zone");
+    String domain = String.format("zone=%s,instance=%s", zone, newParticipantName);
+    newConfig.setDomain(domain);
+    _gSetupTool.getClusterManagementTool()
+        .setInstanceConfig(CLUSTER_NAME, newParticipantName, newConfig);
+
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newParticipantName);
+    participant.syncStart();
+    _participants.add(0, participant);
+
+    // 4. exit maintenance mode and rebalance
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, "NodeSwapDone", Collections.emptyMap());
+
+    Thread.sleep(2000);
+    _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(_allDBs).build();
+    Assert.assertTrue(_clusterVerifier.verify(5000));
+
+    // Since only one node was temporarily down, its partitions should move to the new node.
+    for (String db : _allDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      ExternalView oldEv = record.get(db);
+      for (String partition : ev.getPartitionSet()) {
+        Map<String, String> stateMap = ev.getStateMap(partition);
+        Map<String, String> oldStateMap = oldEv.getStateMap(partition);
+        Assert.assertTrue(oldStateMap != null && stateMap != null);
+        Assert.assertEquals(stateMap.size(), _replica);
+        // Note the WAGED rebalancer won't ensure the same state, because moving the top states
+        // back to the replaced node might be unnecessary and causing overhead.
+        Set<String> instanceSet = new HashSet<>(stateMap.keySet());
+        if (instanceSet.remove(newParticipantName)) {
+          instanceSet.add(oldParticipantName);
+        }
+        Assert.assertEquals(oldStateMap.keySet(), instanceSet);
+      }
+    }
+  }
+
+  @Test(dependsOnMethods = "testNodeSwap")
+  public void testFaultZoneSwap() throws Exception {
+    Map<String, ExternalView> record = new HashMap<>();
+    for (String db : _allDBs) {
+      record.put(db,
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db));
+    }
+    ConfigAccessor configAccessor = new ConfigAccessor(_gZkClient);
+
+    // 1. disable a whole fault zone
+    Map<String, InstanceConfig> removedInstanceConfigMap = new HashMap<>();
+    for (MockParticipantManager participant : _participants) {
+      String instanceName = participant.getInstanceName();
+      InstanceConfig instanceConfig =
+          _gSetupTool.getClusterManagementTool().getInstanceConfig(CLUSTER_NAME, instanceName);
+      if (instanceConfig.getDomainAsMap().get("zone").equals("zone-0")) {
+        instanceConfig.setInstanceEnabled(false);
+        _gSetupTool.getClusterManagementTool()
+            .setInstanceConfig(CLUSTER_NAME, instanceName, instanceConfig);
+        removedInstanceConfigMap.put(instanceName, instanceConfig);
+      }
+    }
+    Assert.assertTrue(_clusterVerifier.verify(10000));
+
+    // 2. then entering maintenance mode and remove all the zone-0 nodes from topology
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, true, "NodeSwap", Collections.emptyMap());
+    Iterator<MockParticipantManager> iter = _participants.iterator();
+    while (iter.hasNext()) {
+      MockParticipantManager participant = iter.next();
+      String instanceName = participant.getInstanceName();
+      if (removedInstanceConfigMap.containsKey(instanceName)) {
+        participant.syncStop();
+        iter.remove();
+        Thread.sleep(1000);
+        _gSetupTool.getClusterManagementTool()
+            .dropInstance(CLUSTER_NAME, removedInstanceConfigMap.get(instanceName));
+      }
+    }
+
+    // 3. create one new zone-0 participant per removed instance (same topology)
+    Set<String> newInstanceNames = new HashSet<>();
+    for (int i = 0; i < removedInstanceConfigMap.size(); i++) {
+      String newParticipantName = "NewParticipant_" + (START_PORT + i);
+      newInstanceNames.add(newParticipantName);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, newParticipantName);
+      InstanceConfig newConfig = configAccessor.getInstanceConfig(CLUSTER_NAME, newParticipantName);
+      String domain = String.format("zone=zone-0,instance=%s", newParticipantName);
+      newConfig.setDomain(domain);
+      _gSetupTool.getClusterManagementTool()
+          .setInstanceConfig(CLUSTER_NAME, newParticipantName, newConfig);
+
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, newParticipantName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    // 4. exit maintenance mode and rebalance
+    _gSetupTool.getClusterManagementTool()
+        .manuallyEnableMaintenanceMode(CLUSTER_NAME, false, "NodeSwapDone", Collections.emptyMap());
+
+    Thread.sleep(2000);
+    _clusterVerifier = new StrictMatchExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
+        .setResources(_allDBs).build();
+    Assert.assertTrue(_clusterVerifier.verify(5000));
+
+    for (String db : _allDBs) {
+      ExternalView ev =
+          _gSetupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, db);
+      ExternalView oldEv = record.get(db);
+      for (String partition : ev.getPartitionSet()) {
+        Map<String, String> stateMap = ev.getStateMap(partition);
+        Map<String, String> oldStateMap = oldEv.getStateMap(partition);
+        Assert.assertTrue(oldStateMap != null && stateMap != null);
+        Assert.assertEquals(stateMap.size(), _replica);
+        Set<String> instanceSet = new HashSet<>(stateMap.keySet());
+        instanceSet.removeAll(oldStateMap.keySet());
+        // All the different instances in the new mapping are the newly added instance
+        Assert.assertTrue(newInstanceNames.containsAll(instanceSet));
+        instanceSet = new HashSet<>(oldStateMap.keySet());
+        instanceSet.removeAll(stateMap.keySet());
+        // All the different instances in the old mapping are the removed instance
+        Assert.assertTrue(removedInstanceConfigMap.keySet().containsAll(instanceSet));
+      }
+    }
+  }
+}


[helix] 02/02: Adjust the topology processing logic for instance to ensure backward compatibility.

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3ac3b22584e9fb7dda7cd31583881dbd1a99522a
Author: jiajunwang <jj...@linkedin.com>
AuthorDate: Tue Oct 1 13:49:33 2019 -0700

    Adjust the topology processing logic for instance to ensure backward compatibility.
---
 .../rebalancer/waged/model/AssignableNode.java     | 52 +++++++++-------------
 .../rebalancer/waged/model/TestAssignableNode.java | 15 ++++---
 2 files changed, 28 insertions(+), 39 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
index 2a68e15..3bfd225 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/AssignableNode.java
@@ -28,16 +28,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.helix.HelixException;
 import org.apache.helix.model.ClusterConfig;
 import org.apache.helix.model.InstanceConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
 /**
  * This class represents a possible allocation of the replication.
  * Note that any usage updates to the AssignableNode are not thread safe.
@@ -138,8 +136,8 @@ public class AssignableNode implements Comparable<AssignableNode> {
     }
 
     Map<String, AssignableReplica> partitionMap = _currentAssignedReplicaMap.get(resourceName);
-    if (!partitionMap.containsKey(partitionName)
-        || !partitionMap.get(partitionName).equals(replica)) {
+    if (!partitionMap.containsKey(partitionName) || !partitionMap.get(partitionName)
+        .equals(replica)) {
       LOG.warn("Replica {} is not assigned to node {}. Ignore the release call.",
           replica.toString(), getInstanceName());
       return;
@@ -269,10 +267,9 @@ public class AssignableNode implements Comparable<AssignableNode> {
    * For example, when
    * the domain is "zone=2, instance=testInstance" and the fault zone type is "zone", this function
    * returns "2".
-   * If cannot find the fault zone id, this function leaves the fault zone id as the instance name.
-   * TODO merge this logic with Topology.java tree building logic.
-   * For now, the WAGED rebalancer has a more strict topology def requirement.
-   * Any missing field will cause an invalid topology config exception.
+   * If the fault zone type cannot be found, this function uses the instance name as the fault zone id.
+   * Note that the WAGED rebalancer does not require the full topology tree to be created, so this
+   * logic is simpler than that of the CRUSH-based rebalancer.
    */
   private String computeFaultZone(ClusterConfig clusterConfig, InstanceConfig instanceConfig) {
     if (!clusterConfig.isTopologyAwareEnabled()) {
@@ -290,36 +287,27 @@ public class AssignableNode implements Comparable<AssignableNode> {
       return zoneId == null ? instanceConfig.getInstanceName() : zoneId;
     } else {
       // Get the fault zone information from the complete topology definition.
-      String[] topologyDef = topologyStr.trim().split("/");
-      if (topologyDef.length == 0
-          || Arrays.stream(topologyDef).noneMatch(type -> type.equals(faultZoneType))) {
+      String[] topologyKeys = topologyStr.trim().split("/");
+      if (topologyKeys.length == 0 || Arrays.stream(topologyKeys)
+          .noneMatch(type -> type.equals(faultZoneType))) {
         throw new HelixException(
             "The configured topology definition is empty or does not contain the fault zone type.");
       }
 
       Map<String, String> domainAsMap = instanceConfig.getDomainAsMap();
-      if (domainAsMap == null) {
-        throw new HelixException(
-            String.format("The domain configuration of node %s is not configured", _instanceName));
-      } else {
-        StringBuilder faultZoneStringBuilder = new StringBuilder();
-        for (String key : topologyDef) {
-          if (!key.isEmpty()) {
-            if (domainAsMap.containsKey(key)) {
-              faultZoneStringBuilder.append(domainAsMap.get(key));
-              faultZoneStringBuilder.append('/');
-            } else {
-              throw new HelixException(String.format(
-                  "The domain configuration of node %s is not complete. Type %s is not found.",
-                  _instanceName, key));
-            }
-            if (key.equals(faultZoneType)) {
-              break;
-            }
+      StringBuilder faultZoneStringBuilder = new StringBuilder();
+      for (String key : topologyKeys) {
+        if (!key.isEmpty()) {
+          // if a key does not exist in the instance domain config, apply the default domain value.
+          faultZoneStringBuilder.append(domainAsMap.getOrDefault(key, "Default_" + key));
+          if (key.equals(faultZoneType)) {
+            break;
+          } else {
+            faultZoneStringBuilder.append('/');
           }
         }
-        return faultZoneStringBuilder.toString();
       }
+      return faultZoneStringBuilder.toString();
     }
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
index b48587f..e8b010e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/TestAssignableNode.java
@@ -189,24 +189,25 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     assignableNode.assign(duplicateReplica);
   }
 
-  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "The domain configuration of node testInstanceId is not complete. Type DOES_NOT_EXIST is not found.")
+  @Test
   public void testParseFaultZoneNotFound() throws IOException {
     ResourceControllerDataProvider testCache = setupClusterDataCache();
 
     ClusterConfig testClusterConfig = new ClusterConfig("testClusterConfigId");
-    testClusterConfig.setFaultZoneType("DOES_NOT_EXIST");
+    testClusterConfig.setFaultZoneType("zone");
     testClusterConfig.setTopologyAwareEnabled(true);
-    testClusterConfig.setTopology("/DOES_NOT_EXIST/");
+    testClusterConfig.setTopology("/zone/");
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
 
     InstanceConfig testInstanceConfig = new InstanceConfig("testInstanceConfigId");
-    testInstanceConfig.setDomain("zone=2, instance=testInstance");
+    testInstanceConfig.setDomain("instance=testInstance");
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
     instanceConfigMap.put(_testInstanceId, testInstanceConfig);
     when(testCache.getInstanceConfigMap()).thenReturn(instanceConfigMap);
 
-    new AssignableNode(testCache.getClusterConfig(),
+    AssignableNode node = new AssignableNode(testCache.getClusterConfig(),
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
+    Assert.assertEquals(node.getFaultZone(), "Default_zone");
   }
 
   @Test
@@ -228,7 +229,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     AssignableNode assignableNode = new AssignableNode(testCache.getClusterConfig(),
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
 
-    Assert.assertEquals(assignableNode.getFaultZone(), "2/");
+    Assert.assertEquals(assignableNode.getFaultZone(), "2");
 
     testClusterConfig = new ClusterConfig("testClusterConfigId");
     testClusterConfig.setFaultZoneType("instance");
@@ -245,7 +246,7 @@ public class TestAssignableNode extends AbstractTestClusterModel {
     assignableNode = new AssignableNode(testCache.getClusterConfig(),
         testCache.getInstanceConfigMap().get(_testInstanceId), _testInstanceId);
 
-    Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance/");
+    Assert.assertEquals(assignableNode.getFaultZone(), "2/testInstance");
   }
 
   @Test