You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/12/02 18:20:28 UTC

[helix] branch nealsun/waged-pipeline-redesign updated: WAGED tests and metrics (#2302)

This is an automated email from the ASF dual-hosted git repository.

nealsun pushed a commit to branch nealsun/waged-pipeline-redesign
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/nealsun/waged-pipeline-redesign by this push:
     new 0c3e9b08d WAGED tests and metrics (#2302)
0c3e9b08d is described below

commit 0c3e9b08dfd4f9d6407484c00adfa4c5d7a5ef5b
Author: Neal Sun <ne...@linkedin.com>
AuthorDate: Fri Dec 2 10:20:23 2022 -0800

    WAGED tests and metrics (#2302)
    
    Add WAGED tests and metrics.
---
 .../rebalancer/waged/WagedRebalancer.java          |  31 ++++-
 .../metrics/WagedRebalancerMetricCollector.java    |  20 ++-
 .../rebalancer/waged/TestWagedRebalancer.java      | 141 +++++++++++++++++++++
 3 files changed, 188 insertions(+), 4 deletions(-)

diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index e7c09ab65..0c1b09395 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -112,6 +112,10 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
   private final LatencyMetric _writeLatency;
   private final CountMetric _partialRebalanceCounter;
   private final LatencyMetric _partialRebalanceLatency;
+  private final CountMetric _emergencyRebalanceCounter;
+  private final LatencyMetric _emergencyRebalanceLatency;
+  private final CountMetric _rebalanceOverwriteCounter;
+  private final LatencyMetric _rebalanceOverwriteLatency;
   private final LatencyMetric _stateReadLatency;
   private final BaselineDivergenceGauge _baselineDivergenceGauge;
 
@@ -209,6 +213,16 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
             .name(),
         LatencyMetric.class);
+    _emergencyRebalanceCounter = _metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceCounter.name(), CountMetric.class);
+    _emergencyRebalanceLatency = _metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge.name(),
+        LatencyMetric.class);
+    _rebalanceOverwriteCounter = _metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteCounter.name(), CountMetric.class);
+    _rebalanceOverwriteLatency = _metricCollector.getMetric(
+        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+        LatencyMetric.class);
     _writeLatency = _metricCollector.getMetric(
         WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
         LatencyMetric.class);
@@ -617,11 +631,15 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     }
   }
 
-  private Map<String, ResourceAssignment> emergencyRebalance(
+  protected Map<String, ResourceAssignment> emergencyRebalance(
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
       Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
       RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
+    LOG.info("Start emergency rebalance.");
+    _emergencyRebalanceCounter.increment(1L);
+    _emergencyRebalanceLatency.startMeasuringLatency();
+
     Map<String, ResourceAssignment> currentBestPossibleAssignment =
         getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
             resourceMap.keySet());
@@ -643,6 +661,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     // Step 2: if there are permanent node downs, calculate for a new one best possible
     Map<String, ResourceAssignment> newAssignment;
     if (!allNodesActive.get()) {
+      LOG.info("Emergency rebalance responding to permanent node down.");
       ClusterModel clusterModel;
       try {
         clusterModel =
@@ -659,6 +678,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
 
     // Step 3: persist result to metadata store
     persistBestPossibleAssignment(newAssignment);
+    _emergencyRebalanceLatency.endMeasuringLatency();
     LOG.info("Finish emergency rebalance");
 
     partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
@@ -833,7 +853,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
     }
   }
 
-  private boolean requireRebalanceOverwrite(ResourceControllerDataProvider clusterData,
+  protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clusterData,
       Map<String, ResourceAssignment> bestPossibleAssignment) {
     AtomicBoolean allMinActiveReplicaMet = new AtomicBoolean(true);
     bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
@@ -872,10 +892,13 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
    * @param baseline the baseline assignment.
    * @param algorithm the rebalance algorithm.
    */
-  private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
+  protected void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
       ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
       Map<String, ResourceAssignment> baseline, RebalanceAlgorithm algorithm)
       throws HelixRebalanceException {
+    _rebalanceOverwriteCounter.increment(1L);
+    _rebalanceOverwriteLatency.startMeasuringLatency();
+
     ClusterModel clusterModel;
     try {
       // Note this calculation uses the baseline as the best possible assignment input here.
@@ -912,6 +935,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
               Math.min(minActiveReplica, numReplica));
 
       newIdealState.setPreferenceLists(finalPreferenceLists);
+
+      _rebalanceOverwriteLatency.endMeasuringLatency();
     }
   }
 
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
index dcd9a0857..94a99c95a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -43,6 +43,8 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
     // Per-stage latency metrics
     GlobalBaselineCalcLatencyGauge,
     PartialRebalanceLatencyGauge,
+    EmergencyRebalanceLatencyGauge,
+    RebalanceOverwriteLatencyGauge,
 
     // The following latency metrics are related to AssignmentMetadataStore
     StateReadLatencyGauge,
