You are viewing a plain-text version of this content; the canonical link to it is not preserved in this plain-text rendering.
Posted to commits@helix.apache.org by ji...@apache.org on 2020/06/18 19:57:07 UTC

[helix] 01/01: Revert "Fix the issue that the instance may not be assigned a replica as expected. (#1098)"

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch revert-1098-master
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 3136de21211fcd555be710cfb73e7a9c9d9e7815
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Thu Jun 18 12:56:58 2020 -0700

    Revert "Fix the issue that the instance may not be assigned a replica as expected. (#1098)"
    
    This reverts commit 35857a8cc025a1efb49943b6ab31dc12622bf986.
---
 .../controller/rebalancer/AbstractRebalancer.java  | 157 +++++----------------
 .../helix/model/BuiltInStateModelDefinitions.java  |   1 -
 .../helix/model/OnlineOfflineWithBootstrapSMD.java |  74 ----------
 .../rebalancer/TestAbstractRebalancer.java         |   2 +-
 ...bstractRebalancer.ComputeBestPossibleState.json |  24 ----
 5 files changed, 39 insertions(+), 219 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 10d9976..f118bcb 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -24,15 +24,11 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
@@ -356,137 +352,60 @@ public abstract class AbstractRebalancer<T extends BaseControllerDataProvider> i
     }
 
     // (3) Assign normal states to instances.
