You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/09 21:13:29 UTC

[helix] branch abnormalResolver updated (f8bc5fd -> 5b4d480)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a change to branch abnormalResolver
in repository https://gitbox.apache.org/repos/asf/helix.git.


    omit f8bc5fd  Add ExcessiveTopStateResolver to gracefully fix the double-masters situation. (#1037)
    omit ac0be01  Add Abnormal States Resolver interface and configuration item. (#1028)
     add 431a096  Add getIdealAssignmentForWagedFullAuto in HelixUtil for WAGED rebalancer (#1031)
     add 9ab4bb4  Clean up lock module dependencies (#1038)
     add dbe2831  Add PropertyStore write endpoint to Helix REST (#1049)
     add 02dfe1a  Fix test testDropInstance (#1053)
     add 6c62cd9  Add more accurate error message for resetPartition  (#1007)
     add b226eb9  Change the error message to be meaningful naming (#1041)
     add a4b87b4  Fix rest tests with prefix
     add 48da168  Add path exists check for customized state (#1033)
     add 00eb678  Avoid adding JobConfig if queue has reached its capacity limit (#1064)
     add 39bf0ad  Deprecate Raw ZkClient in helix-core (#1070)
     add f0d9da6  Trim non cluster topology related changes in the WAGED rebalancer calculation. (#1065)
     new 2974086  Add Abnormal States Resolver interface and configuration item. (#1028)
     new 5b4d480  Add ExcessiveTopStateResolver to gracefully fix the double-masters situation. (#1037)

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (f8bc5fd)
            \
             N -- N -- N   refs/heads/abnormalResolver (5b4d480)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../helix/controller/GenericHelixController.java   |  10 +-
 .../changedetector/ResourceChangeDetector.java     |   8 +-
 .../changedetector/ResourceChangeSnapshot.java     |  72 +++---
 .../trimmer/ClusterConfigTrimmer.java              |  92 +++++++
 .../trimmer/HelixPropertyTrimmer.java              |  96 ++++++++
 .../changedetector/trimmer/IdealStateTrimmer.java  | 103 ++++++++
 .../trimmer/InstanceConfigTrimmer.java             |  73 ++++++
 .../trimmer/ResourceConfigTrimmer.java             |  81 +++++++
 .../dataproviders/BaseControllerDataProvider.java  |   8 +
 .../rebalancer/waged/ReadOnlyWagedRebalancer.java  |  88 +++++++
 .../apache/helix/manager/zk/CallbackHandler.java   |  94 ++++----
 .../manager/zk/GenericBaseDataAccessorBuilder.java |   4 +-
 .../org/apache/helix/manager/zk/ZKHelixAdmin.java  |  74 +++---
 .../helix/manager/zk/ZkBucketDataAccessor.java     |   5 +-
 .../java/org/apache/helix/manager/zk/ZkClient.java | 200 +++-------------
 .../java/org/apache/helix/task/TaskDriver.java     |  15 ++
 .../BestPossibleExternalViewVerifier.java          |  89 +------
 .../main/java/org/apache/helix/util/HelixUtil.java |  96 ++++++++
 .../java/org/apache/helix/util/RebalanceUtil.java  |  24 ++
 .../helix/util/WeightAwareRebalanceUtil.java       |   2 +-
 .../src/test/java/org/apache/helix/TestHelper.java |   2 +-
 .../changedetector/TestResourceChangeDetector.java |  57 ++++-
 .../trimmer/TestHelixPropoertyTimmer.java          | 264 +++++++++++++++++++++
 .../rebalancer/waged/TestWagedRebalancer.java      |  15 +-
 .../WagedRebalancer/TestWagedRebalance.java        |  50 ++++
 .../helix/integration/task/TaskTestUtil.java       |  11 +-
 .../helix/integration/task/TestEnqueueJobs.java    |  43 ++++
 .../org/apache/helix/manager/zk/TestZKWatch.java   |   1 +
 .../apache/helix/manager/zk/TestZkHelixAdmin.java  |  98 ++++++++
 .../org/apache/helix/tools/TestClusterSetup.java   | 105 ++++----
 helix-lock/helix-lock-1.0.1-SNAPSHOT.ivy           |   3 -
 helix-lock/pom.xml                                 |  25 --
 .../apache/helix/rest/server/ServerContext.java    |  16 +-
 .../rest/server/json/instance/StoppableCheck.java  |   6 +-
 .../rest/server/resources/AbstractResource.java    |   5 +
 .../server/resources/helix/InstancesAccessor.java  |   2 +-
 .../resources/helix/PropertyStoreAccessor.java     |  60 ++++-
 .../helix/rest/server/TestInstancesAccessor.java   |  20 +-
 .../helix/rest/server/TestPerInstanceAccessor.java |   2 +-
 .../rest/server/TestPropertyStoreAccessor.java     |  48 +++-
 .../server/json/instance/TestStoppableCheck.java   |   4 +-
 .../apache/helix/zookeeper/zkclient/ZkClient.java  |  14 +-
 .../apache/helix/zookeeper/impl/TestHelper.java    | 133 +++++++++++
 .../apache/helix/zookeeper/impl/ZkTestBase.java    |   1 +
 .../apache/helix/zookeeper/impl}/ZkTestHelper.java | 101 +++-----
 .../zookeeper/impl/client}/TestRawZkClient.java    |  78 +++---
 46 files changed, 1754 insertions(+), 644 deletions(-)
 create mode 100644 helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/ClusterConfigTrimmer.java
 create mode 100644 helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/HelixPropertyTrimmer.java
 create mode 100644 helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/IdealStateTrimmer.java
 create mode 100644 helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/InstanceConfigTrimmer.java
 create mode 100644 helix-core/src/main/java/org/apache/helix/controller/changedetector/trimmer/ResourceConfigTrimmer.java
 create mode 100644 helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
 create mode 100644 helix-core/src/test/java/org/apache/helix/controller/changedetector/trimmer/TestHelixPropoertyTimmer.java
 create mode 100644 zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/TestHelper.java
 copy {helix-core/src/test/java/org/apache/helix => zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl}/ZkTestHelper.java (82%)
 rename {helix-core/src/test/java/org/apache/helix/manager/zk => zookeeper-api/src/test/java/org/apache/helix/zookeeper/impl/client}/TestRawZkClient.java (93%)


[helix] 01/02: Add Abnormal States Resolver interface and configuration item. (#1028)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch abnormalResolver
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 29740864838b7325bc9198861c79b4349207e20e
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu May 28 15:55:33 2020 -0700

    Add Abnormal States Resolver interface and configuration item. (#1028)
    
    The Abnormal States Resolver defines a generic interface to detect and recover from abnormal current states of a partition. For example,
    - double masters
    - application data out of sync
    Applications implement the interface according to their own requirements.
    
    The resolver is applied during the rebalance process according to the corresponding cluster config item. For example,
    "ABNORMAL_STATES_RESOLVER_MAP" : {
     "MASTERSLAVE" : "org.apache.helix.api.rebalancer.constraint.MasterSlaveAbnormalStateResolver"
    }
    Without any configuration, the default behavior is to perform no recovery.
---
 .../constraint/AbnormalStateResolver.java          | 75 ++++++++++++++++++
 .../dataproviders/BaseControllerDataProvider.java  | 45 ++++++++++-
 .../controller/rebalancer/AbstractRebalancer.java  | 89 +++++++++++++++++-----
 .../rebalancer/DelayedAutoRebalancer.java          | 40 +++-------
 .../java/org/apache/helix/model/ClusterConfig.java | 28 ++++++-
 .../rebalancer/TestAbstractRebalancer.java         |  4 +-
 .../rebalancer/TestAutoRebalanceStrategy.java      |  6 +-
 .../rebalancer/TestZeroReplicaAvoidance.java       | 10 ++-
 .../constraint/MockAbnormalStateResolver.java      | 48 ++++++++++++
 .../waged/model/AbstractTestClusterModel.java      |  4 +
 .../rebalancer/TestAbnormalStatesResolver.java     | 67 ++++++++++++++++
 .../org/apache/helix/model/TestClusterConfig.java  | 16 ++++
 12 files changed, 376 insertions(+), 56 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
new file mode 100644
index 0000000..7e9946c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
@@ -0,0 +1,75 @@
+package org.apache.helix.api.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * A generic interface to detect and recover from abnormal current states of a partition.
+ */
+public interface AbnormalStateResolver {
+  /**
+   * A placeholder which will be used when the resolver is not specified.
+   * This no-op resolver treats all current states as valid and does not perform any recovery.
+   */
+  AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
+    public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+        final String resourceName, final Partition partition,
+        final StateModelDefinition stateModelDef) {
+      // By default, all current states are valid.
+      return true;
+    }
+    public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+        final String resourceName, final Partition partition,
+        final StateModelDefinition stateModelDef, final List<String> preferenceList) {
+      throw new UnsupportedOperationException("This resolver won't recover abnormal states.");
+    }
+  };
+
+  /**
+   * Check whether the current states of the specified partition are valid.
+   * @param currentStateOutput
+   * @param resourceName
+   * @param partition
+   * @param stateModelDef
+   * @return true if the current states of the specified partition are valid.
+   */
+  boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef);
+
+  /**
+   * Compute a transient partition state assignment to fix the abnormal states.
+   * @param currentStateOutput
+   * @param resourceName
+   * @param partition
+   * @param stateModelDef
+   * @param preferenceList
+   * @return the transient partition state assignment which removes the abnormal states.
+   */
+  Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef, final List<String> preferenceList);
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index a24ea46..59058ab 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -34,8 +34,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.common.caches.AbstractDataCache;
 import org.apache.helix.common.caches.CurrentStateCache;
 import org.apache.helix.common.caches.InstanceMessagesCache;
@@ -53,6 +55,7 @@ import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.util.HelixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,6 +106,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   private Map<String, Map<String, String>> _idealStateRuleMap;
   private Map<String, Map<String, Set<String>>> _disabledInstanceForPartitionMap = new HashMap<>();
   private Set<String> _disabledInstanceSet = new HashSet<>();
+  private final Map<String, AbnormalStateResolver> _abnormalStateResolverMap = new HashMap<>();
 
   public BaseControllerDataProvider() {
     this(AbstractDataCache.UNKNOWN_CLUSTER, AbstractDataCache.UNKNOWN_PIPELINE);
@@ -225,6 +229,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CLUSTER_CONFIG).getAndSet(false)) {
       _clusterConfig = accessor.getProperty(accessor.keyBuilder().clusterConfig());
       refreshedType.add(HelixConstants.ChangeType.CLUSTER_CONFIG);
+      refreshAbnormalStateResolverMap(_clusterConfig);
     } else {
       LogUtil.logInfo(logger, getClusterEventId(), String.format(
           "No ClusterConfig change for cluster %s, pipeline %s", _clusterName, getPipelineName()));
@@ -372,6 +377,7 @@ public class BaseControllerDataProvider implements ControlContextProvider {
 
   public void setClusterConfig(ClusterConfig clusterConfig) {
     _clusterConfig = clusterConfig;
+    refreshAbnormalStateResolverMap(_clusterConfig);
   }
 
   @Override
@@ -731,6 +737,43 @@ public class BaseControllerDataProvider implements ControlContextProvider {
     _asyncTasksThreadPool = asyncTasksThreadPool;
   }
 
+
+  public AbnormalStateResolver getAbnormalStateResolver(String stateModel) {
+    return _abnormalStateResolverMap
+        .getOrDefault(stateModel, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
+  }
+
+  private void refreshAbnormalStateResolverMap(ClusterConfig clusterConfig) {
+    if (clusterConfig == null) {
+      logger.debug("Skip refreshing abnormal state resolvers because the ClusterConfig is missing");
+      return;
+    }
+    Map<String, String> resolverMap = clusterConfig.getAbnormalStateResolverMap();
+    logger.info("Start loading the abnormal state resolvers with configuration {}", resolverMap);
+    // Remove any resolver configuration that does not exist anymore.
+    _abnormalStateResolverMap.keySet().retainAll(resolverMap.keySet());
+    // Reload the resolver classes into cache based on the configuration.
+    for (String stateModel : resolverMap.keySet()) {
+      String resolverClassName = resolverMap.get(stateModel);
+      if (resolverClassName == null || resolverClassName.isEmpty()) {
+        // skip the empty definition.
+        continue;
+      }
+      if (!resolverClassName.equals(getAbnormalStateResolver(stateModel).getClass().getName())) {
+        try {
+          AbnormalStateResolver resolver = AbnormalStateResolver.class
+              .cast(HelixUtil.loadClass(getClass(), resolverClassName).newInstance());
+          _abnormalStateResolverMap.put(stateModel, resolver);
+        } catch (Exception e) {
+          throw new HelixException(String
+              .format("Failed to instantiate the abnormal state resolver %s for state model %s",
+                  resolverClassName, stateModel));
+        }
+      } // else, nothing to update since the same resolver class has been loaded.
+    }
+    logger.info("Finish loading the abnormal state resolvers {}", _abnormalStateResolverMap);
+  }
+
   public boolean isMaintenanceModeEnabled() {
     return _isMaintenanceModeEnabled;
   }
@@ -768,4 +811,4 @@ public class BaseControllerDataProvider implements ControlContextProvider {
   public String toString() {
     return genCacheContentStringBuilder().toString();
   }
-}
\ No newline at end of file
+}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index e85a2df..6fdd0b6 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -27,11 +27,13 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
@@ -102,7 +104,8 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
       Map<String, String> bestStateForPartition =
           computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), stateModelDef,
               preferenceList, currentStateOutput, disabledInstancesForPartition, idealState,
-              cache.getClusterConfig(), partition);
+              cache.getClusterConfig(), partition,
+              cache.getAbnormalStateResolver(stateModelDefName));
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
     return partitionMapping;
@@ -179,44 +182,97 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
     return rebalanceStrategy;
   }
 
+  /**
+   * Compute best state for partition in AUTO ideal state mode.
+   * @param liveInstances
+   * @param stateModelDef
+   * @param preferenceList
+   * @param currentStateOutput instance->state for each partition
+   * @param disabledInstancesForPartition
+   * @param idealState
+   * @param clusterConfig
+   * @param partition
+   * @param resolver
+   * @return
+   */
   protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
-      IdealState idealState, ClusterConfig clusterConfig, Partition partition) {
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition,
+      AbnormalStateResolver resolver) {
+    Optional<Map<String, String>> optionalOverwrittenStates =
+        computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
+            idealState, partition, resolver);
+    if (optionalOverwrittenStates.isPresent()) {
+      return optionalOverwrittenStates.get();
+    }
 
-    Map<String, String> currentStateMap =
-        currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition);
+    Map<String, String> currentStateMap = new HashMap<>(
+        currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));
+    return computeBestPossibleMap(preferenceList, stateModelDef, currentStateMap, liveInstances,
+        disabledInstancesForPartition);
+  }
 
-    if (currentStateMap == null) {
-      currentStateMap = Collections.emptyMap();
-    }
+  /**
+   * Compute whether an overwrite is necessary for the partition assignment in case the proposed
+   * assignment is not valid or empty.
+   * @param stateModelDef
+   * @param preferenceList
+   * @param currentStateOutput
+   * @param idealState
+   * @param partition
+   * @param resolver
+   * @return An optional object which contains the assignment map if an overwrite is necessary.
+   * Otherwise return Optional.empty().
+   */
+  protected Optional<Map<String, String>> computeStatesOverwriteForPartition(
+      final StateModelDefinition stateModelDef, final List<String> preferenceList,
+      final CurrentStateOutput currentStateOutput, IdealState idealState, final Partition partition,
+      final AbnormalStateResolver resolver) {
+    String resourceName = idealState.getResourceName();
+    Map<String, String> currentStateMap =
+        currentStateOutput.getCurrentStateMap(resourceName, partition);
 
     // (1) If the partition is removed from IS or the IS is deleted.
     // Transit to DROPPED no matter the instance is disabled or not.
     if (preferenceList == null) {
-      return computeBestPossibleMapForDroppedResource(currentStateMap);
+      return Optional.of(computeBestPossibleMapForDroppedResource(currentStateMap));
     }
 
     // (2) If resource disabled altogether, transit to initial-state (e.g. OFFLINE) if it's not in ERROR.
     if (!idealState.isEnabled()) {
-      return computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef);
+      return Optional.of(computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef));
     }
 
-    return computeBestPossibleMap(preferenceList, stateModelDef, currentStateMap, liveInstances,
-        disabledInstancesForPartition);
+    // (3) If the current states are not valid, fix the invalid part first.
+    if (!resolver.isCurrentStatesValid(currentStateOutput, resourceName, partition, stateModelDef)) {
+      Map<String, String> recoveryAssignment = resolver
+          .computeRecoveryAssignment(currentStateOutput, resourceName, partition, stateModelDef,
+              preferenceList);
+      if (recoveryAssignment == null || !recoveryAssignment.keySet()
+          .equals(currentStateMap.keySet())) {
+        throw new HelixException(String.format(
+            "Invalid recovery assignment %s since it changed the current partition placement %s",
+            recoveryAssignment, currentStateMap));
+      }
+      return Optional.of(recoveryAssignment);
+    }
+
+    return Optional.empty();
   }
 
-  protected Map<String, String> computeBestPossibleMapForDroppedResource(Map<String, String> currentStateMap) {
-    Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
+  protected Map<String, String> computeBestPossibleMapForDroppedResource(
+      final Map<String, String> currentStateMap) {
+    Map<String, String> bestPossibleStateMap = new HashMap<>();
     for (String instance : currentStateMap.keySet()) {
       bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.toString());
     }
     return bestPossibleStateMap;
   }
 
-  protected Map<String, String> computeBestPossibleMapForDisabledResource(Map<String, String> currentStateMap
-      , StateModelDefinition stateModelDef) {
-    Map<String, String> bestPossibleStateMap = new HashMap<String, String>();
+  protected Map<String, String> computeBestPossibleMapForDisabledResource(
+      final Map<String, String> currentStateMap, StateModelDefinition stateModelDef) {
+    Map<String, String> bestPossibleStateMap = new HashMap<>();
     for (String instance : currentStateMap.keySet()) {
       if (!HelixDefinedState.ERROR.name().equals(currentStateMap.get(instance))) {
         bestPossibleStateMap.put(instance, stateModelDef.getInitialState());
@@ -267,7 +323,6 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
    */
   protected Map<String, String> computeBestPossibleMap(List<String> preferenceList, StateModelDefinition stateModelDef,
       Map<String, String> currentStateMap, Set<String> liveInstances, Set<String> disabledInstancesForPartition) {
-
     Map<String, String> bestPossibleStateMap = new HashMap<>();
 
     // (1) Instances that have current state but not in preference list, drop, no matter it's disabled or not.
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index f4c95a6..f169e07 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -27,10 +27,12 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -263,7 +265,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       Map<String, String> bestStateForPartition =
           computeBestPossibleStateForPartition(liveNodes, stateModelDef, preferenceList,
               currentStateOutput, disabledInstancesForPartition, idealState, clusterConfig,
-              partition);
+              partition, cache.getAbnormalStateResolver(stateModelDefName));
 
       partitionMapping.addReplicaMap(partition, bestStateForPartition);
     }
@@ -276,39 +278,20 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
     return partitionMapping;
   }
 
-  /**
-   * compute best state for resource in AUTO ideal state mode
-   * @param liveInstances
-   * @param stateModelDef
-   * @param preferenceList
-   * @param currentStateOutput
-   *          : instance->state for each partition
-   * @param disabledInstancesForPartition
-   * @param idealState
-   * @param  clusterConfig
-   * @param  partition
-   * @return
-   */
   @Override
   protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
       StateModelDefinition stateModelDef, List<String> preferenceList,
       CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
-      IdealState idealState, ClusterConfig clusterConfig, Partition partition) {
-
+      IdealState idealState, ClusterConfig clusterConfig, Partition partition,
+      AbnormalStateResolver resolver) {
+    Optional<Map<String, String>> optionalOverwrittenStates =
+        computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
+            idealState, partition, resolver);
+    if (optionalOverwrittenStates.isPresent()) {
+      return optionalOverwrittenStates.get();
+    }
     Map<String, String> currentStateMap = new HashMap<>(
         currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));
-
-    // (1) If the partition is removed from IS or the IS is deleted.
-    // Transit to DROPPED no matter the instance is disabled or not.
-    if (preferenceList == null) {
-      return computeBestPossibleMapForDroppedResource(currentStateMap);
-    }
-
-    // (2) If resource disabled altogether, transit to initial-state (e.g. OFFLINE) if it's not in ERROR.
-    if (!idealState.isEnabled()) {
-      return computeBestPossibleMapForDisabledResource(currentStateMap, stateModelDef);
-    }
-
     // Instances not in preference list but still have active replica, retain to avoid zero replica during movement
     List<String> currentInstances = new ArrayList<>(currentStateMap.keySet());
     Collections.sort(currentInstances);
@@ -332,7 +315,6 @@ public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceController
       }
     }
 
