You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/04 18:50:34 UTC

[helix] branch abnormalResolver updated: Add ExcessiveTopStateResolver to gracefully fix the double-masters situation. (#1037)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch abnormalResolver
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/abnormalResolver by this push:
     new f8bc5fd  Add ExcessiveTopStateResolver to gracefully fix the double-masters situation. (#1037)
f8bc5fd is described below

commit f8bc5fd15976a9820d92fa5818292aa1f3c13589
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu Jun 4 11:50:23 2020 -0700

    Add ExcessiveTopStateResolver to gracefully fix the double-masters situation. (#1037)
    
    Although the rebalancer will fix the additional master eventually, the default operations are arbitrary and it may cause an older master to survive. This may cause serious application logic issues since many applications require the master to have the latest data.
    With this state resolver, the rebalancer will change the default behavior to reset all the master replicas so as to ensure the remaining one is the youngest one. Then the double-masters situation is gracefully resolved.
---
 .../constraint/AbnormalStateResolver.java          |   4 +-
 .../controller/rebalancer/AbstractRebalancer.java  |   2 +-
 .../constraint/ExcessiveTopStateResolver.java      | 123 +++++++++++++++++++++
 .../constraint/MockAbnormalStateResolver.java      |   2 +-
 .../rebalancer/TestAbnormalStatesResolver.java     | 117 ++++++++++++++++++++
 5 files changed, 244 insertions(+), 4 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
index 7e9946c..1a75e0b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
@@ -35,7 +35,7 @@ public interface AbnormalStateResolver {
    * This is a dummy class that does not really functional.
    */
   AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
-    public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+    public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
         final String resourceName, final Partition partition,
         final StateModelDefinition stateModelDef) {
       // By default, all current states are valid.
@@ -56,7 +56,7 @@ public interface AbnormalStateResolver {
    * @param stateModelDef
    * @return true if the current states of the specified partition is valid.
    */
-  boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+  boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
       final String resourceName, final Partition partition,
       final StateModelDefinition stateModelDef);
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 6fdd0b6..a1ca6ec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -245,7 +245,7 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
     }
 
     // (3) If the current states are not valid, fix the invalid part first.
-    if (!resolver.isCurrentStatesValid(currentStateOutput, resourceName, partition, stateModelDef)) {
+    if (!resolver.checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef)) {
       Map<String, String> recoveryAssignment = resolver
           .computeRecoveryAssignment(currentStateOutput, resourceName, partition, stateModelDef,
               preferenceList);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java
new file mode 100644
index 0000000..3c1e100
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java
@@ -0,0 +1,123 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The abnormal state resolver that gracefully fixes the abnormality of excessive top states for
+ * single-topstate state model. For example, two replicas of a MasterSlave partition are assigned
+ * with the Master state at the same time. This could be caused by a network partitioning or the
+ * other unexpected issues.
+ *
+ * The resolver checks for the abnormality and computes recovery assignment which triggers the
+ * rebalancer to eventually reset all the top state replicas for once. After the resets, only one
+ * replica will be assigned the top state.
+ *
+ * Note that without using this resolver, the regular Helix rebalance pipeline also removes the
+ * excessive top state replicas. However, the default logic does not force resetting ALL the top
+ * state replicas. Since the multiple top states situation may break application data, the default
+ * resolution won't be enough to fix the potential problem.
+ */
+public class ExcessiveTopStateResolver implements AbnormalStateResolver {
+  private static final Logger LOG = LoggerFactory.getLogger(ExcessiveTopStateResolver.class);
+
+  /**
+   * Checks the partition for excessive top states: the current states are reported as invalid
+   * only if a single-top-state state model has more than one replica in the top state.
+   */
+  @Override
+  public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition, StateModelDefinition stateModelDef) {
+    // A state model that permits several top state replicas never trips this check.
+    if (!stateModelDef.isSingleTopStateModel()) {
+      return true;
+    }
+    // TODO: Cache top state count in the ResourceControllerDataProvider and avoid repeated counting
+    // TODO: here. It would be premature to do it now. But with more use case, we can improve the
+    // TODO: ResourceControllerDataProvider to calculate by default.
+    final String topState = stateModelDef.getTopState();
+    long topStateReplicas = currentStateOutput.getCurrentStateMap(resourceName, partition)
+        .values().stream().filter(replicaState -> replicaState.equals(topState)).count();
+    return topStateReplicas <= 1;
+  }
+
+  /**
+   * Computes a recovery assignment that demotes the excessive top state replicas. One top state
+   * replica is kept, but never the one on the expected top state host (the first host in the
+   * preference list), so that a master with potentially stale data is always reset.
+   *
+   * Expected follow-up: the proposal replaces the ideal assignment and state transition messages
+   * are sent. If the next pipeline still sees the abnormality (transitions not finished yet), the
+   * same logic is applied again. Once a single top state replica is left, the regular rebalancer
+   * logic brings the top state back to the expected allocation, so every master with potentially
+   * stale data has been reset by then.
+   *
+   * @return the recovery state map, or the current state map itself if it is already valid.
+   */
+  @Override
+  public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition, StateModelDefinition stateModelDef,
+      List<String> preferenceList) {
+    Map<String, String> currentStateMap =
+        currentStateOutput.getCurrentStateMap(resourceName, partition);
+    if (checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef)) {
+      // This method should not be triggered when the mapping is valid. Log for debug purposes.
+      LOG.warn("The input current state map {} is valid, return the original current state.",
+          currentStateMap);
+      return currentStateMap;
+    }
+
+    String topState = stateModelDef.getTopState();
+    String recoveryState =
+        stateModelDef.getNextStateForTransition(topState, stateModelDef.getInitialState());
+    Map<String, String> recoverMap = new HashMap<>(currentStateMap);
+
+    // 1. Reset the expected top state host if it is hosting a top state replica. The host may be
+    // missing from the current state map, so compare null-safely instead of dereferencing get().
+    if (preferenceList != null && !preferenceList.isEmpty()) {
+      String expectedTopStateHost = preferenceList.get(0);
+      if (topState.equals(recoverMap.get(expectedTopStateHost))) {
+        recoverMap.put(expectedTopStateHost, recoveryState);
+      }
+    }
+
+    // 2. To minimize the impact, reserve one top state replica during the recovery process.
+    boolean hasReservedTopState = false;
+    for (Map.Entry<String, String> replica : recoverMap.entrySet()) {
+      if (topState.equals(replica.getValue())) {
+        if (hasReservedTopState) {
+          replica.setValue(recoveryState);
+        }
+        hasReservedTopState = true;
+      }
+    }
+    return recoverMap;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
index 4718921..00a2f68 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
@@ -33,7 +33,7 @@ import org.apache.helix.model.StateModelDefinition;
  */
 public class MockAbnormalStateResolver implements AbnormalStateResolver {
   @Override
-  public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+  public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
       final String resourceName, final Partition partition,
       final StateModelDefinition stateModelDef) {
     // By default, all current states are valid.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
index dde2644..e10645f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
@@ -19,16 +19,33 @@ package org.apache.helix.integration.rebalancer;
  * under the License.
  */
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.ExcessiveTopStateResolver;
 import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -64,4 +81,104 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
     configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
   }
+
+  /** Double-masters get fixed either way; only the resolver also resets the old master. */
+  @Test(dependsOnMethods = "testConfigureResolver")
+  public void testExcessiveTopStateResolver() {
+    BestPossibleExternalViewVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(verifier.verify());
+
+    // 1. Pick a partition, find a host with a SLAVE replica and record the MASTER's update time.
+    HelixAdmin admin = new ZKHelixAdmin.Builder().setZkAddress(ZK_ADDR).build();
+    ExternalView ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+    String targetPartition = ev.getPartitionSet().iterator().next();
+    Map<String, String> partitionAssignment = ev.getStateMap(targetPartition);
+    String slaveHost =
+        partitionAssignment.entrySet().stream().filter(entry -> entry.getValue().equals("SLAVE"))
+            .findAny().get().getKey();
+    long previousMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+
+    // Build the SLAVE to MASTER state transition message addressed to the SLAVE host.
+    String msgId = new UUID(123, 456).toString();
+    Message msg =
+        createMessage(Message.MessageType.STATE_TRANSITION, msgId, "SLAVE", "MASTER", TEST_DB,
+            slaveHost);
+    msg.setStateModelDef(MasterSlaveSMD.name);
+
+    Criteria cr = new Criteria();
+    cr.setInstanceName(slaveHost);
+    cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    cr.setSessionSpecific(true);
+    cr.setPartition(targetPartition);
+    cr.setResource(TEST_DB);
+    cr.setClusterName(CLUSTER_NAME);
+
+    AsyncCallback callback = new AsyncCallback() {
+      @Override
+      public void onTimeOut() {
+        Assert.fail("The test state transition timeout.");
+      }
+
+      @Override
+      public void onReplyMessage(Message message) {
+        Assert.assertEquals(message.getMsgState(), Message.MessageState.READ);
+      }
+    };
+
+    // 2. Send the SLAVE to MASTER message to the SLAVE host to make abnormal partition states.
+    // 2.A. Without resolver, the fixing is not completely done by the default rebalancer logic.
+    _controller.getMessagingService()
+        .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+    // Wait until the partition status is fixed, then verify that exactly one MASTER is left.
+    verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(verifier.verify());
+    ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+    Assert.assertEquals(
+        ev.getStateMap(targetPartition).values().stream().filter(state -> state.equals("MASTER"))
+            .count(), 1);
+    // Since the resolver is not used in the default fix process, there is no update on the
+    // original master. So if there is any data issue, it was not fixed.
+    long currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+    Assert.assertFalse(currentMasterUpdateTime > previousMasterUpdateTime);
+
+    // 2.B. With the resolver configured for MasterSlave, the fixing is complete.
+    ConfigAccessor configAccessor = new ConfigAccessor.Builder().setZkAddress(ZK_ADDR).build();
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(
+        ImmutableMap.of(MasterSlaveSMD.name, ExcessiveTopStateResolver.class.getName()));
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    _controller.getMessagingService()
+        .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+    // Wait until the partition status is fixed, then verify that exactly one MASTER is left.
+    Assert.assertTrue(verifier.verify());
+    ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+    Assert.assertEquals(
+        ev.getStateMap(targetPartition).values().stream().filter(state -> state.equals("MASTER"))
+            .count(), 1);
+    // Now the resolver is used in the auto fix process, the original master has also been refreshed.
+    // The potential data issue has been fixed in this process.
+    currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+    Assert.assertTrue(currentMasterUpdateTime > previousMasterUpdateTime);
+
+    // Reset the resolver map so that the resolver configuration is empty again.
+    clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+  }
+
+  private long getTopStateUpdateTime(ExternalView ev, String partition, String state) {
+    // Pick the first host that the external view reports in the given state for the partition.
+    String stateHost = ev.getStateMap(partition).entrySet().stream()
+        .filter(hostToState -> hostToState.getValue().equals(state)).findFirst().get().getKey();
+    MockParticipantManager hostManager = Arrays.stream(_participants)
+        .filter(candidate -> candidate.getInstanceName().equals(stateHost)).findFirst().get();
+    HelixDataAccessor dataAccessor = _controller.getHelixDataAccessor();
+    PropertyKey currentStateKey = dataAccessor.keyBuilder()
+        .currentState(hostManager.getInstanceName(), hostManager.getSessionId(),
+            ev.getResourceName());
+    CurrentState hostCurrentState = dataAccessor.getProperty(currentStateKey);
+    return hostCurrentState.getEndTime(partition);
+  }
 }