-    assignStatesToInstances(preferenceList, stateModelDef, currentStateMap, liveInstances,
-        disabledInstancesForPartition, bestPossibleStateMap);
-
-    return bestPossibleStateMap;
-  }
-
-  /**
-   * Assign the states to the instances listed in the preference list according to inputs.
-   * Note that when we choose the top-state (e.g. MASTER) replica for a partition, we prefer to
-   * choose it from these replicas which are already in the secondary states (e.g, SLAVE) instead
-   * of in lower-state. This is because a replica in secondary state will take shorter time to
-   * transition to the top-state, which could minimize the impact to the application's availability.
-   * To achieve that, we sort the preferenceList based on CurrentState, by treating top-state and
-   * second-states with same priority and rely on the fact that Collections.sort() is stable.
-   */
-  private void assignStatesToInstances(final List<String> preferenceList,
-      final StateModelDefinition stateModelDef, final Map<String, String> currentStateMap,
-      final Set<String> liveInstances, final Set<String> disabledInstancesForPartition,
-      Map<String, String> bestPossibleStateMap) {
-    // Record the assigned instances to avoid double calculating or conflict assignment.
-    Set<String> assignedInstances = new HashSet<>();
-
-    Set<String> liveAndEnabled =
-        liveInstances.stream().filter(instance -> !disabledInstancesForPartition.contains(instance))
-            .collect(Collectors.toSet());
-
-    Queue<String> preferredActiveInstanceQueue = new LinkedList<>(preferenceList);
-    preferredActiveInstanceQueue.retainAll(liveAndEnabled);
-    int totalCandidateCount = preferredActiveInstanceQueue.size();
-
-    // Sort the preferred instances based on replicas' state priority in the current state.
-    // Note that if one instance exists in the current states but not in the preference list, then
-    // it won't show in the prioritized list.
-    List<String> currentStatePrioritizedList = new ArrayList<>(preferredActiveInstanceQueue);
-    currentStatePrioritizedList.sort(new StatePriorityComparator(currentStateMap, stateModelDef));
-    Iterator<String> currentStatePrioritizedInstanceIter = currentStatePrioritizedList.iterator();
-
-    // Assign the states to the instances that appear in the preference list.
-    for (String state : stateModelDef.getStatesPriorityList()) {
+    // When we choose the top-state (e.g. MASTER) replica for a partition, we prefer to choose it from
+    // these replicas which are already in the secondary states (e.g, SLAVE) instead of in lower-state.
+    // This is because a replica in secondary state will take shorter time to transition to the top-state,
+    // which could minimize the impact to the application's availability.
+    // To achieve that, we sort the preferenceList based on CurrentState, by treating top-state and second-states with
+    // same priority and rely on the fact that Collections.sort() is stable.
+    List<String> statesPriorityList = stateModelDef.getStatesPriorityList();
+    Set<String> assigned = new HashSet<>();
+    Set<String> liveAndEnabled = new HashSet<>(liveInstances);
+    liveAndEnabled.removeAll(disabledInstancesForPartition);
+
+    // Sort the instances based on replicas' state priority in the current state
+    List<String> sortedPreferenceList = new ArrayList<>(preferenceList);
+    sortedPreferenceList.sort(new StatePriorityComparator(currentStateMap, stateModelDef));
+
+    // Assign the state to the instances that appear in the preference list.
+    for (String state : statesPriorityList) {
       int stateCount =
           getStateCount(state, stateModelDef, liveAndEnabled.size(), preferenceList.size());
-      while (!preferredActiveInstanceQueue.isEmpty()) {
+      for (String instance : preferenceList) {
         if (stateCount <= 0) {
           break; // continue assigning for the next state
         }
-        String peekInstance = preferredActiveInstanceQueue.peek();
-        if (assignedInstances.contains(peekInstance)) {
-          preferredActiveInstanceQueue.poll();
+        if (assigned.contains(instance) || !liveAndEnabled.contains(instance)) {
           continue; // continue checking for the next available instance
         }
-        String proposedInstance = adjustInstanceIfNecessary(state, peekInstance,
-            currentStateMap.getOrDefault(peekInstance, stateModelDef.getInitialState()),
-            stateModelDef, assignedInstances, totalCandidateCount - assignedInstances.size(),
-            stateCount, currentStatePrioritizedInstanceIter);
-
-        if (proposedInstance.equals(peekInstance)) {
-          // If the peeked instance is the final decision, then poll it from the queue.
-          preferredActiveInstanceQueue.poll();
+        String proposedInstance = instance;
+        // Additional check and alternate the assignment for reducing top state handoff.
+        if (state.equals(stateModelDef.getTopState()) && !stateModelDef.getSecondTopStates()
+            .contains(currentStateMap.getOrDefault(instance, stateModelDef.getInitialState()))) {
+          // If the desired state is the top state, but the instance cannot be transited to the
+          // top state in one hop, try to keep the top state on current host or a host with a closer
+          // state.
+          for (String currentStatePrioritizedInstance : sortedPreferenceList) {
+            if (!assigned.contains(currentStatePrioritizedInstance) && liveAndEnabled
+                .contains(currentStatePrioritizedInstance)) {
+              proposedInstance = currentStatePrioritizedInstance;
+              break;
+            }
+          }
+          // Note that if all the current top state instances are not assignable, then we fallback
+          // to the default logic that assigning the state according to preference list order.
         }
-        // else, if we found a different instance for the partition placement, then we need to
-        // check the same instance again or it will not be assigned with any partitions.
-
-        // Assign the desired state to the proposed instance if not on ERROR state.
+        // Assign the desired state to the proposed instance
         if (HelixDefinedState.ERROR.toString().equals(currentStateMap.get(proposedInstance))) {
           bestPossibleStateMap.put(proposedInstance, HelixDefinedState.ERROR.toString());
         } else {
           bestPossibleStateMap.put(proposedInstance, state);
           stateCount--;
         }
-        // Note that in either case, the proposed instance is considered to be assigned with a state
-        // by now.
-        if (!assignedInstances.add(proposedInstance)) {
-          throw new AssertionError(String
-              .format("The proposed instance %s has been already assigned before.",
-                  proposedInstance));
-        }
+        assigned.add(proposedInstance);
       }
     }
-  }
-
-  /**
-   * If the proposed instance may cause unnecessary state transition (according to the current
-   * state), check and return a alternative instance to avoid.
-   *
-   * @param requestedState                      The requested state.
-   * @param proposedInstance                    The current proposed instance to host the replica
-   *                                            with the specified state.
-   * @param currentState                        The current state of the proposed Instance, or init
-   *                                            state if the proposed instance does not have an
-   *                                            assignment.
-   * @param stateModelDef
-   * @param assignedInstances
-   * @param remainCandidateCount                The count of the remaining unassigned instances
-   * @param remainRequestCount                  The count of the remaining replicas that need to be
-   *                                            assigned with the given state.
-   * @param currentStatePrioritizedInstanceIter The iterator of the prioritized instance list which
-   *                                            can be used to find a better alternative instance.
-   * @return The alternative instance, or the original proposed instance if adjustment is not
-   * necessary.
-   */
-  private String adjustInstanceIfNecessary(String requestedState, String proposedInstance,
-      String currentState, StateModelDefinition stateModelDef, Set<String> assignedInstances,
-      int remainCandidateCount, int remainRequestCount,
-      Iterator<String> currentStatePrioritizedInstanceIter) {
-    String adjustedInstance = proposedInstance;
-    // Check and alternate the assignment for reducing top state handoff.
-    // 1. If the requested state is not the top state, then it does not worth it to adjust.
-    // 2. If all remaining candidates need to be assigned with the the state, then there is no need
-    // to adjust.
-    // 3. If the proposed instance already has the top state or a secondary state, then adjustment
-    // is not necessary.
-    if (remainRequestCount < remainCandidateCount && requestedState
-        .equals(stateModelDef.getTopState()) && !requestedState.equals(currentState)
-        && !stateModelDef.getSecondTopStates().contains(currentState)) {
-      // If the desired state is the top state, but the instance cannot be transited to the
-      // top state in one hop, try to keep the top state on current host or a host with a closer
-      // state.
-      while (currentStatePrioritizedInstanceIter.hasNext()) {
-        // Note that it is safe to check the prioritized instance items only once here.
-        // Since the only possible condition when we don't use an instance in this list is that
-        // it has been assigned with a state. And this is not revertable.
-        String currentStatePrioritizedInstance = currentStatePrioritizedInstanceIter.next();
-        if (!assignedInstances.contains(currentStatePrioritizedInstance)) {
-          adjustedInstance = currentStatePrioritizedInstance;
-          break;
-        }
-      }
-      // Note that if all the current top state instances are not assignable, then we fallback
-      // to the default logic that assigning the state according to preference list order.
-    }
-    return adjustedInstance;
+    return bestPossibleStateMap;
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/model/BuiltInStateModelDefinitions.java b/helix-core/src/main/java/org/apache/helix/model/BuiltInStateModelDefinitions.java
index f42c048..e060c67 100644
--- a/helix-core/src/main/java/org/apache/helix/model/BuiltInStateModelDefinitions.java
+++ b/helix-core/src/main/java/org/apache/helix/model/BuiltInStateModelDefinitions.java
@@ -27,7 +27,6 @@ public enum BuiltInStateModelDefinitions {
   LeaderStandby(new LeaderStandbySMD()),
   StorageSchemata(new StorageSchemataSMD()),
   OnlineOffline(new OnlineOfflineSMD()),
-  OnlineOfflineWithBootstrap(OnlineOfflineWithBootstrapSMD.build()),
   ScheduledTask(new ScheduledTaskSMD()),
   Task(new TaskSMD());
 
diff --git a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java b/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java
deleted file mode 100644
index 97a2f45..0000000
--- a/helix-core/src/main/java/org/apache/helix/model/OnlineOfflineWithBootstrapSMD.java
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.helix.model;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.zookeeper.datamodel.ZNRecord;
-
-/**
- * Helix built-in state model definition based on Online-Offline but with the additional bootstrap
- * state.
- */
-public final class OnlineOfflineWithBootstrapSMD extends StateModelDefinition {
-  public static final String name = "OnlineOfflineWithBootstrap";
-
-  /**
-   * Instantiate from a pre-populated record
-   *
-   * @param record ZNRecord representing a state model definition
-   */
-  private OnlineOfflineWithBootstrapSMD(ZNRecord record) {
-    super(record);
-  }
-
-  public enum States {
-    ONLINE, BOOTSTRAP, OFFLINE
-  }
-
-  /**
-   * Build OnlineOfflineWithBootstrap state model definition
-   *
-   * @return
-   */
-  public static OnlineOfflineWithBootstrapSMD build() {
-    Builder builder = new Builder(name);
-    // init state
-    builder.initialState(States.OFFLINE.name());
-
-    // add states
-    builder.addState(States.ONLINE.name(), 0);
-    builder.addState(States.BOOTSTRAP.name(), 1);
-    builder.addState(States.OFFLINE.name(), 2);
-    for (HelixDefinedState state : HelixDefinedState.values()) {
-      builder.addState(state.name());
-    }
-
-    // add transitions
-    builder.addTransition(States.ONLINE.name(), States.OFFLINE.name(), 0);
-    builder.addTransition(States.OFFLINE.name(), States.BOOTSTRAP.name(), 1);
-    builder.addTransition(States.BOOTSTRAP.name(), States.ONLINE.name(), 2);
-    builder.addTransition(States.OFFLINE.name(), HelixDefinedState.DROPPED.name());
-
-    // bounds
-    builder.dynamicUpperBound(States.ONLINE.name(), "R");
-
-    return new OnlineOfflineWithBootstrapSMD(builder.build().getRecord());
-  }
-}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
index 513fd2c..6ecaa30 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/TestAbstractRebalancer.java
@@ -56,7 +56,7 @@ public class TestAbstractRebalancer {
             new IdealState("test"), new ClusterConfig("TestCluster"), partition,
             MonitoredAbnormalResolver.DUMMY_STATE_RESOLVER);
 
-    Assert.assertTrue(bestPossibleMap.equals(expectedBestPossibleMap));
+    Assert.assertEquals(bestPossibleMap, expectedBestPossibleMap);
   }
 
   @DataProvider(name = "TestComputeBestPossibleStateInput")
diff --git a/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json b/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json
index 3659942..8aac5c8 100644
--- a/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json
+++ b/helix-core/src/test/resources/TestAbstractRebalancer.ComputeBestPossibleState.json
@@ -198,29 +198,5 @@
       "node_3": "DROPPED",
       "node_4": "SLAVE"
     }
-  }, {
-    "comment": "For a multiple top state state model with complicated middle states, the rebalancer correctly handles the top states and assigns all instances with the expected state.",
-    "stateModel": "OnlineOfflineWithBootstrap",
-    "liveInstances": [
-      "node_1",
-      "node_2",
-      "node_3"
-    ],
-    "preferenceList": [
-      "node_3",
-      "node_2",
-      "node_1"
-    ],
-    "currentStateMap": {
-      "node_1": "OFFLINE",
-      "node_2": "ONLINE",
-      "node_3": "ONLINE"
-    },
-    "disabledInstancesForPartition": [],
-    "expectedBestPossibleStateMap": {
-      "node_1": "ONLINE",
-      "node_2": "ONLINE",
-      "node_3": "ONLINE"
-    }
   }
 ]