-
     // Sort the instancesToMove by their current partition state.
     // Reason: because the states are assigned to instances in the order appeared in preferenceList, if we have
     // [node1:Slave, node2:Master], we want to keep it that way, instead of assigning Master to node1.
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
index 165919c..bb0f728 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfig.java
@@ -30,10 +30,10 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.api.config.HelixConfigProperty;
 import org.apache.helix.api.config.StateTransitionThrottleConfig;
 import org.apache.helix.api.config.StateTransitionTimeoutConfig;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 
 /**
  * Cluster configurations
@@ -109,7 +109,13 @@ public class ClusterConfig extends HelixProperty {
     // https://github.com/apache/helix/wiki/Weight-aware-Globally-Evenly-distributed-Rebalancer#rebalance-coordinator
     //
     // Default to be true.
-    GLOBAL_REBALANCE_ASYNC_MODE
+    GLOBAL_REBALANCE_ASYNC_MODE,
+
+    /**
+     * Configure the abnormal partition states resolver classes for the corresponding state model.
+     * Map of state model definition name to the fully qualified resolver class name.
+     */
+    ABNORMAL_STATES_RESOLVER_MAP
   }
 
   public enum GlobalRebalancePreferenceKey {
@@ -852,6 +858,24 @@ public class ClusterConfig extends HelixProperty {
   }
 
   /**
+   * Set the abnormal state resolver class map.
+   */
+  public void setAbnormalStateResolverMap(Map<String, String> resolverMap) {
+    if (resolverMap.values().stream()
+        .anyMatch(className -> className == null || className.isEmpty())) {
+      throw new IllegalArgumentException(
+          "Invalid Abnormal State Resolver Map definition. Class name cannot be empty.");
+    }
+    _record.setMapField(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name(), resolverMap);
+  }
+
+  public Map<String, String> getAbnormalStateResolverMap() {
+    Map<String, String> resolverMap =
+        _record.getMapField(ClusterConfigProperty.ABNORMAL_STATES_RESOLVER_MAP.name());
+    return resolverMap == null ? Collections.EMPTY_MAP : resolverMap;
+  }
+
+  /**
    * Get IdealState rules defined in the cluster config.
    * @return
    */
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 0886768..72bb726 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -52,7 +53,8 @@ public class TestAbstractRebalancer {
         .computeBestPossibleStateForPartition(new HashSet<>(liveInstances),
             BuiltInStateModelDefinitions.valueOf(stateModelName).getStateModelDefinition(),
             preferenceList, currentStateOutput, new HashSet<>(disabledInstancesForPartition),
-            new IdealState("test"), new ClusterConfig("TestCluster"), partition);
+            new IdealState("test"), new ClusterConfig("TestCluster"), partition,
+            AbnormalStateResolver.DUMMY_STATE_RESOLVER);
 
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap);
   }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
