You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2019/10/28 22:33:21 UTC
[helix] 43/50: Implement monitoring mbeans for the WAGED
rebalancer. (#525)
This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch wagedRebalancer
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 9196bdd503d965d7a702573c112a440ca5606102
Author: Huizhi L <ih...@gmail.com>
AuthorDate: Fri Oct 25 10:50:51 2019 -0700
Implement monitoring mbeans for the WAGED rebalancer. (#525)
Change list:
1. GlobalBaselineCalcCounter: Counter of global baseline calculations.
2. PartialRebalanceCounter: Counter of partial rebalances performed.
3. BaselineDivergenceGauge: Gauge of the difference at replica level between the Baseline and the Best Possible assignments.
---
.../rebalancer/util/ResourceUsageCalculator.java | 112 ++++++++++++++++++++-
.../rebalancer/waged/WagedRebalancer.java | 21 ++++
.../metrics/WagedRebalancerMetricCollector.java | 25 ++++-
.../implementation/BaselineDivergenceGauge.java | 68 +++++++++++++
.../RebalanceCounter.java} | 38 +++----
.../implementation/RebalanceLatencyGauge.java | 2 +-
.../monitoring/metrics/model/CountMetric.java | 4 +-
.../monitoring/metrics/model/LatencyMetric.java | 2 +-
.../helix/monitoring/metrics/model/Metric.java | 5 +-
.../model/{CountMetric.java => RatioMetric.java} | 41 +++-----
.../util/TestResourceUsageCalculator.java | 87 ++++++++++++++++
.../rebalancer/waged/TestWagedRebalancer.java | 1 +
.../waged/TestWagedRebalancerMetrics.java | 59 ++++++++++-
...eUsageCalculator.MeasureBaselineDivergence.json | 37 +++++++
14 files changed, 442 insertions(+), 60 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
index b47a2ed..becf72a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ResourceUsageCalculator.java
@@ -1,11 +1,32 @@
package org.apache.helix.controller.rebalancer.util;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider;
import org.apache.helix.controller.common.ResourcesStateMap;
import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
-import java.util.HashMap;
-import java.util.Map;
public class ResourceUsageCalculator {
/**
@@ -33,4 +54,91 @@ public class ResourceUsageCalculator {
}
return newParticipantUsage;
}
+
+ /**
+ * Measure baseline divergence between baseline assignment and best possible assignment at
+ * replica level. Example as below:
+ * baseline =
+ * {
+ * resource1={
+ * partition1={
+ * instance1=master,
+ * instance2=slave
+ * },
+ * partition2={
+ * instance2=slave
+ * }
+ * }
+ * }
+ * bestPossible =
+ * {
+ * resource1={
+ * partition1={
+ * instance1=master, <--- matched
+ * instance3=slave <--- doesn't match
+ * },
+ * partition2={
+ * instance3=master <--- doesn't match
+ * }
+ * }
+ * }
+ * baseline divergence = (matched: 1) / (total best possible replicas compared: 3) = 1/3 ~= 0.333
+ * @param baseline baseline assignment
+ * @param bestPossibleAssignment best possible assignment
+ * @return ratio of matched replicas in [0.0, 1.0]; 0.0 if there are no comparable replicas
+ */
+ public static double measureBaselineDivergence(Map<String, ResourceAssignment> baseline,
+ Map<String, ResourceAssignment> bestPossibleAssignment) {
+ int numMatchedReplicas = 0;
+ int numTotalBestPossibleReplicas = 0;
+
+ // 1. Check resource assignment names.
+ for (Map.Entry<String, ResourceAssignment> resourceEntry : bestPossibleAssignment.entrySet()) {
+ String resourceKey = resourceEntry.getKey();
+ if (!baseline.containsKey(resourceKey)) {
+ continue;
+ }
+
+ // Resource assignment names are matched.
+ // 2. check partitions.
+ Map<String, Map<String, String>> bestPossiblePartitions =
+ resourceEntry.getValue().getRecord().getMapFields();
+ Map<String, Map<String, String>> baselinePartitions =
+ baseline.get(resourceKey).getRecord().getMapFields();
+
+ for (Map.Entry<String, Map<String, String>> partitionEntry
+ : bestPossiblePartitions.entrySet()) {
+ String partitionName = partitionEntry.getKey();
+ if (!baselinePartitions.containsKey(partitionName)) {
+ continue;
+ }
+
+ // Partition names are matched.
+ // 3. Check replicas.
+ Map<String, String> bestPossibleReplicas = partitionEntry.getValue();
+ Map<String, String> baselineReplicas = baselinePartitions.get(partitionName);
+
+ for (Map.Entry<String, String> replicaEntry : bestPossibleReplicas.entrySet()) {
+ String replicaName = replicaEntry.getKey();
+ if (!baselineReplicas.containsKey(replicaName)) {
+ continue;
+ }
+
+ // Replica names are matched.
+ // 4. Check replica values.
+ String bestPossibleReplica = replicaEntry.getValue();
+ String baselineReplica = baselineReplicas.get(replicaName);
+ if (bestPossibleReplica.equals(baselineReplica)) {
+ numMatchedReplicas++;
+ }
+ }
+
+ // Count total best possible replicas.
+ numTotalBestPossibleReplicas += bestPossibleReplicas.size();
+ }
+ }
+
+ return numTotalBestPossibleReplicas == 0 ? 0.0d
+ : (double) numMatchedReplicas / (double) numTotalBestPossibleReplicas;
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
index 9050b59..5b2573f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/WagedRebalancer.java
@@ -50,11 +50,13 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Weight-Aware Globally-Even Distribute Rebalancer.
* @see <a
@@ -330,6 +332,11 @@ public class WagedRebalancer {
Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
final CurrentStateOutput currentStateOutput) throws HelixRebalanceException {
LOG.info("Start calculating the new baseline.");
+ CountMetric globalBaselineCalcCounter = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
+ CountMetric.class);
+ globalBaselineCalcCounter.increaseCount(1L);
+
LatencyMetric globalBaselineCalcLatency = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge
.name(),
@@ -372,6 +379,11 @@ public class WagedRebalancer {
Set<String> activeNodes, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
LOG.info("Start calculating the new best possible assignment.");
+ CountMetric partialRebalanceCounter = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
+ CountMetric.class);
+ partialRebalanceCounter.increaseCount(1L);
+
LatencyMetric partialRebalanceLatency = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
.name(),
@@ -390,6 +402,14 @@ public class WagedRebalancer {
Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterData, clusterChanges,
resourceMap, activeNodes, currentBaseline, currentBestPossibleAssignment);
+ // Asynchronously report the baseline divergence metric before persisting to the metadata
+ // store, so the metric is still reported even if persisting fails.
+ BaselineDivergenceGauge baselineDivergenceGauge = _metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
+ BaselineDivergenceGauge.class);
+ baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
+ currentBaseline, newAssignment);
+
if (_assignmentMetadataStore != null) {
try {
LatencyMetric writeLatency = _metricCollector.getMetric(
@@ -444,6 +464,7 @@ public class WagedRebalancer {
optimalAssignment.getOptimalResourceAssignment();
LOG.info("Finish calculating an assignment. Took: {} ms.", System.currentTimeMillis() - startTime);
+
return newAssignment;
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
index e9494ff..3dd16ad 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/WagedRebalancerMetricCollector.java
@@ -22,10 +22,14 @@ package org.apache.helix.monitoring.metrics;
import javax.management.JMException;
import org.apache.helix.monitoring.mbeans.MonitorDomainNames;
+import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
+import org.apache.helix.monitoring.metrics.implementation.RebalanceCounter;
import org.apache.helix.monitoring.metrics.implementation.RebalanceFailureCount;
import org.apache.helix.monitoring.metrics.implementation.RebalanceLatencyGauge;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
+import org.apache.helix.monitoring.metrics.model.RatioMetric;
+
public class WagedRebalancerMetricCollector extends MetricCollector {
private static final String WAGED_REBALANCER_ENTITY_NAME = "WagedRebalancer";
@@ -43,10 +47,20 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
StateReadLatencyGauge,
StateWriteLatencyGauge,
+ /*
+ * Gauge of the difference (state and partition allocation) between the baseline and the best
+ * possible assignment.
+ */
+ BaselineDivergenceGauge,
+
// Count of any rebalance compute failure.
// Note the rebalancer may still be able to return the last known-good assignment on a rebalance
// compute failure. And this fallback logic won't impact this counting.
- RebalanceFailureCounter
+ RebalanceFailureCounter,
+
+ // Waged rebalance counters.
+ GlobalBaselineCalcCounter,
+ PartialRebalanceCounter
}
public WagedRebalancerMetricCollector(String clusterName) throws JMException {
@@ -82,14 +96,23 @@ public class WagedRebalancerMetricCollector extends MetricCollector {
LatencyMetric stateWriteLatencyGauge =
new RebalanceLatencyGauge(WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
getResetIntervalInMs());
+ RatioMetric baselineDivergenceGauge =
+ new BaselineDivergenceGauge(WagedRebalancerMetricNames.BaselineDivergenceGauge.name());
CountMetric calcFailureCount =
new RebalanceFailureCount(WagedRebalancerMetricNames.RebalanceFailureCounter.name());
+ CountMetric globalBaselineCalcCounter =
+ new RebalanceCounter(WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name());
+ CountMetric partialRebalanceCounter =
+ new RebalanceCounter(WagedRebalancerMetricNames.PartialRebalanceCounter.name());
// Add metrics to WagedRebalancerMetricCollector
addMetric(globalBaselineCalcLatencyGauge);
addMetric(partialRebalanceLatencyGauge);
addMetric(stateReadLatencyGauge);
addMetric(stateWriteLatencyGauge);
+ addMetric(baselineDivergenceGauge);
addMetric(calcFailureCount);
+ addMetric(globalBaselineCalcCounter);
+ addMetric(partialRebalanceCounter);
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/BaselineDivergenceGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/BaselineDivergenceGauge.java
new file mode 100644
index 0000000..8e6d49b
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/BaselineDivergenceGauge.java
@@ -0,0 +1,68 @@
+package org.apache.helix.monitoring.metrics.implementation;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.monitoring.metrics.model.RatioMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Gauge comparing the baseline and the best possible assignment at replica level (state and
+ * partition allocation). Its value is in [0.0, 1.0], where 1.0 means all compared replicas match.
+ */
+public class BaselineDivergenceGauge extends RatioMetric {
+ private static final Logger LOG = LoggerFactory.getLogger(BaselineDivergenceGauge.class);
+
+ /**
+ * Instantiates a new baseline divergence gauge with an initial value of 0.0.
+ * @param metricName the metric name
+ */
+ public BaselineDivergenceGauge(String metricName) {
+ super(metricName, 0.0d);
+ }
+
+ /**
+ * Asynchronously measure and update metric value.
+ * @param threadPool an executor service to asynchronously run the task
+ * @param baseline baseline assignment
+ * @param bestPossibleAssignment best possible assignment
+ */
+ public void asyncMeasureAndUpdateValue(ExecutorService threadPool,
+ Map<String, ResourceAssignment> baseline,
+ Map<String, ResourceAssignment> bestPossibleAssignment) {
+ AbstractBaseStage.asyncExecute(threadPool, () -> {
+ try {
+ double baselineDivergence =
+ ResourceUsageCalculator.measureBaselineDivergence(baseline, bestPossibleAssignment);
+ updateValue(baselineDivergence);
+ } catch (Exception e) {
+ LOG.error("Failed to report BaselineDivergenceGauge metric.", e);
+ }
+ return null;
+ });
+ }
+}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceCounter.java
similarity index 55%
copy from helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
copy to helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceCounter.java
index 22378dc..fc370a8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceCounter.java
@@ -1,4 +1,4 @@
-package org.apache.helix.monitoring.metrics.model;
+package org.apache.helix.monitoring.metrics.implementation;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -9,7 +9,7 @@ package org.apache.helix.monitoring.metrics.model;
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -19,31 +19,23 @@ package org.apache.helix.monitoring.metrics.model;
* under the License.
*/
-import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+
/**
- * Defines a generic metric interface.
+ * Reports counter-type metrics related to rebalancing. Values are expected to increase monotonically.
*/
-public interface Metric {
-
- /**
- * Gets the name of the metric.
- */
- String getMetricName();
-
+public class RebalanceCounter extends CountMetric {
/**
- * Prints the metric along with its name.
+ * Instantiates a new rebalance counter with an initial count of 0.
+ * @param metricName the metric name
*/
- String toString();
+ public RebalanceCounter(String metricName) {
+ super(metricName, 0L);
+ }
- /**
- * Returns the most recently emitted value for the metric at the time of the call.
- * @return metric value
- */
- long getLastEmittedMetricValue();
-
- /**
- * Returns the underlying DynamicMetric.
- */
- DynamicMetric getDynamicMetric();
+ @Override
+ public void increaseCount(long count) {
+ updateValue(getValue() + count);
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
index b6e58b4..365f0dc 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/implementation/RebalanceLatencyGauge.java
@@ -76,7 +76,7 @@ public class RebalanceLatencyGauge extends LatencyMetric {
* @return
*/
@Override
- public long getLastEmittedMetricValue() {
+ public Long getLastEmittedMetricValue() {
return _lastEmittedMetricValue;
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
index 424ac9e..81aa001 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
@@ -26,7 +26,7 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
* Represents a count metric and defines methods to help with calculation. A count metric gives a
* gauge value of a certain property.
*/
-public abstract class CountMetric extends SimpleDynamicMetric<Long> implements Metric {
+public abstract class CountMetric extends SimpleDynamicMetric<Long> implements Metric<Long> {
/**
* Instantiates a new count metric.
@@ -56,7 +56,7 @@ public abstract class CountMetric extends SimpleDynamicMetric<Long> implements M
}
@Override
- public long getLastEmittedMetricValue() {
+ public Long getLastEmittedMetricValue() {
return getValue();
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
index d60f245..de63d0f 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/LatencyMetric.java
@@ -27,7 +27,7 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
* Represents a latency metric and defines methods to help with calculation. A latency metric gives
* how long a particular stage in the logic took in milliseconds.
*/
-public abstract class LatencyMetric extends HistogramDynamicMetric implements Metric {
+public abstract class LatencyMetric extends HistogramDynamicMetric implements Metric<Long> {
protected long _startTime;
protected long _endTime;
protected String _metricName;
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
index 22378dc..be7ea80 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/Metric.java
@@ -23,8 +23,9 @@ import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
/**
* Defines a generic metric interface.
+ * @param <T> type of the metric value
*/
-public interface Metric {
+public interface Metric<T> {
/**
* Gets the name of the metric.
@@ -40,7 +41,7 @@ public interface Metric {
* Returns the most recently emitted value for the metric at the time of the call.
* @return metric value
*/
- long getLastEmittedMetricValue();
+ T getLastEmittedMetricValue();
/**
* Returns the underlying DynamicMetric.
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/RatioMetric.java
similarity index 62%
copy from helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
copy to helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/RatioMetric.java
index 424ac9e..d321e51 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/CountMetric.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/metrics/model/RatioMetric.java
@@ -9,7 +9,7 @@ package org.apache.helix.monitoring.metrics.model;
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -22,46 +22,37 @@ package org.apache.helix.monitoring.metrics.model;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
/**
- * Represents a count metric and defines methods to help with calculation. A count metric gives a
- * gauge value of a certain property.
+ * A gauge which defines the ratio of one value to another.
*/
-public abstract class CountMetric extends SimpleDynamicMetric<Long> implements Metric {
-
+public abstract class RatioMetric extends SimpleDynamicMetric<Double> implements Metric<Double> {
/**
- * Instantiates a new count metric.
- *
- * @param metricName the metric name
- * @param initCount the initial count
+ * Instantiates a new ratio metric.
+ * @param metricName the metric name
+ * @param metricObject the initial ratio value
*/
- public CountMetric(String metricName, long initCount) {
- super(metricName, initCount);
+ public RatioMetric(String metricName, double metricObject) {
+ super(metricName, metricObject);
}
- /**
- * Increment the metric by the input count.
- *
- * @param count
- */
- public abstract void increaseCount(long count);
-
@Override
- public String getMetricName() {
- return _metricName;
+ public DynamicMetric getDynamicMetric() {
+ return this;
}
@Override
- public String toString() {
- return String.format("Metric %s's count is %d", getMetricName(), getValue());
+ public String getMetricName() {
+ return _metricName;
}
@Override
- public long getLastEmittedMetricValue() {
+ public Double getLastEmittedMetricValue() {
return getValue();
}
@Override
- public DynamicMetric getDynamicMetric() {
- return this;
+ public String toString() {
+ return String.format("Metric name: %s, metric value: %f", getMetricName(), getValue());
}
}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java
new file mode 100644
index 0000000..76b2318
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/util/TestResourceUsageCalculator.java
@@ -0,0 +1,87 @@
+package org.apache.helix.controller.rebalancer.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.util.TestInputLoader;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class TestResourceUsageCalculator {
+ @Test(dataProvider = "TestMeasureBaselineDivergenceInput")
+ public void testMeasureBaselineDivergence(Map<String, Map<String, Map<String, String>>> baseline,
+ Map<String, Map<String, Map<String, String>>> someMatchBestPossible,
+ Map<String, Map<String, Map<String, String>>> noMatchBestPossible) {
+ Map<String, ResourceAssignment> baselineAssignment = buildResourceAssignment(baseline);
+ Map<String, ResourceAssignment> someMatchBestPossibleAssignment =
+ buildResourceAssignment(someMatchBestPossible);
+ Map<String, ResourceAssignment> noMatchBestPossibleAssignment =
+ buildResourceAssignment(noMatchBestPossible);
+
+ // Empty best possible assignment.
+ Assert.assertEquals(ResourceUsageCalculator
+ .measureBaselineDivergence(baselineAssignment, Collections.emptyMap()), 0.0d);
+ // Empty baseline assignment.
+ Assert.assertEquals(ResourceUsageCalculator
+ .measureBaselineDivergence(Collections.emptyMap(), noMatchBestPossibleAssignment), 0.0d);
+
+ Assert.assertEquals(ResourceUsageCalculator
+ .measureBaselineDivergence(baselineAssignment, noMatchBestPossibleAssignment), 0.0d);
+ Assert.assertEquals(ResourceUsageCalculator
+ .measureBaselineDivergence(baselineAssignment, someMatchBestPossibleAssignment),
+ (double) 1 / (double) 3);
+ Assert.assertEquals(
+ ResourceUsageCalculator.measureBaselineDivergence(baselineAssignment, baselineAssignment),
+ 1.0d);
+ }
+
+ private Map<String, ResourceAssignment> buildResourceAssignment(
+ Map<String, Map<String, Map<String, String>>> resourceMap) {
+ Map<String, ResourceAssignment> assignment = new HashMap<>();
+ for (Map.Entry<String, Map<String, Map<String, String>>> resourceEntry
+ : resourceMap.entrySet()) {
+ ResourceAssignment resource = new ResourceAssignment(resourceEntry.getKey());
+ Map<String, Map<String, String>> partitionMap = resourceEntry.getValue();
+ for (Map.Entry<String, Map<String, String>> partitionEntry : partitionMap.entrySet()) {
+ resource.addReplicaMap(new Partition(partitionEntry.getKey()), partitionEntry.getValue());
+ }
+
+ assignment.put(resourceEntry.getKey(), resource);
+ }
+
+ return assignment;
+ }
+
+ @DataProvider(name = "TestMeasureBaselineDivergenceInput")
+ public Object[][] loadTestMeasureBaselineDivergenceInput() {
+ final String[] params =
+ new String[]{"baseline", "someMatchBestPossible", "noMatchBestPossible"};
+ return TestInputLoader
+ .loadTestInputs("TestResourceUsageCalculator.MeasureBaselineDivergence.json", params);
+ }
+}
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
index dd0cc8c..9b6ccbe 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancer.java
@@ -107,6 +107,7 @@ public class TestWagedRebalancer extends AbstractTestClusterModel {
when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+ when(testCache.getAllInstances()).thenReturn(_instances);
}
return testCache;
diff --git a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
index dc0c89e..30700ed 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/rebalancer/waged/TestWagedRebalancerMetrics.java
@@ -25,9 +25,13 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import javax.management.JMException;
+
+import org.apache.helix.HelixConstants;
import org.apache.helix.HelixRebalanceException;
+import org.apache.helix.TestHelper;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.waged.constraints.MockRebalanceAlgorithm;
import org.apache.helix.controller.rebalancer.waged.model.AbstractTestClusterModel;
@@ -38,13 +42,15 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Resource;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
+import org.apache.helix.monitoring.metrics.model.CountMetric;
+import org.apache.helix.monitoring.metrics.model.RatioMetric;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.*;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
private static final String TEST_STRING = "TEST";
@@ -85,7 +91,52 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
// Check that there exists a non-zero value in the metrics
Assert.assertTrue(_metricCollector.getMetricMap().values().stream()
- .anyMatch(metric -> metric.getLastEmittedMetricValue() > 0L));
+ .anyMatch(metric -> (long) metric.getLastEmittedMetricValue() > 0L));
+ }
+
+ @Test
+ public void testWagedRebalanceMetrics()
+ throws Exception {
+ _metadataStore.clearMetadataStore();
+ MetricCollector metricCollector = new WagedRebalancerMetricCollector(TEST_STRING);
+ WagedRebalancer rebalancer = new WagedRebalancer(_metadataStore, _algorithm, metricCollector);
+ // Generate the input for the rebalancer.
+ ResourceControllerDataProvider clusterData = setupClusterDataCache();
+ Map<String, Resource> resourceMap = clusterData.getIdealStates().entrySet().stream()
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry -> {
+ Resource resource = new Resource(entry.getKey());
+ entry.getValue().getPartitionSet().stream()
+ .forEach(partition -> resource.addPartition(partition));
+ return resource;
+ }));
+
+ Assert.assertEquals((long) metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
+ CountMetric.class).getLastEmittedMetricValue(), 0L);
+ Assert.assertEquals((long) metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
+ CountMetric.class).getLastEmittedMetricValue(), 0L);
+ Assert.assertEquals((double) metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
+ RatioMetric.class).getLastEmittedMetricValue(), 0.0d);
+
+ // Cluster config change will trigger baseline recalculation and partial rebalance.
+ when(clusterData.getRefreshedChangeTypes())
+ .thenReturn(Collections.singleton(HelixConstants.ChangeType.CLUSTER_CONFIG));
+
+ rebalancer.computeBestPossibleStates(clusterData, resourceMap, new CurrentStateOutput());
+
+ Assert.assertEquals((long) metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
+ CountMetric.class).getLastEmittedMetricValue(), 1L);
+ Assert.assertEquals((long) metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
+ CountMetric.class).getLastEmittedMetricValue(), 1L);
+
+ // Wait for asyncMeasureAndUpdateValue to complete and verify the gauge value.
+ Assert.assertTrue(TestHelper.verify(() -> (double) metricCollector.getMetric(
+ WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
+ RatioMetric.class).getLastEmittedMetricValue() == 1.0d, TestHelper.WAIT_DURATION));
}
@Override
@@ -108,6 +159,7 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
when(testCache.getIdealState(anyString())).thenAnswer(
(Answer<IdealState>) invocationOnMock -> isMap.get(invocationOnMock.getArguments()[0]));
when(testCache.getIdealStates()).thenReturn(isMap);
+ when(testCache.getAsyncTasksThreadPool()).thenReturn(Executors.newSingleThreadExecutor());
// Set up 2 more instances
for (int i = 1; i < 3; i++) {
@@ -125,6 +177,7 @@ public class TestWagedRebalancerMetrics extends AbstractTestClusterModel {
when(testCache.getLiveInstances()).thenReturn(liveInstanceMap);
when(testCache.getEnabledInstances()).thenReturn(liveInstanceMap.keySet());
when(testCache.getEnabledLiveInstances()).thenReturn(liveInstanceMap.keySet());
+ when(testCache.getAllInstances()).thenReturn(_instances);
}
return testCache;
diff --git a/helix-core/src/test/resources/TestResourceUsageCalculator.MeasureBaselineDivergence.json b/helix-core/src/test/resources/TestResourceUsageCalculator.MeasureBaselineDivergence.json
new file mode 100644
index 0000000..dab432e
--- /dev/null
+++ b/helix-core/src/test/resources/TestResourceUsageCalculator.MeasureBaselineDivergence.json
@@ -0,0 +1,37 @@
+[
+ {
+ "baseline": {
+ "resource1": {
+ "partition1": {
+ "instance1": "MASTER",
+ "instance2": "SLAVE"
+ },
+ "partition2": {
+ "instance2": "SLAVE"
+ }
+ }
+ },
+ "someMatchBestPossible": {
+ "resource1": {
+ "partition1": {
+ "instance1": "MASTER",
+ "instance3": "SLAVE"
+ },
+ "partition2": {
+ "instance3": "MASTER"
+ }
+ }
+ },
+ "noMatchBestPossible": {
+ "resource1": {
+ "partition1": {
+ "instance2": "MASTER",
+ "instance3": "SLAVE"
+ },
+ "partition2": {
+ "instance3": "MASTER"
+ }
+ }
+ }
+ }
+]