You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2019/08/15 01:52:03 UTC

[GitHub] [helix] narendly commented on a change in pull request #392: Implement Cluster Model Provider.

narendly commented on a change in pull request #392: Implement Cluster Model Provider.
URL: https://github.com/apache/helix/pull/392#discussion_r314149772
 
 

 ##########
 File path: helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
 ##########
 @@ -0,0 +1,247 @@
+package org.apache.helix.controller.rebalancer.waged.model;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ClusterDataDetector;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This util class generates a Cluster Model object based on the controller's data cache.
+ */
+public class ClusterModelProvider {
+
+  /**
+   * Generates a Cluster Model for the rebalance calculation.
+   * The steps are: build the replicas of every resource in resourceMap, split them into replicas
+   * that keep their current best possible allocation and replicas that must be (re)assigned,
+   * build one assignable node per active instance that is seeded with the kept replicas, and
+   * finally build the cluster context from all the replicas and the resulting nodes.
+   *
+   * @param dataProvider           The controller's data cache.
+   * @param resourceMap            The full list of the resources to be rebalanced. Note that any
+   *                               resources that are not in this list will be removed from the
+   *                               final assignment.
+   * @param activeInstances        The active instances that will be used in the calculation.
+   *                               Note this list can be different from the real active node list
+   *                               according to the rebalancer logic. Its size is also used as the
+   *                               instance count when the replicas and the context are created.
+   * @param clusterChanges         All the cluster changes that happened after the previous rebalance.
+   * @param baselineAssignment     The persisted Baseline assignment. It is not inspected here; it
+   *                               is only passed to the returned Cluster Model.
+   * @param bestPossibleAssignment The persisted Best Possible assignment that was generated in the
+   *                               previous rebalance.
+   * @return A new Cluster Model object generated according to the current cluster status.
+   */
+  public static ClusterModel generateClusterModel(ResourceControllerDataProvider dataProvider,
+      Map<String, Resource> resourceMap, Set<String> activeInstances,
+      Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges,
+      Map<String, ResourceAssignment> baselineAssignment,
+      Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // Generate replica objects for all the resource partitions.
+    // <resource, replica set>
+    Map<String, Set<AssignableReplica>> replicaMap =
+        parseAllReplicas(dataProvider, resourceMap, activeInstances.size());
+
+    // Check if the replicas need to be reassigned.
+    Map<String, Set<AssignableReplica>> allocatedReplicas =
+        new HashMap<>(); // <instanceName, replica set>; filled in by findToBeAssignedReplicas
+    Set<AssignableReplica> toBeAssignedReplicas =
+        findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
+            bestPossibleAssignment, allocatedReplicas);
+
+    // Construct all the assignable nodes and initialize with the allocated replicas.
+    Set<AssignableNode> assignableNodes =
+        parseAllNodes(dataProvider.getClusterConfig(), dataProvider.getInstanceConfigMap(),
+            activeInstances, allocatedReplicas);
+
+    // Construct and initialize cluster context with ALL the replicas, not only the to-be-assigned ones.
+    ClusterContext context = new ClusterContext(
+        replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()),
+        activeInstances.size());
+    // Initialize the cluster context with the allocated assignments.
+    context.setAssignmentForFaultZoneMap(mapAssignmentToFaultZone(assignableNodes));
+
+    return new ClusterModel(context, toBeAssignedReplicas, assignableNodes, baselineAssignment,
+        bestPossibleAssignment);
+  }
+
+  /**
+   * Find the minimum set of replicas that need to be reassigned.
+   * A replica needs to be reassigned if one of the following conditions is true:
+   * 1. Cluster topology (the cluster config / any instance config) has been updated.
+   * 2. The baseline assignment has been updated.
+   * 3. The resource config has been updated.
+   * 4. The resource idealstate has been updated. TODO remove this condition when all resource configurations are migrated to resource config.
+   * 5. If the current best possible assignment does not contain the partition's valid assignment.
+   *
+   * Conditions 1 and 2 are cluster-wide: when either is detected, every replica is returned and
+   * allocatedReplicas is left untouched. Otherwise each resource is checked individually, and
+   * every replica that still has a matching allocation on an active instance is recorded in
+   * allocatedReplicas instead of being returned.
+   *
+   * @param replicaMap             A map containing all the replicas grouped by resource name.
+   * @param clusterChanges         A map containing all the important metadata updates that happened after the previous rebalance.
+   * @param activeInstances        All the instances that are alive and enabled.
+   * @param bestPossibleAssignment The current best possible assignment.
+   * @param allocatedReplicas      Output parameter. It is filled with the replicas that keep their
+   *                               current allocation, grouped by the target instance name.
+   * @return The replicas that need to be reassigned.
+   */
+  private static Set<AssignableReplica> findToBeAssignedReplicas(
+      Map<String, Set<AssignableReplica>> replicaMap,
+      Map<ClusterDataDetector.ChangeType, Set<String>> clusterChanges, Set<String> activeInstances,
+      Map<String, ResourceAssignment> bestPossibleAssignment,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+    if (clusterChanges.containsKey(ClusterDataDetector.ChangeType.ClusterConfigChange)
+        || clusterChanges.containsKey(ClusterDataDetector.ChangeType.InstanceConfigChange)
+        || clusterChanges.containsKey(ClusterDataDetector.ChangeType.BaselineAssignmentChange)) {
+      // If the cluster topology or baseline assignment has been modified, need to reassign all replicas
+      toBeAssignedReplicas
+          .addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
+    } else {
+      // check each resource to identify the allocated replicas and to-be-assigned replicas.
+      for (String resourceName : replicaMap.keySet()) {
+        Set<AssignableReplica> replicas = replicaMap.get(resourceName);
+        // 1. if the resource config/idealstate is changed, need to reassign.
+        // 2. if the resource does not appear in the best possible assignment, need to reassign.
+        if (clusterChanges.getOrDefault(ClusterDataDetector.ChangeType.ResourceConfigChange,
+            Collections.emptySet()).contains(resourceName) || clusterChanges
+            .getOrDefault(ClusterDataDetector.ChangeType.ResourceIdealStatesChange,
+                Collections.emptySet()).contains(resourceName) || !bestPossibleAssignment
+            .containsKey(resourceName)) {
+          toBeAssignedReplicas.addAll(replicas);
+          continue; // go to check next resource
+        } else {
+          // check every best possible assignment to identify whether the related replicas need to be reassigned.
+          ResourceAssignment assignment = bestPossibleAssignment.get(resourceName);
+          // <partition, <instance, state>>; the inner maps are copies, so the removals below do not modify the assignment
+          Map<String, Map<String, String>> stateMap = assignment.getMappedPartitions().stream()
+              .collect(Collectors.toMap(partition -> partition.getPartitionName(),
+                  partition -> new HashMap<>(assignment.getReplicaMap(partition))));
+          for (AssignableReplica replica : replicas) {
+            // Find any ACTIVE instance allocation that has the same state as the replica
+            Optional<Map.Entry<String, String>> instanceNameOptional =
+                stateMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap()).entrySet()
+                    .stream().filter(instanceStateMap ->
+                    instanceStateMap.getValue().equals(replica.getReplicaState()) && activeInstances
+                        .contains(instanceStateMap.getKey())).findAny();
+            // 3. if there is no such instance in the bestPossible assignment, need to reassign the replica
+            if (!instanceNameOptional.isPresent()) {
+              toBeAssignedReplicas.add(replica);
+              continue; // go to check the next replica
+            } else {
+              String instanceName = instanceNameOptional.get().getKey();
+              // * cleanup the best possible state map record,
+              // * so the selected instance won't be picked up again for another replica check
+              stateMap.getOrDefault(replica.getPartitionName(), Collections.emptyMap())
+                  .remove(instanceName);
+              // the current best possible assignment for this replica is valid,
+              // add to the allocated replica list.
+              allocatedReplicas.computeIfAbsent(instanceName, key -> new HashSet<>()).add(replica);
+            }
+          }
+        }
+      }
+    }
+    return toBeAssignedReplicas;
+  }
+
+  /**
+   * Parse all the nodes that can be assigned replicas based on the configurations.
+   * One AssignableNode is created for every name in activeInstances, and it is initialized with
+   * the replicas recorded for that instance in allocatedReplicas (an empty set if there are none).
+   *
+   * NOTE(review): the instance config is looked up with instanceConfigMap.get() and passed on
+   * without a null check; confirm that every active instance has a config, or that
+   * AssignableNode tolerates a null config.
+   *
+   * @param clusterConfig     The cluster configuration.
+   * @param instanceConfigMap A map of all the instance configuration.
+   * @param activeInstances   All the instances that are online and enabled.
+   * @param allocatedReplicas A map of all the assigned replicas, which will not be reassigned during the rebalance.
+   * @return A set of assignable nodes, one for each active instance.
+   */
+  private static Set<AssignableNode> parseAllNodes(ClusterConfig clusterConfig,
+      Map<String, InstanceConfig> instanceConfigMap, Set<String> activeInstances,
+      Map<String, Set<AssignableReplica>> allocatedReplicas) {
+    return activeInstances.stream().map(
+        instanceName -> new AssignableNode(clusterConfig, instanceConfigMap.get(instanceName),
+            instanceName, allocatedReplicas.getOrDefault(instanceName, Collections.emptySet())))
+        .collect(Collectors.toSet());
+  }
+
+  /**
+   * Parse all the replicas of the given resources from the cluster data cache.
+   * For every resource, the state model definition referenced by its ideal state provides the
+   * number of replicas per state; one AssignableReplica is then created per partition, per state
+   * and per replica count. Resources without any partition do not get an entry in the result.
+   *
+   * NOTE(review): the resource config returned by the data provider is not null-checked before
+   * it is passed to AssignableReplica; confirm that a missing resource config is acceptable.
+   * NOTE(review): replicas of the same partition and state are added to a HashSet, so keeping
+   * more than one of them relies on AssignableReplica's equals/hashCode; please confirm.
+   *
+   * @param dataProvider  The cluster status cache that contains the current cluster status.
+   * @param resourceMap   All the valid resources that are managed by the rebalancer.
+   * @param instanceCount The instance count that is passed to the ideal state and the state
+   *                      model definition to calculate the replica count of each state.
+   * @return A map of assignable replica set, <ResourceName, replica set>.
+   * @throws HelixException           If the ideal state of a resource cannot be found.
+   * @throws IllegalArgumentException If the state model definition of a resource cannot be found.
+   */
+  private static Map<String, Set<AssignableReplica>> parseAllReplicas(
+      ResourceControllerDataProvider dataProvider, Map<String, Resource> resourceMap,
+      int instanceCount) {
+    Map<String, Set<AssignableReplica>> totalReplicaMap = new HashMap<>();
+
+    for (String resourceName : resourceMap.keySet()) {
+      ResourceConfig config = dataProvider.getResourceConfig(resourceName);
+      IdealState is = dataProvider.getIdealState(resourceName);
+      if (is == null) {
+        throw new HelixException(
+            "Cannot find the resource ideal state for resource: " + resourceName);
+      }
+      String defName = is.getStateModelDefRef();
+      StateModelDefinition def = dataProvider.getStateModelDef(defName);
+      if (def == null) {
+        throw new IllegalArgumentException(String
+            .format("Cannot find state model definition %s for resource %s.",
+                is.getStateModelDefRef(), resourceName));
+      }
+
+      Map<String, Integer> stateCountMap =
+          def.getStateCountMap(instanceCount, is.getReplicaCount(instanceCount));
+
+      for (String partition : is.getPartitionSet()) {
+        for (Map.Entry<String, Integer> entry : stateCountMap.entrySet()) {
+          String state = entry.getKey();
+          for (int i = 0; i < entry.getValue(); i++) {
+            totalReplicaMap.computeIfAbsent(resourceName, key -> new HashSet<>()).add(
+                new AssignableReplica(config, partition, state,
+                    def.getStatePriorityMap().get(state)));
+          }
+        }
+      }
+    }
+    return totalReplicaMap;
+  }
+
+  /**
+   * @return A map contains the assignments for each fault zone. <fault zone, <resource, set of partitions>>
 
 Review comment:
   Nit: this should read "A map containing the assignments ...".

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org