@@ -61,7 +63,9 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
 
     // Waged rebalance counters.
     GlobalBaselineCalcCounter,
-    PartialRebalanceCounter
+    PartialRebalanceCounter,
+    EmergencyRebalanceCounter,
+    RebalanceOverwriteCounter
   }
 
   public WagedRebalancerMetricCollector(String clusterName) {
@@ -97,6 +101,12 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
     LatencyMetric partialRebalanceLatencyGauge =
         new RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(),
             getResetIntervalInMs());
+    LatencyMetric emergencyRebalanceLatencyGauge =
+        new RebalanceLatencyGauge(WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge.name(),
+            getResetIntervalInMs());
+    LatencyMetric rebalanceOverwriteLatencyGauge =
+        new RebalanceLatencyGauge(WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+            getResetIntervalInMs());
     LatencyMetric stateReadLatencyGauge =
         new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
             getResetIntervalInMs());
@@ -111,15 +121,23 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
         new RebalanceCounter(WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name());
     CountMetric partialRebalanceCounter =
         new RebalanceCounter(WagedRebalancerMetricNames.PartialRebalanceCounter.name());
+    CountMetric emergencyRebalanceCounter =
+        new RebalanceCounter(WagedRebalancerMetricNames.EmergencyRebalanceCounter.name());
+    CountMetric rebalanceOverwriteCounter =
+        new RebalanceCounter(WagedRebalancerMetricNames.RebalanceOverwriteCounter.name());
 
     // Add metrics to WagedRebalancerMetricCollector
     addMetric(globalBaselineCalcLatencyGauge);
     addMetric(partialRebalanceLatencyGauge);
+    addMetric(emergencyRebalanceLatencyGauge);
+    addMetric(rebalanceOverwriteLatencyGauge);
     addMetric(stateReadLatencyGauge);
     addMetric(stateWriteLatencyGauge);
     addMetric(baselineDivergenceGauge);
     addMetric(calcFailureCount);
     addMetric(globalBaselineCalcCounter);
     addMetric(partialRebalanceCounter);
+    addMetric(emergencyRebalanceCounter);
+    addMetric(rebalanceOverwriteCounter);
   }
 }
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 5d4047d5b..d011c5c2e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -20,9 +20,12 @@ package org.apache.helix.controller.rebalancer.waged;
  */
 
 import java.io.IOException;
+import java.sql.Array;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -32,8 +35,10 @@ import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixRebalanceException;
 import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
 import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
 import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
 import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
 import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
 import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -50,6 +55,7 @@ import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
 import org.apache.helix.monitoring.metrics.model.CountMetric;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -575,6 +581,141 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
     Assert.assertEquals(bestPossibleAssignment, newAlgorithmResult);
   }
 