index 0b1370e..0d09079 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAutoRebalanceStrategy.java
@@ -41,7 +41,7 @@ import com.google.common.collect.Sets;
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.MockAccessor;
 import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
@@ -52,6 +52,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -243,7 +244,8 @@ public class TestAutoRebalanceStrategy {
         }
         Map<String, String> assignment = new AutoRebalancer()
             .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef,
-                preferenceList, currentStateOutput, disabled, is, clusterConfig, p);
+                preferenceList, currentStateOutput, disabled, is, clusterConfig, p,
+                AbnormalStateResolver.DUMMY_STATE_RESOLVER);
         mapResult.put(partition, assignment);
       }
       return mapResult;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
index 9a5e085..33885ca 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestZeroReplicaAvoidance.java
@@ -32,7 +32,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.stages.BaseStageTest;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
@@ -41,6 +41,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.ObjectReader;
 import org.testng.Assert;
@@ -85,9 +86,10 @@ public class TestZeroReplicaAvoidance extends BaseStageTest {
         }
       }
     }
-    Map<String, String> bestPossibleMap = rebalancer.computeBestPossibleStateForPartition(
-        liveInstances, stateModelDef, instancePreferenceList, currentStateOutput,
-        Collections.emptySet(), is, new ClusterConfig("TestCluster"), partition);
+    Map<String, String> bestPossibleMap = rebalancer
+        .computeBestPossibleStateForPartition(liveInstances, stateModelDef, instancePreferenceList,
+            currentStateOutput, Collections.emptySet(), is, new ClusterConfig("TestCluster"),
+            partition, AbnormalStateResolver.DUMMY_STATE_RESOLVER);
     Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap,
         "Differs, get " + bestPossibleMap + "\nexpected: " + expectedBestPossibleMap
             + "\ncurrentState: " + currentStateMap + "\npreferenceList: " + instancePreferenceList);
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
new file mode 100644
index 0000000..4718921
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
@@ -0,0 +1,48 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+
+/**
+ * A mock abnormal state resolver for supporting tests.
+ * It treats all current states as valid and does not support computing recovery assignments.
+ */
+public class MockAbnormalStateResolver implements AbnormalStateResolver {
+  @Override
+  public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef) {
+    // By default, all current states are valid.
+    return true;
+  }
+
+  public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition,
+      final StateModelDefinition stateModelDef, final List<String> preferenceList) {
+    throw new UnsupportedOperationException("The mock resolver won't recover abnormal states.");
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 7f8281b..ca8fd53 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.ClusterConfig;
@@ -39,6 +40,7 @@ import org.apache.helix.model.ResourceConfig;
 import org.mockito.Mockito;
 import org.testng.annotations.BeforeClass;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.when;
 
 public abstract class AbstractTestClusterModel {
@@ -109,6 +111,8 @@ public abstract class AbstractTestClusterModel {
         _capacityDataMap.keySet().stream().collect(Collectors.toMap(key -> key, key -> 0)));
     testClusterConfig.setTopologyAwareEnabled(true);
     when(testCache.getClusterConfig()).thenReturn(testClusterConfig);
+    when(testCache.getAbnormalStateResolver(any()))
+        .thenReturn(AbnormalStateResolver.DUMMY_STATE_RESOLVER);
 
     // 3. Mock the live instance node for the default instance.
     LiveInstance testLiveInstance = createMockLiveInstance(_testInstanceId);
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
new file mode 100644
index 0000000..dde2644
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
@@ -0,0 +1,67 @@
+package org.apache.helix.integration.rebalancer;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
+import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.MasterSlaveSMD;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
+  @Test
+  public void testConfigureResolver() {
+    ResourceControllerDataProvider cache = new ResourceControllerDataProvider(CLUSTER_NAME);
+    // Verify the initial setup: with no resolver configured, every model uses the dummy one.
+    cache.refresh(_controller.getHelixDataAccessor());
+    for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
+      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
+          AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+    }
+
+    // Configure the mock resolver for the MasterSlave state model only.
+    ConfigAccessor configAccessor = new ConfigAccessor.Builder().setZkAddress(ZK_ADDR).build();
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(
+        ImmutableMap.of(MasterSlaveSMD.name, MockAbnormalStateResolver.class.getName()));
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+
+    cache.requireFullRefresh();
+    cache.refresh(_controller.getHelixDataAccessor());
+    for (String stateModelDefName : cache.getStateModelDefMap().keySet()) {
+      Assert.assertEquals(cache.getAbnormalStateResolver(stateModelDefName).getClass(),
+          stateModelDefName.equals(MasterSlaveSMD.name) ?
+              MockAbnormalStateResolver.class :
+              AbnormalStateResolver.DUMMY_STATE_RESOLVER.getClass());
+    }
+
+    // Reset the resolver map. NOTE(review): not in a finally block; a failed assert skips it.
+    clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+    configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
index 8e4a016..f353293 100644
--- a/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/model/TestClusterConfig.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -256,4 +257,19 @@ public class TestClusterConfig {
         .getBooleanField(ClusterConfig.ClusterConfigProperty.GLOBAL_REBALANCE_ASYNC_MODE.name(),
             false), true);
   }
+
+  @Test
+  public void testAbnormalStatesResolverConfig() {
+    ClusterConfig clusterConfig = new ClusterConfig("testConfig");
+    // A brand-new config has no resolvers configured, so the map is empty.
+    Assert.assertEquals(clusterConfig.getAbnormalStateResolverMap(), Collections.emptyMap());
+    // A configured map is read back unchanged.
+    Map<String, String> expectedMap =
+        ImmutableMap.of(MasterSlaveSMD.name, MockAbnormalStateResolver.class.getName());
+    clusterConfig.setAbnormalStateResolverMap(expectedMap);
+    Assert.assertEquals(clusterConfig.getAbnormalStateResolverMap(), expectedMap);
+    // Setting an empty map clears the configuration again.
+    clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+    Assert.assertEquals(clusterConfig.getAbnormalStateResolverMap(), Collections.emptyMap());
+  }
 }


