You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ne...@apache.org on 2022/12/02 18:20:28 UTC
[helix] branch nealsun/waged-pipeline-redesign updated: WAGED tests and metrics (#2302)
This is an automated email from the ASF dual-hosted git repository.
nealsun pushed a commit to branch nealsun/waged-pipeline-redesign
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/nealsun/waged-pipeline-redesign by this push:
new 0c3e9b08d WAGED tests and metrics (#2302)
0c3e9b08d is described below
commit 0c3e9b08dfd4f9d6407484c00adfa4c5d7a5ef5b
Author: Neal Sun <ne...@linkedin.com>
AuthorDate: Fri Dec 2 10:20:23 2022 -0800
WAGED tests and metrics (#2302)
Add WAGED tests and metrics.
---
.../rebalancer/waged/WagedRebalancer.java | 31 ++++-
.../metrics/WagedRebalancerMetricCollector.java | 20 ++-
.../rebalancer/waged/TestWagedRebalancer.java | 141 +++++++++++++++++++++
3 files changed, 188 insertions(+), 4 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index e7c09ab65..0c1b09395 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -112,6 +112,10 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
private final LatencyMetric _writeLatency;
private final CountMetric _partialRebalanceCounter;
private final LatencyMetric _partialRebalanceLatency;
+ private final CountMetric _emergencyRebalanceCounter;
+ private final LatencyMetric _emergencyRebalanceLatency;
+ private final CountMetric _rebalanceOverwriteCounter;
+ private final LatencyMetric _rebalanceOverwriteLatency;
private final LatencyMetric _stateReadLatency;
private final BaselineDivergenceGauge _baselineDivergenceGauge;
@@ -209,6 +213,16 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
.name(),
LatencyMetric.class);
+ _emergencyRebalanceCounter = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceCounter.name(), CountMetric.class);
+ _emergencyRebalanceLatency = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge.name(),
+ LatencyMetric.class);
+ _rebalanceOverwriteCounter = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteCounter.name(), CountMetric.class);
+ _rebalanceOverwriteLatency = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+ LatencyMetric.class);
_writeLatency = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
LatencyMetric.class);
@@ -617,11 +631,15 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
}
}
- private Map<String, ResourceAssignment> emergencyRebalance(
+ protected Map<String, ResourceAssignment> emergencyRebalance(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
+ LOG.info("Start emergency rebalance.");
+ _emergencyRebalanceCounter.increment(1L);
+ _emergencyRebalanceLatency.startMeasuringLatency();
+
Map<String, ResourceAssignment> currentBestPossibleAssignment =
getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
@@ -643,6 +661,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
// Step 2: if there are permanent node downs, calculate for a new one best possible
Map<String, ResourceAssignment> newAssignment;
if (!allNodesActive.get()) {
+ LOG.info("Emergency rebalance responding to permanent node down.");
ClusterModel clusterModel;
try {
clusterModel =
@@ -659,6 +678,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
// Step 3: persist result to metadata store
persistBestPossibleAssignment(newAssignment);
+ _emergencyRebalanceLatency.endMeasuringLatency();
LOG.info("Finish emergency rebalance");
partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
@@ -833,7 +853,7 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
}
}
- private boolean requireRebalanceOverwrite(ResourceControllerDataProvider clusterData,
+ protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clusterData,
Map<String, ResourceAssignment> bestPossibleAssignment) {
AtomicBoolean allMinActiveReplicaMet = new AtomicBoolean(true);
bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
@@ -872,10 +892,13 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
* @param baseline the baseline assignment.
* @param algorithm the rebalance algorithm.
*/
- private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
+ protected void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Map<String, ResourceAssignment> baseline, RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
+ _rebalanceOverwriteCounter.increment(1L);
+ _rebalanceOverwriteLatency.startMeasuringLatency();
+
ClusterModel clusterModel;
try {
// Note this calculation uses the baseline as the best possible assignment input here.
@@ -912,6 +935,8 @@ public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDat
Math.min(minActiveReplica, numReplica));
newIdealState.setPreferenceLists(finalPreferenceLists);
+
+ _rebalanceOverwriteLatency.endMeasuringLatency();
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
index dcd9a0857..94a99c95a 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -43,6 +43,8 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
// Per-stage latency metrics
GlobalBaselineCalcLatencyGauge,
PartialRebalanceLatencyGauge,
+ EmergencyRebalanceLatencyGauge,
+ RebalanceOverwriteLatencyGauge,
// The following latency metrics are related to AssignmentMetadataStore
StateReadLatencyGauge,
@@ -61,7 +63,9 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
// Waged rebalance counters.
GlobalBaselineCalcCounter,
- PartialRebalanceCounter
+ PartialRebalanceCounter,
+ EmergencyRebalanceCounter,
+ RebalanceOverwriteCounter
}
public WagedRebalancerMetricCollector(String clusterName) {
@@ -97,6 +101,12 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
LatencyMetric partialRebalanceLatencyGauge =
new RebalanceLatencyGauge(WagedRebalancerMetricNames.PartialRebalanceLatencyGauge.name(),
getResetIntervalInMs());
+ LatencyMetric emergencyRebalanceLatencyGauge =
+ new RebalanceLatencyGauge(WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge.name(),
+ getResetIntervalInMs());
+ LatencyMetric rebalanceOverwriteLatencyGauge =
+ new RebalanceLatencyGauge(WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge.name(),
+ getResetIntervalInMs());
LatencyMetric stateReadLatencyGauge =
new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
getResetIntervalInMs());
@@ -111,15 +121,23 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
new RebalanceCounter(WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name());
CountMetric partialRebalanceCounter =
new RebalanceCounter(WagedRebalancerMetricNames.PartialRebalanceCounter.name());
+ CountMetric emergencyRebalanceCounter =
+ new RebalanceCounter(WagedRebalancerMetricNames.EmergencyRebalanceCounter.name());
+ CountMetric rebalanceOverwriteCounter =
+ new RebalanceCounter(WagedRebalancerMetricNames.RebalanceOverwriteCounter.name());
// Add metrics to WagedRebalancerMetricCollector
addMetric(globalBaselineCalcLatencyGauge);
addMetric(partialRebalanceLatencyGauge);
+ addMetric(emergencyRebalanceLatencyGauge);
+ addMetric(rebalanceOverwriteLatencyGauge);
addMetric(stateReadLatencyGauge);
addMetric(stateWriteLatencyGauge);
addMetric(baselineDivergenceGauge);
addMetric(calcFailureCount);
addMetric(globalBaselineCalcCounter);
addMetric(partialRebalanceCounter);
+ addMetric(emergencyRebalanceCounter);
+ addMetric(rebalanceOverwriteCounter);
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index 5d4047d5b..d011c5c2e 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -20,9 +20,12 @@ package org.apache.helix.controller.rebalancer.waged;
*/
import java.io.IOException;
+import java.sql.Array;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -32,8 +35,10 @@ import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
+import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
import org.apache.helix.controller.stages.CurrentStateOutput;
@@ -50,6 +55,7 @@ import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -575,6 +581,141 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
Assert.assertEquals(bestPossibleAssignment, newAlgorithmResult);
}
+ @Test(dependsOnMethods = "testRebalance")
+ public void testEmergencyRebalance() throws IOException, HelixRebalanceException {
+ _metadataStore.reset();
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ MockRebalanceAlgorithm spyAlgorithm = Mockito.spy(new MockRebalanceAlgorithm());
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, spyAlgorithm, Optional.empty());
+
+ // Cluster config change will trigger baseline to be recalculated.
+ when(clusterData.getRefreshedChangeTypes())
+ .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+ Map<String, Resource> resourceMap =
+ clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
+ return resource;
+ }));
+ // Populate best possible assignment
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ // Global Rebalance once, Partial Rebalance once
+ verify(spyAlgorithm, times(2)).calculate(any());
+
+ // Artificially insert an offline node in the best possible assignment
+ Map<String, ResourceAssignment> bestPossibleAssignment =
+ _metadataStore.getBestPossibleAssignment();
+ String offlineResource = _resourceNames.get(0);
+ String offlinePartition = _partitionNames.get(0);
+ String offlineState = "MASTER";
+ String offlineInstance = "offlineInstance";
+ for (Partition partition : bestPossibleAssignment.get(offlineResource).getMappedPartitions()) {
+ if (partition.getPartitionName().equals(offlinePartition)) {
+ bestPossibleAssignment.get(offlineResource)
+ .addReplicaMap(partition, Collections.singletonMap(offlineInstance, offlineState));
+ }
+ }
+ _metadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
+
+ // This should trigger both emergency rebalance and partial rebalance
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ ArgumentCaptor<ClusterModel> capturedClusterModel = ArgumentCaptor.forClass(ClusterModel.class);
+ // 2 from previous case, Emergency + Partial from this case, 4 in total
+ verify(spyAlgorithm, times(4)).calculate(capturedClusterModel.capture());
+ // In the cluster model for Emergency rebalance, the assignableReplica is the offline one
+ ClusterModel clusterModelForEmergencyRebalance = capturedClusterModel.getAllValues().get(2);
+ Assert.assertEquals(clusterModelForEmergencyRebalance.getAssignableReplicaMap().size(), 1);
+ Assert.assertEquals(clusterModelForEmergencyRebalance.getAssignableReplicaMap().get(offlineResource).size(), 1);
+ AssignableReplica assignableReplica =
+ clusterModelForEmergencyRebalance.getAssignableReplicaMap().get(offlineResource).iterator().next();
+ Assert.assertEquals(assignableReplica.getPartitionName(), offlinePartition);
+ Assert.assertEquals(assignableReplica.getReplicaState(), offlineState);
+
+ bestPossibleAssignment = _metadataStore.getBestPossibleAssignment();
+ for (Map.Entry<String, ResourceAssignment> entry : bestPossibleAssignment.entrySet()) {
+ ResourceAssignment resourceAssignment = entry.getValue();
+ for (Partition partition : resourceAssignment.getMappedPartitions()) {
+ for (String instance: resourceAssignment.getReplicaMap(partition).keySet()) {
+ Assert.assertNotSame(instance, offlineInstance);
+ }
+ }
+ }
+ }
+
+ /**
+ * Checks when the rebalance-overwrite step is invoked by computeNewIdealStates().
+ *
+ * With delayed rebalance enabled and a fake "offlineInstance" registered in the mocked cluster
+ * data, the rebalancer is run twice against a spied WagedRebalancer:
+ * first with minActiveReplicas = 0, expecting requireRebalanceOverwrite to be called once and
+ * applyRebalanceOverwrite never; then with minActiveReplicas = 3, expecting
+ * requireRebalanceOverwrite to have been called twice in total and applyRebalanceOverwrite once.
+ *
+ * @throws IOException propagated from setupClusterDataCache()
+ * @throws HelixRebalanceException propagated from computeNewIdealStates()
+ */
+ @Test(dependsOnMethods = "testRebalance")
+ public void testRebalanceOverwriteTrigger() throws IOException, HelixRebalanceException {
+ _metadataStore.reset();
+
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ // Enable delay rebalance
+ ClusterConfig clusterConfig = clusterData.getClusterConfig();
+ clusterConfig.setDelayRebalaceEnabled(true);
+ clusterConfig.setRebalanceDelayTime(1);
+ clusterData.setClusterConfig(clusterConfig);
+
+ // force create a fake offlineInstance that's in delay window
+ Set<String> instances = new HashSet<>(_instances);
+ String offlineInstance = "offlineInstance";
+ instances.add(offlineInstance);
+ when(clusterData.getAllInstances()).thenReturn(instances);
+ Map<String, Long> instanceOfflineTimeMap = new HashMap<>();
+ // NOTE(review): the offline timestamp is placed far in the future, presumably so the instance
+ // is always treated as still inside the delay window - confirm against DelayedRebalanceUtil.
+ instanceOfflineTimeMap.put(offlineInstance, System.currentTimeMillis() + Integer.MAX_VALUE);
+ when(clusterData.getInstanceOfflineTimeMap()).thenReturn(instanceOfflineTimeMap);
+ Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
+ instanceConfigMap.put(offlineInstance, createMockInstanceConfig(offlineInstance));
+ when(clusterData.getInstanceConfigMap()).thenReturn(instanceConfigMap);
+
+ // Set minActiveReplica to 0 so that requireRebalanceOverwrite returns false
+ Map<String, IdealState> isMap = new HashMap<>();
+ for (String resource : _resourceNames) {
+ IdealState idealState = clusterData.getIdealState(resource);
+ idealState.setMinActiveReplicas(0);
+ isMap.put(resource, idealState);
+ }
+ // Re-stub the ideal state getters so the rebalancer reads the modified ideal states.
+ when(clusterData.getIdealState(anyString())).thenAnswer(
+ (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+ when(clusterData.getIdealStates()).thenReturn(isMap);
+
+ // Both the algorithm and the rebalancer are spied; the rebalancer spy allows verifying calls to
+ // the protected requireRebalanceOverwrite / applyRebalanceOverwrite methods.
+ MockRebalanceAlgorithm spyAlgorithm = Mockito.spy(new MockRebalanceAlgorithm());
+ WagedRebalancer rebalancer = Mockito.spy(new WagedRebalancer(_metadataStore, spyAlgorithm, Optional.empty()));
+
+ // Cluster config change will trigger baseline to be recalculated.
+ when(clusterData.getRefreshedChangeTypes())
+ .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+ Map<String, Resource> resourceMap =
+ clusterData.getIdealStates().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().forEach(resource::addPartition);
+ return resource;
+ }));
+ // Populate best possible assignment
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ // First run: the overwrite check happens once, but no overwrite is applied.
+ verify(rebalancer, times(1)).requireRebalanceOverwrite(any(), any());
+ verify(rebalancer, times(0)).applyRebalanceOverwrite(any(), any(), any(), any(), any());
+
+ // Set minActiveReplica to 3 so that requireRebalanceOverwrite returns true
+ for (String resource : _resourceNames) {
+ IdealState idealState = clusterData.getIdealState(resource);
+ idealState.setMinActiveReplicas(3);
+ isMap.put(resource, idealState);
+ }
+ when(clusterData.getIdealState(anyString())).thenAnswer(
+ (Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
+ when(clusterData.getIdealStates()).thenReturn(isMap);
+
+ _metadataStore.reset();
+ // Update the config so the cluster config will be marked as changed.
+ clusterConfig = clusterData.getClusterConfig();
+ Map<String, Integer> defaultCapacityMap =
+ new HashMap<>(clusterConfig.getDefaultInstanceCapacityMap());
+ defaultCapacityMap.put("foobar", 0);
+ clusterConfig.setDefaultInstanceCapacityMap(defaultCapacityMap);
+ clusterData.setClusterConfig(clusterConfig);
+ rebalancer.computeNewIdealStates(clusterData, resourceMap, new CurrentStateOutput());
+ // Verification counts are cumulative across both runs on the same spy.
+ verify(rebalancer, times(2)).requireRebalanceOverwrite(any(), any());
+ verify(rebalancer, times(1)).applyRebalanceOverwrite(any(), any(), any(), any(), any());
+ }
+
@Test(dependsOnMethods = "testRebalance")
public void testReset() throws IOException, HelixRebalanceException {
_metadataStore.reset();