+  /**
+   * Checks that a best possible assignment containing an artificially inserted offline instance
+   * triggers an emergency rebalance of only that replica and that the instance is gone afterwards.
+   */
+  @Test(dependsOnMethods = "testRebalance")
+  public void testEmergencyRebalance() throws IOException, HelixRebalanceException {
+    _metadataStore.reset();
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    MockRebalanceAlgorithm spyAlgorithm = Mockito.spy(new MockRebalanceAlgorithm());
+    WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, spyAlgorithm, Optional.empty());
+
+    // Cluster config change will trigger baseline to be recalculated.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    Map<String, Resource> resourceMap =
+        clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
+          return resource;
+        }));
+    // Populate best possible assignment
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    // Global Rebalance once, Partial Rebalance once
+    verify(spyAlgorithm, times(2)).calculate(any());
+
+    // Artificially insert an offline node in the best possible assignment
+    Map<String, ResourceAssignment> bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
+    String offlineResource = _resourceNames.get(0);
+    String offlinePartition = _partitionNames.get(0);
+    String offlineState = "MASTER";
+    String offlineInstance = "offlineInstance";
+    for (Partition partition : bestPossibleAssignment.get(offlineResource).getMappedPartitions()) {
+      if (partition.getPartitionName().equals(offlinePartition)) {
+        bestPossibleAssignment.get(offlineResource)
+            .addReplicaMap(partition, Collections.singletonMap(offlineInstance, offlineState));
+      }
+    }
+    _metadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
+
+    // This should trigger both emergency rebalance and partial rebalance
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    ArgumentCaptor<ClusterModel> capturedClusterModel = ArgumentCaptor.forClass(ClusterModel.class);
+    // 2 from previous case, Emergency + Partial from this case, 4 in total
+    verify(spyAlgorithm, times(4)).calculate(capturedClusterModel.capture());
+    // In the cluster model for Emergency rebalance, the assignableReplica is the offline one
+    ClusterModel clusterModelForEmergencyRebalance = capturedClusterModel.getAllValues().get(2);
+    Assert.assertEquals(clusterModelForEmergencyRebalance.getAssignableReplicaMap().size(), 1);
+    Assert.assertEquals(clusterModelForEmergencyRebalance.getAssignableReplicaMap().get(offlineResource).size(), 1);
+    AssignableReplica assignableReplica =
+        clusterModelForEmergencyRebalance.getAssignableReplicaMap().get(offlineResource).iterator().next();
+    Assert.assertEquals(assignableReplica.getPartitionName(), offlinePartition);
+    Assert.assertEquals(assignableReplica.getReplicaState(), offlineState);
+    // No replica may remain on the offline instance; compare names by value, not by reference.
+    bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
+    for (ResourceAssignment resourceAssignment : bestPossibleAssignment.values()) {
+      for (Partition partition : resourceAssignment.getMappedPartitions()) {
+        Assert.assertFalse(resourceAssignment.getReplicaMap(partition).containsKey(offlineInstance));
+      }
+    }
+  }
+
+  @Test(dependsOnMethods = "testRebalance")
+  public void testRebalanceOverwriteTrigger() throws IOException, HelixRebalanceException {
+    _metadataStore.reset();
+    // Expect no rebalance overwrite while minActiveReplicas is 0, and exactly one once it is raised to 3.
+    ResourceControllerDataProvider clusterData = setupClusterDataCache();
+    // Enable delayed rebalance on the cluster config (delay time set to 1)
+    ClusterConfig clusterConfig = clusterData.getClusterConfig();
+    clusterConfig.setDelayRebalaceEnabled(true);
+    clusterConfig.setRebalanceDelayTime(1);
+    clusterData.setClusterConfig(clusterConfig);
+
+    // Register a fake offlineInstance with a future offline timestamp so it is in the delay window
+    Set<String> instances = new HashSet<>(_instances);
+    String offlineInstance = "offlineInstance";
+    instances.add(offlineInstance);
+    when(clusterData.getAllInstances()).thenReturn(instances);
+    Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
+    instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
+    when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
+    Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+    instanceConfigMap.put(offlineInstance, createMockInstanceConfig(offlineInstance));
+    when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+    // Set minActiveReplica to 0 so that requireRebalanceOverwrite returns false
+    Map<String, IdealState> isMap = new HashMap<>();
+    for (String resource : _resourceNames) {
+      IdealState idealState = clusterData.getIdealState(resource);
+      idealState.setMinActiveReplicas(0);
+      isMap.put(resource, idealState);
+    }
+    when(clusterData.getIdealState(anyString())).thenAnswer(
+        (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+    when(clusterData.getIdealStates()).thenReturn(isMap);
+
+    MockRebalanceAlgorithm spyAlgorithm = Mockito.spy(new MockRebalanceAlgorithm());
+    WagedRebalancer rebalancer = Mockito.spy(new WagedRebalancer(_metadataStore, spyAlgorithm, Optional.empty()));
+
+    // Cluster config change will trigger baseline to be recalculated.
+    when(clusterData.getRefreshedChangeTypes())
+        .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+    Map<String, Resource> resourceMap =
+        clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+          Resource resource = new Resource(entry.getKey());
+          entry.getValue().getPartitionSet().forEach(resource::addPartition);
+          return resource;
+        }));
+    // First run: populate best possible assignment; the overwrite check runs but is not applied
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    verify(rebalancer, times(1)).requireRebalanceOverwrite(any(), any());
+    verify(rebalancer, times(0)).applyRebalanceOverwrite(any(), any(), any(), any(), any());
+
+    // Set minActiveReplica to 3 so that requireRebalanceOverwrite returns true
+    for (String resource : _resourceNames) {
+      IdealState idealState = clusterData.getIdealState(resource);
+      idealState.setMinActiveReplicas(3);
+      isMap.put(resource, idealState);
+    }
+    when(clusterData.getIdealState(anyString())).thenAnswer(
+        (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+    when(clusterData.getIdealStates()).thenReturn(isMap);
+
+    _metadataStore.reset();
+    // Update the config so the cluster config will be marked as changed.
+    clusterConfig = clusterData.getClusterConfig();
+    Map<String, Integer> defaultCapacityMap =
+        new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
+    defaultCapacityMap.put("foobar", 0);
+    clusterConfig.setDefaultInstanceCapacityMap(defaultCapacityMap);
+    clusterData.setClusterConfig(clusterConfig);
+    rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+    verify(rebalancer, times(2)).requireRebalanceOverwrite(any(), any());
+    verify(rebalancer, times(1)).applyRebalanceOverwrite(any(), any(), any(), any(), any());
+  }
+
   @Test(dependsOnMethods = "testRebalance")
   public void testReset() throws IOException, HelixRebalanceException {
     _metadataStore.reset();