You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:22 UTC

[helix] 44/50: Refine the rebalance scope calculating logic in the WAGED rebalancer. (#519)

This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git

commit f8ab3428bf8ed9cbba17274f89eeb3799f55ca2c
Author: Jiajun Wang <18...@users.noreply.github.com>
AuthorDate: Fri Oct 25 12:21:11 2019 -0700

    Refine the rebalance scope calculating logic in the WAGED rebalancer. (#519)
    
    * Refine the rebalance scope calculating logic in the WAGED rebalancer.
    
    1. When assignment persistence is enabled, ignore the IdealState fields that the controller writes (map and list fields for FULL_AUTO resources, map fields for SEMI_AUTO resources).
    2. On an IdealState change, the resource shall be fully rebalanced, since some filter conditions, such as the instance tag, might have changed.
    3. A live instance change (a newly connected node) shall trigger a full rebalance so that partitions will be reassigned to the new node.
    4. Modify the related test cases.
    5. Add an option to the change detector so that, if it is used elsewhere, the caller can choose to listen to all changes.
---
 .../changedetector/ResourceChangeDetector.java     | 22 +++++----
 .../changedetector/ResourceChangeSnapshot.java     | 53 ++++++++++++++++++----
 .../rebalancer/waged/WagedRebalancer.java          |  5 +-
 .../waged/model/ClusterModelProvider.java          | 30 ++++++++----
 .../changedetector/TestResourceChangeDetector.java | 45 ++++++++++++++++++
 .../rebalancer/waged/TestWagedRebalancer.java      | 16 +++----
 .../waged/model/AbstractTestClusterModel.java      |  4 ++
 7 files changed, 138 insertions(+), 37 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
index 1df61f8..8402efd 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java
@@ -19,6 +19,12 @@ package org.apache.helix.controller.changedetector;
  * under the License.
  */
 
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
 import com.google.common.collect.Sets;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixProperty;
@@ -27,12 +33,6 @@ import org.apache.helix.model.ClusterConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-
 /**
  * ResourceChangeDetector implements ChangeDetector. It caches resource-related metadata from
  * Helix's main resource pipeline cache (DataProvider) and the computation results of change
@@ -42,6 +42,7 @@ import java.util.Map;
 public class ResourceChangeDetector implements ChangeDetector {
   private static final Logger LOG = LoggerFactory.getLogger(ResourceChangeDetector.class.getName());
 
+  private final boolean _ignoreControllerGeneratedFields;
   private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run
   private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run
 
@@ -50,8 +51,13 @@ public class ResourceChangeDetector implements ChangeDetector {
   private Map<HelixConstants.ChangeType, Collection<String>> _addedItems = new HashMap<>();
   private Map<HelixConstants.ChangeType, Collection<String>> _removedItems = new HashMap<>();
 
-  public ResourceChangeDetector() {
+  public ResourceChangeDetector(boolean ignoreControllerGeneratedFields) {
     _newSnapshot = new ResourceChangeSnapshot();
+    _ignoreControllerGeneratedFields = ignoreControllerGeneratedFields;
+  }
+
+  public ResourceChangeDetector() {
+    this(false);
   }
 
   /**
@@ -135,7 +141,7 @@ public class ResourceChangeDetector implements ChangeDetector {
   public synchronized void updateSnapshots(ResourceControllerDataProvider dataProvider) {
     // If there are changes, update internal states
     _oldSnapshot = new ResourceChangeSnapshot(_newSnapshot);
-    _newSnapshot = new ResourceChangeSnapshot(dataProvider);
+    _newSnapshot = new ResourceChangeSnapshot(dataProvider, _ignoreControllerGeneratedFields);
     dataProvider.clearRefreshedChangeTypes();
 
     // Invalidate cached computation
diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
index 6351eb9..c965124 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.changedetector;
  * under the License.
  */
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -67,11 +68,21 @@ class ResourceChangeSnapshot {
    * Constructor using controller cache (ResourceControllerDataProvider).
    *
    * @param dataProvider
+   * @param ignoreControllerGeneratedFields if true (and assignment persistence is enabled), the
+   *                                        snapshot ignores IdealState fields the controller writes.
    */
-  ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider) {
+  ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider,
+      boolean ignoreControllerGeneratedFields) {
     _changedTypes = new HashSet<>(dataProvider.getRefreshedChangeTypes());
     _instanceConfigMap = new HashMap<>(dataProvider.getInstanceConfigMap());
     _idealStateMap = new HashMap<>(dataProvider.getIdealStates());
+    if (ignoreControllerGeneratedFields && (
+        dataProvider.getClusterConfig().isPersistBestPossibleAssignment() || dataProvider
+            .getClusterConfig().isPersistIntermediateAssignment())) {
+      for (String resourceName : _idealStateMap.keySet()) {
+        _idealStateMap.put(resourceName, trimIdealState(_idealStateMap.get(resourceName)));
+      }
+    }
     _resourceConfigMap = new HashMap<>(dataProvider.getResourceConfigMap());
     _liveInstances = new HashMap<>(dataProvider.getLiveInstances());
     _clusterConfig = dataProvider.getClusterConfig();
@@ -79,15 +90,15 @@ class ResourceChangeSnapshot {
 
   /**
    * Copy constructor for ResourceChangeCache.
-   * @param cache
+   * @param snapshot
    */
-  ResourceChangeSnapshot(ResourceChangeSnapshot cache) {
-    _changedTypes = new HashSet<>(cache._changedTypes);
-    _instanceConfigMap = new HashMap<>(cache._instanceConfigMap);
-    _idealStateMap = new HashMap<>(cache._idealStateMap);
-    _resourceConfigMap = new HashMap<>(cache._resourceConfigMap);
-    _liveInstances = new HashMap<>(cache._liveInstances);
-    _clusterConfig = cache._clusterConfig;
+  ResourceChangeSnapshot(ResourceChangeSnapshot snapshot) {
+    _changedTypes = new HashSet<>(snapshot._changedTypes);
+    _instanceConfigMap = new HashMap<>(snapshot._instanceConfigMap);
+    _idealStateMap = new HashMap<>(snapshot._idealStateMap);
+    _resourceConfigMap = new HashMap<>(snapshot._resourceConfigMap);
+    _liveInstances = new HashMap<>(snapshot._liveInstances);
+    _clusterConfig = snapshot._clusterConfig;
   }
 
   Set<HelixConstants.ChangeType> getChangedTypes() {
@@ -113,4 +124,28 @@ class ResourceChangeSnapshot {
   ClusterConfig getClusterConfig() {
     return _clusterConfig;
   }
+
+  // Returns a copy of the IdealState with the fields the controller may write cleared (per mode).
+  private IdealState trimIdealState(IdealState originalIdealState) {
+    // Build a separate IdealState so the cached object, which other pipeline stages may read, is
+    // not modified. NOTE(review): relies on IdealState(ZNRecord) copying the record; confirm.
+    IdealState trimmedIdealState = new IdealState(originalIdealState.getRecord());
+    switch (originalIdealState.getRebalanceMode()) {
+      case FULL_AUTO:
+        // FULL_AUTO: neither list fields nor map fields are controller input; the controller writes
+        // both to persist the assignment mapping, so drop them to avoid reporting its own writes as
+        // IdealState changes.
+        trimmedIdealState.getRecord().setListFields(Collections.emptyMap());
+        trimmedIdealState.getRecord().setMapFields(Collections.emptyMap());
+        break;
+      case SEMI_AUTO:
+        // SEMI_AUTO: the controller writes only the map fields (persisted assignment mapping), so
+        // drop those; list fields are kept as input.
+        trimmedIdealState.getRecord().setMapFields(Collections.emptyMap());
+        break;
+      default: // Any other rebalance mode: keep every field.
+        break;
+    }
+    return trimmedIdealState;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 5b2573f..b455992 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -71,6 +71,7 @@ public class WagedRebalancer {
   // contains 1. baseline recalculate, 2. partial rebalance that is based on the new baseline.
   private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
       ImmutableSet.of(HelixConstants.ChangeType.RESOURCE_CONFIG,
+          HelixConstants.ChangeType.IDEAL_STATE,
           HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
   // The cluster change detector is a stateful object.
   // Make it static to avoid unnecessary reinitialization.
@@ -256,8 +257,6 @@ public class WagedRebalancer {
     if (clusterChanges.keySet().stream()
         .anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
       refreshBaseline(clusterData, clusterChanges, resourceMap, currentStateOutput);
-      // Inject a cluster config change for large scale partial rebalance once the baseline changed.
-      clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet());
     }
 
     Set<String> activeNodes = DelayedRebalanceUtil.getActiveNodes(clusterData.getAllInstances(),
@@ -470,7 +469,7 @@ public class WagedRebalancer {
 
   private ResourceChangeDetector getChangeDetector() {
     if (CHANGE_DETECTOR_THREAD_LOCAL.get() == null) {
-      CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector());
+      CHANGE_DETECTOR_THREAD_LOCAL.set(new ResourceChangeDetector(true));
     }
     return CHANGE_DETECTOR_THREAD_LOCAL.get();
   }
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
index 276b998..4c32f4c 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/model/ClusterModelProvider.java
@@ -77,7 +77,7 @@ public class ClusterModelProvider {
         new HashMap<>(); // <instanceName, replica set>
     Set<AssignableReplica> toBeAssignedReplicas =
         findToBeAssignedReplicas(replicaMap, clusterChanges, activeInstances,
-            bestPossibleAssignment, allocatedReplicas);
+            dataProvider.getLiveInstances().keySet(), bestPossibleAssignment, allocatedReplicas);
 
     // Update the allocated replicas to the assignable nodes.
     assignableNodes.stream().forEach(node -> node.assignInitBatch(
@@ -97,14 +97,13 @@ public class ClusterModelProvider {
    * Find the minimum set of replicas that need to be reassigned.
    * A replica needs to be reassigned if one of the following condition is true:
    * 1. Cluster topology (the cluster config / any instance config) has been updated.
-   * 2. The baseline assignment has been updated.
-   * 3. The resource config has been updated.
-   * 4. The resource idealstate has been updated. TODO remove this condition when all resource configurations are migrated to resource config.
-   * 5. If the current best possible assignment does not contain the partition's valid assignment.
+   * 2. The resource config has been updated.
+   * 3. If the current best possible assignment does not contain the partition's valid assignment.
    *
    * @param replicaMap             A map contains all the replicas grouped by resource name.
    * @param clusterChanges         A map contains all the important metadata updates that happened after the previous rebalance.
-   * @param activeInstances        All the instances that are alive and enabled.
+   * @param activeInstances        All the instances that are live and enabled according to the delay rebalance configuration.
+   * @param liveInstances          All the instances that are live.
    * @param bestPossibleAssignment The current best possible assignment.
    * @param allocatedReplicas      Return the allocated replicas grouped by the target instance name.
    * @return The replicas that need to be reassigned.
@@ -112,12 +111,25 @@ public class ClusterModelProvider {
   private static Set<AssignableReplica> findToBeAssignedReplicas(
       Map<String, Set<AssignableReplica>> replicaMap,
       Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Set<String> activeInstances,
-      Map<String, ResourceAssignment> bestPossibleAssignment,
+      Set<String> liveInstances, Map<String, ResourceAssignment> bestPossibleAssignment,
       Map<String, Set<AssignableReplica>> allocatedReplicas) {
     Set<AssignableReplica> toBeAssignedReplicas = new HashSet<>();
+
+    // A newly connected node = a new LiveInstance znode (or an updated session ID) whose
+    // corresponding instance is live.
+    // TODO: The assumption here is that if the LiveInstance znode is created or its session ID is
+    // TODO: updated, the algorithm needs to be called to move some partitions to this new node.
+    // TODO: However, if the LiveInstance znode changes for any other reason, the node will also be
+    // TODO: treated as newly connected. We need a better way to identify which nodes are really
+    // TODO: newly connected.
+    Set<String> newlyConnectedNodes = clusterChanges
+        .getOrDefault(HelixConstants.ChangeType.LIVE_INSTANCE, Collections.emptySet());
+    newlyConnectedNodes.retainAll(liveInstances);
     if (clusterChanges.containsKey(HelixConstants.ChangeType.CLUSTER_CONFIG) || clusterChanges
-        .containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
-      // If the cluster topology has been modified, need to reassign all replicas
+        .containsKey(HelixConstants.ChangeType.INSTANCE_CONFIG) || !newlyConnectedNodes.isEmpty()) {
+      // 1. If the cluster topology has been modified, need to reassign all replicas.
+      // 2. If any node was newly connected, need to rebalance all replicas for the evenness of
+      // distribution.
       toBeAssignedReplicas
           .addAll(replicaMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet()));
     } else {
diff --git a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
index 445add4..8567c4c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java
@@ -20,6 +20,9 @@ package org.apache.helix.controller.changedetector;
  */
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+
 import org.apache.helix.AccessOption;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.HelixDataAccessor;
@@ -31,6 +34,7 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.ResourceConfig;
 import org.testng.Assert;
@@ -339,6 +343,47 @@ public class TestResourceChangeDetector extends ZkTestBase {
   }
 
   /**
+   * Modify IdealState mapping fields for a FULL_AUTO resource and see if detector detects.
+   */
+  @Test(dependsOnMethods = "testNoChange")
+  public void testIgnoreControllerGeneratedFields() {
+    // Enable assignment persistence so the controller-written IdealState fields are trimmed
+    // from the snapshots of a detector created with ignoreControllerGeneratedFields = true.
+    ClusterConfig clusterConfig = _dataAccessor.getProperty(_keyBuilder.clusterConfig());
+    clusterConfig.setPersistBestPossibleAssignment(true);
+    _dataAccessor.updateProperty(_keyBuilder.clusterConfig(), clusterConfig);
+
+    // Create a new resource and switch it to FULL_AUTO mode.
+    String resourceName = "Resource" + TestHelper.getTestMethodName();
+    _gSetupTool.getClusterManagementTool()
+        .addResource(CLUSTER_NAME, resourceName, NUM_PARTITIONS, STATE_MODEL);
+    IdealState idealState = _dataAccessor.getProperty(_keyBuilder.idealStates(resourceName));
+    idealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO);
+    _dataAccessor.updateProperty(_keyBuilder.idealStates(resourceName), idealState);
+
+    _dataProvider.notifyDataChange(ChangeType.CLUSTER_CONFIG);
+    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+    _dataProvider.refresh(_dataAccessor);
+
+    ResourceChangeDetector changeDetector = new ResourceChangeDetector(true);
+    changeDetector.updateSnapshots(_dataProvider);
+
+    // Modify a map field of that same FULL_AUTO resource, i.e. a field the controller writes.
+    idealState.getRecord().getMapFields().put("Extra_Change", new HashMap<>());
+    _dataAccessor.updateProperty(_keyBuilder.idealStates(resourceName), idealState);
+    _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE);
+    _dataProvider.refresh(_dataAccessor);
+    changeDetector.updateSnapshots(_dataProvider);
+    // IDEAL_STATE is still reported as a refreshed type, but no resource should be flagged.
+    Assert.assertEquals(changeDetector.getChangeTypes(),
+        Collections.singleton(ChangeType.IDEAL_STATE));
+    Assert.assertEquals(
+        changeDetector.getAdditionsByType(ChangeType.IDEAL_STATE).size() + changeDetector
+            .getChangesByType(ChangeType.IDEAL_STATE).size() + changeDetector
+            .getRemovalsByType(ChangeType.IDEAL_STATE).size(), 0);
+  }
+
+  /**
    * Check that the given change types appear in detector's change types.
    * @param types
    */
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 9b6ccbe..abf084d 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.Resource;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.mockito.Mockito;
@@ -348,7 +349,6 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
 
     // Note that this test relies on the MockRebalanceAlgorithm implementation. The mock algorithm
     // won't propagate any existing assignment from the cluster model.
-
     _metadataStore.clearMetadataStore();
     WagedRebalancer rebalancer =
         new WagedRebalancer(_metadataStore, _algorithm, new DelayedAutoRebalancer());
@@ -378,15 +378,15 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
         _metadataStore.getBestPossibleAssignment();
     Assert.assertEquals(bestPossibleAssignment, algorithmResult);
 
-    // 2. rebalance with one ideal state changed only
+    // 2. rebalance with one resource changed in the Resource Config znode only
     String changedResourceName = _resourceNames.get(0);
-    // Create a new cluster data cache to simulate cluster change
-    clusterData = setupClusterDataCache();
     when(clusterData.getRefreshedChangeTypes())
-        .thenReturn(Collections.singleton(HelixConstants.ChangeType.IDEAL_STATE));
-    IdealState is = clusterData.getIdealState(changedResourceName);
-    // Update the tag so the ideal state will be marked as changed.
-    is.setInstanceGroupTag("newTag");
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.RESOURCE_CONFIG));
+    ResourceConfig config = new ResourceConfig(clusterData.getResourceConfig(changedResourceName).getRecord());
+    // Update the config so the resource will be marked as changed.
+    config.putSimpleConfig("foo", "bar");
+    when(clusterData.getResourceConfig(changedResourceName)).thenReturn(config);
+    clusterData.getResourceConfigMap().put(changedResourceName, config);
 
     // Although the input contains 2 resources, the rebalancer shall only call the algorithm to
     // rebalance the changed one.
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
index 2350448..54cbd41 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/model/AbstractTestClusterModel.java
@@ -162,6 +162,10 @@ public abstract class AbstractTestClusterModel {
     testResourceConfigResource2.setPartitionCapacityMap(
         Collections.singletonMap(ResourceConfig.DEFAULT_PARTITION_KEY, capacityDataMapResource2));
     when(testCache.getResourceConfig("Resource2")).thenReturn(testResourceConfigResource2);
+    Map<String, ResourceConfig> configMap = new HashMap<>();
+    configMap.put("Resource1", testResourceConfigResource1);
+    configMap.put("Resource2", testResourceConfigResource2);
+    when(testCache.getResourceConfigMap()).thenReturn(configMap);
 
     // 6. Define mock state model
     for (BuiltInStateModelDefinitions bsmd : BuiltInStateModelDefinitions.values()) {