[helix] 02/02: Add ExcessiveTopStateResolver to gracefully fix the double-masters situation. (#1037)

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch abnormalResolver
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 5b4d480410adf539d4d43d8014676f501dcfc49d
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu Jun 4 11:50:23 2020 -0700

    Add ExcessiveTopStateResolver to gracefully fix the double-masters situation. (#1037)
    
    Although the rebalancer will fix the additional master eventually, the default operations are arbitrary and it may cause an older master to survive. This may cause serious application logic issues since many applications require the master to have the latest data.
    With this state resolver, the rebalancer will change the default behavior to reset all the master replicas so as to ensure the remaining one is the youngest one. Then the double-masters situation is gracefully resolved.
---
 .../constraint/AbnormalStateResolver.java          |   4 +-
 .../controller/rebalancer/AbstractRebalancer.java  |   2 +-
 .../constraint/ExcessiveTopStateResolver.java      | 123 +++++++++++++++++++++
 .../constraint/MockAbnormalStateResolver.java      |   2 +-
 .../rebalancer/TestAbnormalStatesResolver.java     | 117 ++++++++++++++++++++
 5 files changed, 244 insertions(+), 4 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
index 7e9946c..1a75e0b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
+++ b/helix-core/src/main/java/org/apache/helix/api/rebalancer/constraint/AbnormalStateResolver.java
@@ -35,7 +35,7 @@ public interface AbnormalStateResolver {
    * This is a dummy class that does not really functional.
    */
   AbnormalStateResolver DUMMY_STATE_RESOLVER = new AbnormalStateResolver() {
-    public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+    public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
         final String resourceName, final Partition partition,
         final StateModelDefinition stateModelDef) {
       // By default, all current states are valid.
@@ -56,7 +56,7 @@ public interface AbnormalStateResolver {
    * @param stateModelDef
    * @return true if the current states of the specified partition is valid.
    */
-  boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+  boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
       final String resourceName, final Partition partition,
       final StateModelDefinition stateModelDef);
 
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 6fdd0b6..a1ca6ec 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -245,7 +245,7 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
     }
 
     // (3) If the current states are not valid, fix the invalid part first.
-    if (!resolver.isCurrentStatesValid(currentStateOutput, resourceName, partition, stateModelDef)) {
+    if (!resolver.checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef)) {
       Map<String, String> recoveryAssignment = resolver
           .computeRecoveryAssignment(currentStateOutput, resourceName, partition, stateModelDef,
               preferenceList);
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java
new file mode 100644
index 0000000..3c1e100
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/constraint/ExcessiveTopStateResolver.java
@@ -0,0 +1,123 @@
+package org.apache.helix.controller.rebalancer.constraint;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The abnormal state resolver that gracefully fixes the abnormality of excessive top states for
+ * single-topstate state model. For example, two replicas of a MasterSlave partition are assigned
+ * with the Master state at the same time. This could be caused by a network partitioning or the
+ * other unexpected issues.
+ *
+ * The resolver checks for the abnormality and computes recovery assignment which triggers the
+ * rebalancer to eventually reset all the top state replicas once. After the resets, only one
+ * replica will be assigned the top state.
+ *
+ * Note that without using this resolver, the regular Helix rebalance pipeline also removes the
+ * excessive top state replicas. However, the default logic does not force resetting ALL the top
+ * state replicas. Since the multiple top states situation may break application data, the default
+ * resolution won't be enough to fix the potential problem.
+ */
+public class ExcessiveTopStateResolver implements AbnormalStateResolver {
+  private static final Logger LOG = LoggerFactory.getLogger(ExcessiveTopStateResolver.class);
+
+  /**
+   * The current states are not valid if more than one replica holds the top state of a
+   * single-top-state state model.
+   */
+  @Override
+  public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition, StateModelDefinition stateModelDef) {
+    if (!stateModelDef.isSingleTopStateModel()) {
+      return true;
+    }
+    // TODO: Cache top state count in the ResourceControllerDataProvider and avoid repeated counting
+    // TODO: here. It would be premature to do it now. But with more use case, we can improve the
+    // TODO: ResourceControllerDataProvider to calculate by default.
+    if (currentStateOutput.getCurrentStateMap(resourceName, partition).values().stream()
+        .filter(state -> state.equals(stateModelDef.getTopState())).count() > 1) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public Map<String, String> computeRecoveryAssignment(final CurrentStateOutput currentStateOutput,
+      final String resourceName, final Partition partition, StateModelDefinition stateModelDef,
+      List<String> preferenceList) {
+    Map<String, String> currentStateMap =
+        currentStateOutput.getCurrentStateMap(resourceName, partition);
+    if (checkCurrentStates(currentStateOutput, resourceName, partition, stateModelDef)) {
+      // This method should not be triggered when the mapping is valid.
+      // Log the warning for debug purposes.
+      LOG.warn("The input current state map {} is valid, return the original current state.",
+          currentStateMap);
+      return currentStateMap;
+    }
+
+    Map<String, String> recoverMap = new HashMap<>(currentStateMap);
+    String recoveryState = stateModelDef
+        .getNextStateForTransition(stateModelDef.getTopState(), stateModelDef.getInitialState());
+
+    // 1. We have to reset the expected top state replica host if it is hosting the top state
+    // replica. Otherwise, the old master replica with the possible stale data will never be reset
+    // there. That host may hold no replica at all, hence the null-safe comparison below.
+    if (preferenceList != null && !preferenceList.isEmpty()) {
+      String expectedTopStateHost = preferenceList.get(0);
+      if (stateModelDef.getTopState().equals(recoverMap.get(expectedTopStateHost))) {
+        recoverMap.put(expectedTopStateHost, recoveryState);
+      }
+    }
+
+    // 2. To minimize the impact of the resolution, we want to reserve one top state replica even
+    // during the recovery process.
+    boolean hasReservedTopState = false;
+    for (Map.Entry<String, String> entry : recoverMap.entrySet()) {
+      if (stateModelDef.getTopState().equals(entry.getValue())) {
+        if (hasReservedTopState) {
+          entry.setValue(recoveryState);
+        } else {
+          hasReservedTopState = true;
+        }
+      }
+    }
+    // Here's what we expect to happen next:
+    // 1. The ideal partition assignment is changed to the proposed recovery state. Then the current
+    // rebalance pipeline proceeds. State transition messages will be sent accordingly.
+    // 2. When the next rebalance pipeline starts, the new current state may still contain
+    // abnormality if the participants have not finished state transition yet. Then the resolver
+    // continues to fix the states with the same logic.
+    // 3. Otherwise, if the new current state contains only one top state replica, then we will hand
+    // it over to the regular rebalancer logic. The rebalancer will trigger the state transition to
+    // bring the top state back in the expected allocation.
+    // And the masters with potential stale data will be all reset by then.
+    return recoverMap;
+  }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
index 4718921..00a2f68 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/constraint/MockAbnormalStateResolver.java
@@ -33,7 +33,7 @@ import org.apache.helix.model.StateModelDefinition;
  */
 public class MockAbnormalStateResolver implements AbnormalStateResolver {
   @Override
-  public boolean isCurrentStatesValid(final CurrentStateOutput currentStateOutput,
+  public boolean checkCurrentStates(final CurrentStateOutput currentStateOutput,
       final String resourceName, final Partition partition,
       final StateModelDefinition stateModelDef) {
     // By default, all current states are valid.
diff --git a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
index dde2644..e10645f 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/rebalancer/TestAbnormalStatesResolver.java
@@ -19,16 +19,33 @@ package org.apache.helix.integration.rebalancer;
  * under the License.
  */
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.helix.ConfigAccessor;
+import org.apache.helix.Criteria;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.TestHelper;
 import org.apache.helix.api.rebalancer.constraint.AbnormalStateResolver;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.constraint.ExcessiveTopStateResolver;
 import org.apache.helix.controller.rebalancer.constraint.MockAbnormalStateResolver;
 import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.messaging.AsyncCallback;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.MasterSlaveSMD;
+import org.apache.helix.model.Message;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -64,4 +81,104 @@ public class TestAbnormalStatesResolver extends ZkStandAloneCMTestBase {
     clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
     configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
   }
+
+  @Test(dependsOnMethods = "testConfigureResolver")
+  public void testExcessiveTopStateResolver() {
+    BestPossibleExternalViewVerifier verifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(verifier.verify());
+
+    // Both clients below are built from ZK_ADDR and therefore own their ZooKeeper connections.
+    // They are closed in the finally block so the test does not leak sessions into the suite.
+    HelixAdmin admin = new ZKHelixAdmin.Builder().setZkAddress(ZK_ADDR).build();
+    ConfigAccessor configAccessor = new ConfigAccessor.Builder().setZkAddress(ZK_ADDR).build();
+    try {
+      // 1. Find a partition with a MASTER replica and a SLAVE replica
+      ExternalView ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+      String targetPartition = ev.getPartitionSet().iterator().next();
+      Map<String, String> partitionAssignment = ev.getStateMap(targetPartition);
+      String slaveHost = partitionAssignment.entrySet().stream()
+          .filter(entry -> entry.getValue().equals("SLAVE")).map(Map.Entry::getKey).findAny()
+          .orElseThrow(() -> new IllegalStateException(
+              "No SLAVE replica found for partition " + targetPartition + ": "
+                  + partitionAssignment));
+      long previousMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+
+      // Build a SLAVE to MASTER message. Sending it to the SLAVE host forces a second MASTER
+      // replica for the target partition.
+      String msgId = new UUID(123, 456).toString();
+      Message msg =
+          createMessage(Message.MessageType.STATE_TRANSITION, msgId, "SLAVE", "MASTER", TEST_DB,
+              slaveHost);
+      msg.setStateModelDef(MasterSlaveSMD.name);
+
+      Criteria cr = new Criteria();
+      cr.setInstanceName(slaveHost);
+      cr.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      cr.setSessionSpecific(true);
+      cr.setPartition(targetPartition);
+      cr.setResource(TEST_DB);
+      cr.setClusterName(CLUSTER_NAME);
+
+      AsyncCallback callback = new AsyncCallback() {
+        @Override
+        public void onTimeOut() {
+          Assert.fail("The test state transition timeout.");
+        }
+
+        @Override
+        public void onReplyMessage(Message message) {
+          Assert.assertEquals(message.getMsgState(), Message.MessageState.READ);
+        }
+      };
+
+      // 2. Send the SLAVE to MASTER message to the SLAVE host to make abnormal partition states.
+
+      // 2.A. Without a resolver, the default rebalancer logic does not fully fix the partition.
+      _controller.getMessagingService()
+          .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+      // Wait until the partition status is fixed, then verify the result is as expected.
+      Assert.assertTrue(verifier.verify());
+      ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+      Assert.assertEquals(
+          ev.getStateMap(targetPartition).values().stream().filter(state -> state.equals("MASTER"))
+              .count(), 1);
+      // Since the resolver is not used in the default fix process, the original master is not
+      // refreshed. So if there is any data issue, it is not fixed.
+      long currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+      Assert.assertTrue(currentMasterUpdateTime <= previousMasterUpdateTime,
+          "The original master should not be refreshed without a resolver.");
+
+      // 2.B. With the resolver configured, the fix is complete.
+      ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+      clusterConfig.setAbnormalStateResolverMap(
+          ImmutableMap.of(MasterSlaveSMD.name, ExcessiveTopStateResolver.class.getName()));
+      configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+      _controller.getMessagingService()
+          .sendAndWait(cr, msg, callback, (int) TestHelper.WAIT_DURATION);
+      // Wait until the partition status is fixed, then verify the result is as expected.
+      Assert.assertTrue(verifier.verify());
+      ev = admin.getResourceExternalView(CLUSTER_NAME, TEST_DB);
+      Assert.assertEquals(
+          ev.getStateMap(targetPartition).values().stream().filter(state -> state.equals("MASTER"))
+              .count(), 1);
+      // Now the resolver is used in the auto fix process, so the original master has also been
+      // refreshed. The potential data issue has been fixed in this process.
+      currentMasterUpdateTime = getTopStateUpdateTime(ev, targetPartition, "MASTER");
+      Assert.assertTrue(currentMasterUpdateTime > previousMasterUpdateTime,
+          "The original master should be refreshed by the resolver.");
+    } finally {
+      try {
+        // Always reset the resolver map, even if an assertion above failed, so the
+        // ExcessiveTopStateResolver does not stay configured for subsequent tests.
+        ClusterConfig clusterConfig = configAccessor.getClusterConfig(CLUSTER_NAME);
+        clusterConfig.setAbnormalStateResolverMap(Collections.emptyMap());
+        configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+      } finally {
+        configAccessor.close();
+        admin.close();
+      }
+    }
+  }
+
+  /**
+   * Returns the end time that the current state records for {@code partition} on the first
+   * replica the external view reports in {@code state}.
+   *
+   * @param ev the external view of the resource
+   * @param partition the partition to inspect
+   * @param state the replica state to look for, e.g. "MASTER"
+   * @return the end time from the hosting participant's current state for that partition
+   * @throws IllegalStateException if no replica is in {@code state}, or if no mock participant
+   *     matches the hosting instance
+   */
+  private long getTopStateUpdateTime(ExternalView ev, String partition, String state) {
+    String topStateHost = ev.getStateMap(partition).entrySet().stream()
+        .filter(entry -> entry.getValue().equals(state)).map(Map.Entry::getKey).findFirst()
+        .orElseThrow(() -> new IllegalStateException(
+            "No replica of partition " + partition + " is in state " + state));
+    MockParticipantManager participant = Arrays.stream(_participants)
+        .filter(instance -> instance.getInstanceName().equals(topStateHost)).findFirst()
+        .orElseThrow(() -> new IllegalStateException(
+            "No mock participant found for instance " + topStateHost));
+
+    // Read the current state under the participant's live session.
+    HelixDataAccessor accessor = _controller.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    CurrentState currentState = accessor.getProperty(keyBuilder
+        .currentState(participant.getInstanceName(), participant.getSessionId(),
+            ev.getResourceName()));
+    Assert.assertNotNull(currentState,
+        "Current state of resource " + ev.getResourceName() + " not found on " + topStateHost);
+    return currentState.getEndTime(partition);
+  }
 }