You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by as...@apache.org on 2018/04/09 18:09:44 UTC
[incubator-heron] branch master updated: Update Dhalion dependency
version (#2821)
This is an automated email from the ASF dual-hosted git repository.
ashvin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new cc4b2d7 Update Dhalion dependency version (#2821)
cc4b2d7 is described below
commit cc4b2d7d016e138999ab5476e178e76eac922b78
Author: Ashvin <as...@users.noreply.github.com>
AuthorDate: Mon Apr 9 11:09:41 2018 -0700
Update Dhalion dependency version (#2821)
* Refactor healthmgr for dhalion-v2 compatibility
* Update health manager's metrics collection for v2
* Added detectors, diagnosers
* Make health resolvers dhalion-v2 compatible
* Make health diagnosers dhalion-v2 compatible
* Make health policy dhalion-v2 api compatible
* Update MetricsCache provider as per dhalion-v2 api
* Modified skew detector and tests
* Tag facts with execution timestamp
* Update Dhalion dependency version
* Address review comments in PR #2821
* Fix checkstyle and unittest errors in HealthMgr
---
WORKSPACE | 17 +-
heron/executor/src/python/heron_executor.py | 2 +-
.../tests/python/heron_executor_unittest.py | 2 +-
heron/healthmgr/src/java/BUILD | 3 +
.../healthmgr/common/ComponentMetricsHelper.java | 111 -----------
.../healthmgr/common/HealthManagerEvents.java | 13 +-
.../healthmgr/common/PhysicalPlanProvider.java | 6 +-
.../healthmgr/detectors/BackPressureDetector.java | 59 +++---
.../heron/healthmgr/detectors/BaseDetector.java | 17 +-
.../detectors/GrowingWaitQueueDetector.java | 79 +++++---
.../detectors/LargeWaitQueueDetector.java | 64 ++++---
.../detectors/ProcessingRateSkewDetector.java | 11 +-
.../heron/healthmgr/detectors/SkewDetector.java | 101 +++++++---
...ityDetector.java => WaitQueueSkewDetector.java} | 15 +-
.../heron/healthmgr/diagnosers/BaseDiagnoser.java | 60 +-----
.../healthmgr/diagnosers/DataSkewDiagnoser.java | 95 +++++-----
.../diagnosers/SlowInstanceDiagnoser.java | 93 +++++-----
.../diagnosers/UnderProvisioningDiagnoser.java | 66 ++++---
.../AutoRestartBackpressureContainerPolicy.java | 52 ++----
.../policy/DynamicResourceAllocationPolicy.java | 46 +++--
.../resolvers/RestartContainerResolver.java | 105 ++++++-----
.../heron/healthmgr/resolvers/ScaleUpResolver.java | 122 +++++++------
.../healthmgr/sensors/BackPressureSensor.java | 77 +++-----
.../heron/healthmgr/sensors/BaseSensor.java | 20 +-
.../heron/healthmgr/sensors/BufferSizeSensor.java | 90 +++------
.../healthmgr/sensors/ExecuteCountSensor.java | 22 +--
.../sensors/MetricsCacheMetricsProvider.java | 86 ++++-----
.../healthmgr/sensors/TrackerMetricsProvider.java | 73 +++-----
.../com/twitter/heron/healthmgr/TestUtils.java | 69 -------
.../common/ComponentMetricsHelperTest.java | 70 -------
.../healthmgr/common/PackingPlanProviderTest.java | 4 +-
.../healthmgr/common/TopologyProviderTest.java | 2 +-
.../detectors/BackPressureDetectorTest.java | 73 +++++---
.../detectors/GrowingWaitQueueDetectorTest.java | 114 ++++++------
.../detectors/LargeWaitQueueDetectorTest.java | 56 +++---
.../detectors/ProcessingRateSkewDetectorTest.java | 203 ++++++++++++++-------
.../detectors/WaitQueueDisparityDetectorTest.java | 71 -------
.../detectors/WaitQueueSkewDetectorTest.java | 88 +++++++++
.../diagnosers/DataSkewDiagnoserTest.java | 124 +++++++++----
.../diagnosers/SlowInstanceDiagnoserTest.java | 98 +++++++---
.../diagnosers/UnderProvisioningDiagnoserTest.java | 82 ++++++---
.../healthmgr/resolvers/ScaleUpResolverTest.java | 87 +++++----
.../healthmgr/sensors/BackPressureSensorTest.java | 55 ++++--
.../healthmgr/sensors/BufferSizeSensorTest.java | 55 +++---
.../healthmgr/sensors/ExecuteCountSensorTest.java | 70 ++++---
.../sensors/MetricsCacheMetricsProviderTest.java | 162 ++++++++--------
.../sensors/TrackerMetricsProviderTest.java | 153 +++++++---------
47 files changed, 1592 insertions(+), 1551 deletions(-)
diff --git a/WORKSPACE b/WORKSPACE
index c0c40bb..72b582f 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -460,7 +460,7 @@ maven_jar(
maven_jar(
name = "com_microsoft_dhalion",
- artifact = "com.microsoft.dhalion:dhalion:0.0.1_2",
+ artifact = "com.microsoft.dhalion:dhalion:0.2.1",
)
maven_jar(
@@ -468,6 +468,21 @@ maven_jar(
artifact = "org.apache.commons:commons-math3:3.6.1"
)
+maven_jar(
+ name = "tech_tablesaw",
+ artifact = "tech.tablesaw:tablesaw-core:0.11.4"
+)
+
+maven_jar(
+ name = "it_unimi_dsi_fastutil",
+ artifact = "it.unimi.dsi:fastutil:8.1.1"
+)
+
+maven_jar(
+ name = "org_roaringbitmap",
+ artifact = "org.roaringbitmap:RoaringBitmap:0.6.51"
+)
+
# Google Cloud
maven_jar(
name = "google_api_services_storage",
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 04db9b8..3f36e5d 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -498,7 +498,7 @@ class HeronExecutor(object):
"--cluster", self.cluster,
"--role", self.role,
"--environment", self.environment,
- "--topology_name", self.topology_name, "--verbose"]
+ "--topology_name", self.topology_name]
return healthmgr_cmd
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index f6155c0..5b661e7 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -131,7 +131,7 @@ class HeronExecutorTest(unittest.TestCase):
"-Xloggc:log-files/gc.healthmgr.log -Djava.net.preferIPv4Stack=true " \
"-cp scheduler_classpath:healthmgr_classpath " \
"com.twitter.heron.healthmgr.HealthManager --cluster cluster --role role " \
- "--environment environ --topology_name topname --verbose"
+ "--environment environ --topology_name topname"
def get_expected_instance_command(component_name, instance_id, container_id):
instance_name = "container_%d_%s_%d" % (container_id, component_name, instance_id)
diff --git a/heron/healthmgr/src/java/BUILD b/heron/healthmgr/src/java/BUILD
index 2587c8f..c081a08 100644
--- a/heron/healthmgr/src/java/BUILD
+++ b/heron/healthmgr/src/java/BUILD
@@ -42,6 +42,9 @@ healthmgr_deps_files = [
"@com_microsoft_dhalion//jar",
"@aopalliance_aopalliance//jar",
"@org_apache_commons_commons_math3//jar",
+ "@tech_tablesaw//jar",
+ "@it_unimi_dsi_fastutil//jar",
+ "@org_roaringbitmap//jar",
]
filegroup(
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelper.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelper.java
deleted file mode 100644
index 6a85188..0000000
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr.common;
-
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import org.apache.commons.math3.stat.regression.SimpleRegression;
-
-import com.twitter.heron.healthmgr.sensors.BaseSensor;
-
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_GROWTH_RATE;
-
-
-/**
- * A helper class to compute and hold metrics derived from component metrics
- */
-public class ComponentMetricsHelper {
- private final ComponentMetrics componentMetrics;
-
- private List<InstanceMetrics> boltsWithBackpressure = new ArrayList<>();
- private double maxBufferChangeRate = 0;
- private double totalBackpressure = 0;
-
- public ComponentMetricsHelper(ComponentMetrics compMetrics) {
- this.componentMetrics = compMetrics;
- }
-
- public void computeBpStats() {
- for (InstanceMetrics instanceMetrics : componentMetrics.getMetrics().values()) {
- Double bpValue = instanceMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
- if (bpValue != null && bpValue > 0) {
- boltsWithBackpressure.add(instanceMetrics);
- totalBackpressure += bpValue;
- }
- }
- }
-
- public void computeBufferSizeTrend() {
- for (InstanceMetrics instanceMetrics : componentMetrics.getMetrics().values()) {
- Map<Instant, Double> bufferMetrics
- = instanceMetrics.getMetrics().get(METRIC_BUFFER_SIZE.text());
- if (bufferMetrics == null || bufferMetrics.size() < 3) {
- // missing of insufficient data for creating a trend line
- continue;
- }
-
- SimpleRegression simpleRegression = new SimpleRegression(true);
- for (Instant timestamp : bufferMetrics.keySet()) {
- simpleRegression.addData(timestamp.getEpochSecond(), bufferMetrics.get(timestamp));
- }
-
- double slope = simpleRegression.getSlope();
- instanceMetrics.addMetric(METRIC_WAIT_Q_GROWTH_RATE.text(), slope);
-
- if (maxBufferChangeRate < slope) {
- maxBufferChangeRate = slope;
- }
- }
- }
-
- public MetricsStats computeMinMaxStats(BaseSensor.MetricName metric) {
- return computeMinMaxStats(metric.text());
- }
-
- public MetricsStats computeMinMaxStats(String metric) {
- double metricMax = 0;
- double metricMin = Double.MAX_VALUE;
- for (InstanceMetrics instance : componentMetrics.getMetrics().values()) {
-
- Double metricValue = instance.getMetricValueSum(metric);
- if (metricValue == null) {
- continue;
- }
- metricMax = metricMax < metricValue ? metricValue : metricMax;
- metricMin = metricMin > metricValue ? metricValue : metricMin;
- }
- return new MetricsStats(metricMin, metricMax);
- }
-
- public double getTotalBackpressure() {
- return totalBackpressure;
- }
-
- public double getMaxBufferChangeRate() {
- return maxBufferChangeRate;
- }
-
- public List<InstanceMetrics> getBoltsWithBackpressure() {
- return boltsWithBackpressure;
- }
-}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java
index 5a207ee..7c81b5b 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java
@@ -14,7 +14,10 @@
package com.twitter.heron.healthmgr.common;
-import com.microsoft.dhalion.resolver.Action;
+import java.time.Instant;
+import java.util.Collection;
+
+import com.microsoft.dhalion.core.Action;
public class HealthManagerEvents {
@@ -22,8 +25,8 @@ public class HealthManagerEvents {
* This event is created when a resolver executes topology update action
*/
public static class TopologyUpdate extends Action {
- public TopologyUpdate() {
- super(TopologyUpdate.class.getSimpleName());
+ public TopologyUpdate(Instant timestamp, Collection<String> assignments) {
+ super(TopologyUpdate.class.getSimpleName(), timestamp, assignments, null);
}
}
@@ -31,8 +34,8 @@ public class HealthManagerEvents {
* This event is created when a resolver executes restart container action
*/
public static class ContainerRestart extends Action {
- public ContainerRestart() {
- super(ContainerRestart.class.getSimpleName());
+ public ContainerRestart(Instant timestamp, Collection<String> assignments) {
+ super(ContainerRestart.class.getSimpleName(), timestamp, assignments, null);
}
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java
index b23bc2a..12f12f0 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java
@@ -54,7 +54,7 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
@Override
public synchronized void onEvent(TopologyUpdate event) {
LOG.info(
- "Received topology update event, invalidating cached PhysicalPlan: " + event.getName());
+ "Received topology update event, invalidating cached PhysicalPlan: " + event.type());
physicalPlan = null;
}
});
@@ -64,8 +64,8 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
*/
@Override
public synchronized void onEvent(ContainerRestart event) {
- LOG.info("Received conatiner restart event, invalidating cached PhysicalPlan: "
- + event.getName());
+ LOG.info("Received container restart event, invalidating cached PhysicalPlan: "
+ + event.type());
physicalPlan = null;
}
});
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java
index fd3b50b..74845a8 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java
@@ -15,34 +15,33 @@
package com.twitter.heron.healthmgr.detectors;
+import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.logging.Logger;
import javax.inject.Inject;
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-public class BackPressureDetector implements IDetector {
- public static final String CONF_NOISE_FILTER = "BackPressureDetector.noiseFilterMillis";
+public class BackPressureDetector extends BaseDetector {
+ static final String CONF_NOISE_FILTER = "BackPressureDetector.noiseFilterMillis";
private static final Logger LOG = Logger.getLogger(BackPressureDetector.class.getName());
- private final BackPressureSensor bpSensor;
private final int noiseFilterMillis;
@Inject
- BackPressureDetector(BackPressureSensor bpSensor,
- HealthPolicyConfig policyConfig) {
- this.bpSensor = bpSensor;
+ BackPressureDetector(HealthPolicyConfig policyConfig) {
noiseFilterMillis = (int) policyConfig.getConfig(CONF_NOISE_FILTER, 20);
}
@@ -50,23 +49,33 @@ public class BackPressureDetector implements IDetector {
* Detects all components initiating backpressure above the configured limit. Normally there
* will be only one component
*
- * @return A collection of all components causing backpressure.
+ * @return A collection of symptoms each one corresponding to a component with backpressure.
*/
@Override
- public List<Symptom> detect() {
- ArrayList<Symptom> result = new ArrayList<>();
+ public Collection<Symptom> detect(Collection<Measurement> measurements) {
+ Collection<Symptom> result = new ArrayList<>();
+ Instant now = context.checkpoint();
- Map<String, ComponentMetrics> backpressureMetrics = bpSensor.get();
- for (ComponentMetrics compMetrics : backpressureMetrics.values()) {
- ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
- compStats.computeBpStats();
- if (compStats.getTotalBackpressure() > noiseFilterMillis) {
- LOG.info(String.format("Detected back pressure for %s, total back pressure is %f",
- compMetrics.getName(), compStats.getTotalBackpressure()));
- result.add(new Symptom(SYMPTOM_BACK_PRESSURE.text(), compMetrics));
+ MeasurementsTable bpMetrics
+ = MeasurementsTable.of(measurements).type(METRIC_BACK_PRESSURE.text());
+ for (String component : bpMetrics.uniqueComponents()) {
+ double compBackPressure = bpMetrics.component(component).sum();
+ if (compBackPressure > noiseFilterMillis) {
+ LOG.info(String.format("Detected component back-pressure for %s, total back pressure is %f",
+ component, compBackPressure));
+ List<String> addresses = Collections.singletonList(component);
+ result.add(new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, addresses));
+ }
+ }
+ for (String instance : bpMetrics.uniqueInstances()) {
+ double totalBP = bpMetrics.instance(instance).sum();
+ if (totalBP > noiseFilterMillis) {
+ LOG.info(String.format("Detected instance back-pressure for %s, total back pressure is %f",
+ instance, totalBP));
+ List<String> addresses = Collections.singletonList(instance);
+ result.add(new Symptom(SYMPTOM_INSTANCE_BACK_PRESSURE.text(), now, addresses));
}
}
-
return result;
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java
index 9af1ef7..334857e 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java
@@ -15,18 +15,22 @@
package com.twitter.heron.healthmgr.detectors;
import com.microsoft.dhalion.api.IDetector;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
public abstract class BaseDetector implements IDetector {
- public enum SymptomName {
- SYMPTOM_BACK_PRESSURE(BackPressureDetector.class.getSimpleName()),
+ protected ExecutionContext context;
+
+ public enum SymptomType {
+ SYMPTOM_COMP_BACK_PRESSURE(BackPressureDetector.class.getSimpleName() + "Component"),
+ SYMPTOM_INSTANCE_BACK_PRESSURE(BackPressureDetector.class.getSimpleName() + "Instance"),
SYMPTOM_GROWING_WAIT_Q(GrowingWaitQueueDetector.class.getSimpleName()),
SYMPTOM_LARGE_WAIT_Q(LargeWaitQueueDetector.class.getSimpleName()),
SYMPTOM_PROCESSING_RATE_SKEW(ProcessingRateSkewDetector.class.getSimpleName()),
- SYMPTOM_WAIT_Q_DISPARITY(WaitQueueDisparityDetector.class.getSimpleName());
+ SYMPTOM_WAIT_Q_SIZE_SKEW(WaitQueueSkewDetector.class.getSimpleName());
private String text;
- SymptomName(String name) {
+ SymptomType(String name) {
this.text = name;
}
@@ -39,4 +43,9 @@ public abstract class BaseDetector implements IDetector {
return text();
}
}
+
+ @Override
+ public void initialize(ExecutionContext ctxt) {
+ this.context = ctxt;
+ }
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
index 0cf305e..855cd12 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
@@ -16,34 +16,33 @@
package com.twitter.heron.healthmgr.detectors;
import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.Collection;
+import java.util.Collections;
import java.util.logging.Logger;
import javax.inject.Inject;
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+
+import org.apache.commons.math3.stat.regression.SimpleRegression;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_GROWING_WAIT_Q;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_GROWING_WAIT_Q;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
-public class GrowingWaitQueueDetector implements IDetector {
- static final String CONF_LIMIT = GrowingWaitQueueDetector.class.getSimpleName() + ".limit";
+public class GrowingWaitQueueDetector extends BaseDetector {
+ static final String CONF_LIMIT
+ = GrowingWaitQueueDetector.class.getSimpleName() + ".limit";
private static final Logger LOG = Logger.getLogger(GrowingWaitQueueDetector.class.getName());
- private final BufferSizeSensor pendingBufferSensor;
private final double rateLimit;
@Inject
- GrowingWaitQueueDetector(BufferSizeSensor pendingBufferSensor,
- HealthPolicyConfig policyConfig) {
- this.pendingBufferSensor = pendingBufferSensor;
+ GrowingWaitQueueDetector(HealthPolicyConfig policyConfig) {
rateLimit = (double) policyConfig.getConfig(CONF_LIMIT, 10.0);
}
@@ -51,23 +50,53 @@ public class GrowingWaitQueueDetector implements IDetector {
* Detects all components unable to keep up with input load, hence having a growing pending buffer
* or wait queue
*
- * @return A collection of all components executing slower than input rate.
+ * @return A collection of symptoms each one corresponding to a component executing slower
+ * than input rate.
*/
@Override
- public List<Symptom> detect() {
- ArrayList<Symptom> result = new ArrayList<>();
-
- Map<String, ComponentMetrics> bufferSizes = pendingBufferSensor.get();
- for (ComponentMetrics compMetrics : bufferSizes.values()) {
- ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
- compStats.computeBufferSizeTrend();
- if (compStats.getMaxBufferChangeRate() > rateLimit) {
+ public Collection<Symptom> detect(Collection<Measurement> measurements) {
+ Collection<Symptom> result = new ArrayList<>();
+
+ MeasurementsTable waitQueueMetrics
+ = MeasurementsTable.of(measurements).type(METRIC_WAIT_Q_SIZE.text());
+
+ for (String component : waitQueueMetrics.uniqueComponents()) {
+ double maxSlope = computeWaitQueueSizeTrend(waitQueueMetrics.component(component));
+ if (maxSlope > rateLimit) {
LOG.info(String.format("Detected growing wait queues for %s, max rate %f",
- compMetrics.getName(), compStats.getMaxBufferChangeRate()));
- result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), compMetrics));
+ component, maxSlope));
+ Collection<String> addresses = Collections.singletonList(component);
+ result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), context.checkpoint(), addresses));
}
}
return result;
}
+
+
+ private double computeWaitQueueSizeTrend(MeasurementsTable metrics) {
+ double maxSlope = 0;
+ for (String instance : metrics.uniqueInstances()) {
+
+ if (metrics.instance(instance) == null || metrics.instance(instance).size() < 3) {
+ // missing or insufficient data for creating a trend line
+ continue;
+ }
+
+ Collection<Measurement> measurements
+ = metrics.instance(instance).sort(false, MeasurementsTable.SortKey.TIME_STAMP).get();
+ SimpleRegression simpleRegression = new SimpleRegression(true);
+
+ for (Measurement m : measurements) {
+ simpleRegression.addData(m.instant().getEpochSecond(), m.value());
+ }
+
+ double slope = simpleRegression.getSlope();
+
+ if (maxSlope < slope) {
+ maxSlope = slope;
+ }
+ }
+ return maxSlope;
+ }
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java
index 9a720e0..bd5a93c 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java
@@ -16,59 +16,65 @@
package com.twitter.heron.healthmgr.detectors;
import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import java.util.logging.Logger;
import javax.inject.Inject;
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_LARGE_WAIT_Q;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_LARGE_WAIT_Q;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
-public class LargeWaitQueueDetector implements IDetector {
+public class LargeWaitQueueDetector extends BaseDetector {
static final String CONF_SIZE_LIMIT = "LargeWaitQueueDetector.limit";
private static final Logger LOG = Logger.getLogger(LargeWaitQueueDetector.class.getName());
- private final BufferSizeSensor pendingBufferSensor;
private final int sizeLimit;
@Inject
- LargeWaitQueueDetector(BufferSizeSensor pendingBufferSensor,
- HealthPolicyConfig policyConfig) {
- this.pendingBufferSensor = pendingBufferSensor;
+ LargeWaitQueueDetector(HealthPolicyConfig policyConfig) {
sizeLimit = (int) policyConfig.getConfig(CONF_SIZE_LIMIT, 1000);
}
/**
- * Detects all components unable to keep up with input load, hence having a large pending buffer
- * or wait queue
+ * Detects all components having a large pending buffer or wait queue
*
- * @return A collection of all components executing slower than input rate.
+ * @return A collection of symptoms each one corresponding to components with
+ * large wait queues.
*/
@Override
- public List<Symptom> detect() {
- ArrayList<Symptom> result = new ArrayList<>();
-
- Map<String, ComponentMetrics> bufferSizes = pendingBufferSensor.get();
- for (ComponentMetrics compMetrics : bufferSizes.values()) {
- ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
- MetricsStats stats = compStats.computeMinMaxStats(METRIC_BUFFER_SIZE.text());
- if (stats.getMetricMin() > sizeLimit) {
- LOG.info(String.format("Detected large wait queues for %s, smallest queue is %f",
- compMetrics.getName(), stats.getMetricMin()));
- result.add(new Symptom(SYMPTOM_LARGE_WAIT_Q.text(), compMetrics));
+ public Collection<Symptom> detect(Collection<Measurement> measurements) {
+
+ Collection<Symptom> result = new ArrayList<>();
+
+ MeasurementsTable waitQueueMetrics
+ = MeasurementsTable.of(measurements).type(METRIC_WAIT_Q_SIZE.text());
+ for (String component : waitQueueMetrics.uniqueComponents()) {
+ Set<String> addresses = new HashSet<>();
+ MeasurementsTable instanceMetrics = waitQueueMetrics.component(component);
+ for (String instance : instanceMetrics.uniqueInstances()) {
+ double avgWaitQSize = instanceMetrics.instance(instance).mean();
+ if (avgWaitQSize > sizeLimit) {
+ LOG.info(String.format("Detected large wait queues for instance"
+ + "%s, smallest queue is + %f", instance, avgWaitQSize));
+ addresses.add(instance);
+ }
+ }
+ if (addresses.size() > 0) {
+ result.add(new Symptom(SYMPTOM_LARGE_WAIT_Q.text(), context.checkpoint(), addresses));
}
}
return result;
}
}
+
+
+
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java
index 78c0ca7..f32c7be 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java
@@ -18,16 +18,15 @@ package com.twitter.heron.healthmgr.detectors;
import javax.inject.Inject;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.ExecuteCountSensor;
+import com.twitter.heron.healthmgr.sensors.BaseSensor;
public class ProcessingRateSkewDetector extends SkewDetector {
public static final String CONF_SKEW_RATIO = "ProcessingRateSkewDetector.skewRatio";
@Inject
- ProcessingRateSkewDetector(ExecuteCountSensor exeCountSensor,
- HealthPolicyConfig policyConfig) {
- super(exeCountSensor,
- (double) policyConfig.getConfig(CONF_SKEW_RATIO, 1.5),
- BaseDetector.SymptomName.SYMPTOM_PROCESSING_RATE_SKEW);
+ ProcessingRateSkewDetector(HealthPolicyConfig policyConfig) {
+ super((double) policyConfig.getConfig(CONF_SKEW_RATIO, 1.5),
+ BaseSensor.MetricName.METRIC_EXE_COUNT,
+ BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW);
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
index fa26c32..473435d 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
@@ -15,53 +15,100 @@
package com.twitter.heron.healthmgr.detectors;
+import java.time.Instant;
import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import javax.inject.Inject;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
import com.twitter.heron.healthmgr.sensors.BaseSensor;
public class SkewDetector extends BaseDetector {
- private static final Logger LOG = Logger.getLogger(SkewDetector.class.getName());
- private final BaseSensor sensor;
private final double skewRatio;
- private final BaseDetector.SymptomName symptomName;
+ private final String metricName;
+ private final BaseDetector.SymptomType symptomType;
@Inject
- SkewDetector(BaseSensor sensor, double skewRatio, BaseDetector.SymptomName symptom) {
- this.sensor = sensor;
+ SkewDetector(double skewRatio, BaseSensor.MetricName metricName, BaseDetector.SymptomType
+ symptomType) {
this.skewRatio = skewRatio;
- this.symptomName = symptom;
+ this.metricName = metricName.text();
+ this.symptomType = symptomType;
}
/**
- * Detects components experiencing data skew, instances with vastly different execute counts.
+ * Detects components experiencing skew on a specific metric
*
- * @return A collection of affected components
+ * @return At most two symptoms corresponding to each affected component -- one for positive skew
+ * and one for negative skew
*/
@Override
- public List<Symptom> detect() {
- ArrayList<Symptom> result = new ArrayList<>();
-
- Map<String, ComponentMetrics> metrics = sensor.get();
- for (ComponentMetrics compMetrics : metrics.values()) {
- ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
- MetricsStats stats = compStats.computeMinMaxStats(sensor.getMetricName());
- if (stats.getMetricMax() > skewRatio * stats.getMetricMin()) {
- LOG.info(String.format("Detected skew for %s, min = %f, max = %f",
- compMetrics.getName(), stats.getMetricMin(), stats.getMetricMax()));
- result.add(new Symptom(symptomName.text(), compMetrics));
+ public Collection<Symptom> detect(Collection<Measurement> measurements) {
+ Collection<Symptom> result = new ArrayList<>();
+
+ MeasurementsTable metrics = MeasurementsTable.of(measurements).type(metricName);
+ Instant now = context.checkpoint();
+ for (String component : metrics.uniqueComponents()) {
+ Set<String> addresses = new HashSet<>();
+ Set<String> positiveAddresses = new HashSet<>();
+ Set<String> negativeAddresses = new HashSet<>();
+
+ double componentMax = getMaxOfAverage(metrics.component(component));
+ double componentMin = getMinOfAverage(metrics.component(component));
+ if (componentMax > skewRatio * componentMin) {
+ //there is skew
+ addresses.add(component);
+ result.add(new Symptom(symptomType.text(), now, addresses));
+
+ for (String instance : metrics.component(component).uniqueInstances()) {
+ if (metrics.instance(instance).mean() >= 0.90 * componentMax) {
+ positiveAddresses.add(instance);
+ }
+ if (metrics.instance(instance).mean() <= 1.10 * componentMin) {
+ negativeAddresses.add(instance);
+ }
+ }
+
+ if (!positiveAddresses.isEmpty()) {
+ result.add(new Symptom("POSITIVE " + symptomType.text(), now, positiveAddresses));
+ }
+ if (!negativeAddresses.isEmpty()) {
+ result.add(new Symptom("NEGATIVE " + symptomType.text(), now, negativeAddresses));
+ }
}
- }
+ }
return result;
}
+
+ @VisibleForTesting
+ double getMaxOfAverage(MeasurementsTable table) {
+ double max = 0;
+ for (String instance : table.uniqueInstances()) {
+ double instanceMean = table.instance(instance).mean();
+ if (instanceMean > max) {
+ max = instanceMean;
+ }
+ }
+ return max;
+ }
+
+ @VisibleForTesting
+ double getMinOfAverage(MeasurementsTable table) {
+ double min = Double.MAX_VALUE;
+ for (String instance : table.uniqueInstances()) {
+ double instanceMean = table.instance(instance).mean();
+ if (instanceMean < min) {
+ min = instanceMean;
+ }
+ }
+ return min;
+ }
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetector.java
similarity index 59%
rename from heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetector.java
rename to heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetector.java
index c8e62d2..133e342 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetector.java
@@ -18,16 +18,15 @@ package com.twitter.heron.healthmgr.detectors;
import javax.inject.Inject;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
+import com.twitter.heron.healthmgr.sensors.BaseSensor;
-public class WaitQueueDisparityDetector extends SkewDetector {
- public static final String CONF_DISPARITY_RATIO = "WaitQueueDisparityDetector.disparityRatio";
+public class WaitQueueSkewDetector extends SkewDetector {
+ static final String CONF_SKEW_RATIO = "WaitQueueSkewDetector.skewRatio";
@Inject
- WaitQueueDisparityDetector(BufferSizeSensor pendingBufferSensor,
- HealthPolicyConfig policyConfig) {
- super(pendingBufferSensor,
- (double) policyConfig.getConfig(CONF_DISPARITY_RATIO, 20.0),
- BaseDetector.SymptomName.SYMPTOM_WAIT_Q_DISPARITY);
+ WaitQueueSkewDetector(HealthPolicyConfig policyConfig) {
+ super((double) policyConfig.getConfig(CONF_SKEW_RATIO, 20.0),
+ BaseSensor.MetricName.METRIC_WAIT_Q_SIZE,
+ BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW);
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java
index 1075984..b56e488 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java
@@ -14,26 +14,19 @@
package com.twitter.heron.healthmgr.diagnosers;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import com.microsoft.dhalion.api.IDiagnoser;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-
-import com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_PROCESSING_RATE_SKEW;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_WAIT_Q_DISPARITY;
public abstract class BaseDiagnoser implements IDiagnoser {
- public enum DiagnosisName {
- SYMPTOM_UNDER_PROVISIONING("SYMPTOM_UNDER_PROVISIONING"),
- SYMPTOM_DATA_SKEW("SYMPTOM_DATA_SKEW"),
- SYMPTOM_SLOW_INSTANCE("SYMPTOM_SLOW_INSTANCE"),
+ protected ExecutionContext context;
+
+ @Override
+ public void initialize(ExecutionContext ctxt) {
+ this.context = ctxt;
+ }
+
+ public enum DiagnosisType {
DIAGNOSIS_UNDER_PROVISIONING(UnderProvisioningDiagnoser.class.getSimpleName()),
DIAGNOSIS_SLOW_INSTANCE(SlowInstanceDiagnoser.class.getSimpleName()),
@@ -41,7 +34,7 @@ public abstract class BaseDiagnoser implements IDiagnoser {
private String text;
- DiagnosisName(String name) {
+ DiagnosisType(String name) {
this.text = name;
}
@@ -54,37 +47,4 @@ public abstract class BaseDiagnoser implements IDiagnoser {
return text();
}
}
-
- List<Symptom> getBackPressureSymptoms(List<Symptom> symptoms) {
- return getFilteredSymptoms(symptoms, SYMPTOM_BACK_PRESSURE);
- }
-
- Map<String, ComponentMetrics> getProcessingRateSkewComponents(List<Symptom> symptoms) {
- return getFilteredComponents(symptoms, SYMPTOM_PROCESSING_RATE_SKEW);
- }
-
- Map<String, ComponentMetrics> getWaitQDisparityComponents(List<Symptom> symptoms) {
- return getFilteredComponents(symptoms, SYMPTOM_WAIT_Q_DISPARITY);
- }
-
- private List<Symptom> getFilteredSymptoms(List<Symptom> symptoms, SymptomName type) {
- List<Symptom> result = new ArrayList<>();
- for (Symptom symptom : symptoms) {
- if (symptom.getName().equals(type.text())) {
- result.add(symptom);
- }
- }
- return result;
- }
-
- private Map<String, ComponentMetrics> getFilteredComponents(List<Symptom> symptoms,
- SymptomName type) {
- Map<String, ComponentMetrics> result = new HashMap<>();
- for (Symptom symptom : symptoms) {
- if (symptom.getName().equals(type.text())) {
- result.putAll(symptom.getComponents());
- }
- }
- return result;
- }
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java
index 0082f2d..564dcfb 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java
@@ -15,73 +15,74 @@
package com.twitter.heron.healthmgr.diagnosers;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.logging.Logger;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_DATA_SKEW;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_DATA_SKEW;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_DATA_SKEW;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
public class DataSkewDiagnoser extends BaseDiagnoser {
private static final Logger LOG = Logger.getLogger(DataSkewDiagnoser.class.getName());
@Override
- public Diagnosis diagnose(List<Symptom> symptoms) {
- List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
- Map<String, ComponentMetrics> processingRateSkewComponents =
- getProcessingRateSkewComponents(symptoms);
- Map<String, ComponentMetrics> waitQDisparityComponents = getWaitQDisparityComponents(symptoms);
+ public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+ Collection<Diagnosis> diagnoses = new ArrayList<>();
+ SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
- if (bpSymptoms.isEmpty() || processingRateSkewComponents.isEmpty()
- || waitQDisparityComponents.isEmpty()) {
- // Since there is no back pressure or disparate execute count, no action is needed
- return null;
- } else if (bpSymptoms.size() > 1) {
+ SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+ if (bp.size() > 1) {
// TODO handle cases where multiple detectors create back pressure symptom
throw new IllegalStateException("Multiple back-pressure symptoms case");
}
- ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
+ if (bp.size() == 0) {
+ return diagnoses;
+ }
+ String bpComponent = bp.first().assignments().iterator().next();
+
+ SymptomsTable processingRateSkew = symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+ SymptomsTable waitQSkew = symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
// verify data skew, larger queue size and back pressure for the same component exists
- ComponentMetrics exeCountMetrics = processingRateSkewComponents.get(bpMetrics.getName());
- ComponentMetrics pendingBufferMetrics = waitQDisparityComponents.get(bpMetrics.getName());
- if (exeCountMetrics == null || pendingBufferMetrics == null) {
- // no processing rate skew and buffer size skew
- // for the component with back pressure. This is not a data skew case
- return null;
+ if (waitQSkew.assignment(bpComponent).size() == 0
+ || processingRateSkew.assignment(bpComponent).size() == 0) {
+ return diagnoses;
}
- ComponentMetrics mergedData = ComponentMetrics.merge(bpMetrics,
- ComponentMetrics.merge(exeCountMetrics, pendingBufferMetrics));
- ComponentMetricsHelper compStats = new ComponentMetricsHelper(mergedData);
- compStats.computeBpStats();
- MetricsStats exeStats = compStats.computeMinMaxStats(METRIC_EXE_COUNT);
- MetricsStats bufferStats = compStats.computeMinMaxStats(METRIC_BUFFER_SIZE);
+ Collection<String> assignments = new ArrayList<>();
- Symptom resultSymptom = null;
- for (InstanceMetrics boltMetrics : compStats.getBoltsWithBackpressure()) {
- double exeCount = boltMetrics.getMetricValueSum(METRIC_EXE_COUNT.text());
- double bufferSize = boltMetrics.getMetricValueSum(METRIC_BUFFER_SIZE.text());
- double bpValue = boltMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
- if (exeStats.getMetricMax() < 1.10 * exeCount
- && bufferStats.getMetricMax() < 2 * bufferSize) {
- LOG.info(String.format("DataSkew: %s back-pressure(%s), high execution count: %s and "
- + "high buffer size %s", boltMetrics.getName(), bpValue, exeCount, bufferSize));
- resultSymptom = new Symptom(SYMPTOM_DATA_SKEW.text(), mergedData);
+ Instant newest = context.checkpoint();
+ Instant oldest = context.previousCheckpoint();
+ MeasurementsTable measurements = context.measurements()
+ .between(oldest, newest)
+ .component(bpComponent);
+
+ for (String instance : measurements.uniqueInstances()) {
+ MeasurementsTable instanceMeasurements = measurements.instance(instance);
+ double waitQSize = instanceMeasurements.type(METRIC_WAIT_Q_SIZE.text()).mean();
+ double processingRate = instanceMeasurements.type(METRIC_EXE_COUNT.text()).mean();
+ if ((measurements.type(METRIC_WAIT_Q_SIZE.text()).max() < waitQSize * 2)
+ && (measurements.type(METRIC_EXE_COUNT.text()).max() < 1.10 * processingRate)) {
+ assignments.add(instance);
+ LOG.info(String.format("DataSkew: %s back-pressure, high execution count: %s and "
+ + "high buffer size %s", instance, processingRate, waitQSize));
}
}
- return resultSymptom != null ? new Diagnosis(DIAGNOSIS_DATA_SKEW.text(), resultSymptom) : null;
+ if (assignments.size() > 0) {
+ diagnoses.add(new Diagnosis(DIAGNOSIS_DATA_SKEW.text(), context.checkpoint(), assignments));
+ }
+
+ return diagnoses;
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
index af1ae4c..673c3b6 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
@@ -14,69 +14,74 @@
package com.twitter.heron.healthmgr.diagnosers;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.logging.Logger;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
public class SlowInstanceDiagnoser extends BaseDiagnoser {
private static final Logger LOG = Logger.getLogger(SlowInstanceDiagnoser.class.getName());
@Override
- public Diagnosis diagnose(List<Symptom> symptoms) {
- List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
- Map<String, ComponentMetrics> processingRateSkewComponents =
- getProcessingRateSkewComponents(symptoms);
- Map<String, ComponentMetrics> waitQDisparityComponents = getWaitQDisparityComponents(symptoms);
+ public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+ Collection<Diagnosis> diagnoses = new ArrayList<>();
+ SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
- if (bpSymptoms.isEmpty() || waitQDisparityComponents.isEmpty()
- || !processingRateSkewComponents.isEmpty()) {
- // Since there is no back pressure or disparate wait count or similar
- // execution count, no action is needed
- return null;
- } else if (bpSymptoms.size() > 1) {
+ SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+ if (bp.size() > 1) {
// TODO handle cases where multiple detectors create back pressure symptom
throw new IllegalStateException("Multiple back-pressure symptoms case");
}
- ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
+ if (bp.size() == 0) {
+ return diagnoses;
+ }
+ String bpComponent = bp.first().assignments().iterator().next();
- // verify wait Q disparity and back pressure for the same component exists
- ComponentMetrics pendingBufferMetrics = waitQDisparityComponents.get(bpMetrics.getName());
- if (pendingBufferMetrics == null) {
- // no wait Q disparity for the component with back pressure. There is no slow instance
- return null;
+ SymptomsTable processingRateSkew = symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+ SymptomsTable waitQSkew = symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
+ // verify wait Q disparity, similar processing rates and back pressure for the same component
+ // exist
+ if (waitQSkew.assignment(bpComponent).size() == 0
+ || processingRateSkew.assignment(bpComponent).size() > 0) {
+ // TODO in a short window rate skew could exist
+ return diagnoses;
}
- ComponentMetrics mergedData = ComponentMetrics.merge(bpMetrics, pendingBufferMetrics);
- ComponentMetricsHelper compStats = new ComponentMetricsHelper(mergedData);
- compStats.computeBpStats();
- MetricsStats bufferStats = compStats.computeMinMaxStats(METRIC_BUFFER_SIZE);
+ Collection<String> assignments = new ArrayList<>();
+
+ Instant newest = context.checkpoint();
+ Instant oldest = context.previousCheckpoint();
+ MeasurementsTable measurements = context.measurements()
+ .between(oldest, newest)
+ .component(bpComponent);
- Symptom resultSymptom = null;
- for (InstanceMetrics boltMetrics : compStats.getBoltsWithBackpressure()) {
- double bufferSize = boltMetrics.getMetricValueSum(METRIC_BUFFER_SIZE.text());
- double bpValue = boltMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
- if (bufferStats.getMetricMax() < bufferSize * 2) {
- LOG.info(String.format("SLOW: %s back-pressure(%s) and high buffer size: %s "
+ for (String instance : measurements.uniqueInstances()) {
+ MeasurementsTable instanceMeasurements = measurements.instance(instance);
+ double waitQSize = instanceMeasurements.type(METRIC_WAIT_Q_SIZE.text()).mean();
+ if (measurements.type(METRIC_WAIT_Q_SIZE.text()).max() < waitQSize * 2) {
+ assignments.add(instance);
+ LOG.info(String.format("SLOW: %s back-pressure and high buffer size: %s "
+ "and similar processing rates",
- boltMetrics.getName(), bpValue, bufferSize));
- resultSymptom = new Symptom(SYMPTOM_SLOW_INSTANCE.text(), mergedData);
+ instance, waitQSize));
}
}
- return resultSymptom != null
- ? new Diagnosis(DIAGNOSIS_SLOW_INSTANCE.text(), resultSymptom) : null;
+ if (assignments.size() > 0) {
+ Instant now = context.checkpoint();
+ diagnoses.add(new Diagnosis(DIAGNOSIS_SLOW_INSTANCE.text(), now, assignments));
+ }
+
+ return diagnoses;
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java
index 033002f..445a188 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java
@@ -15,48 +15,56 @@
package com.twitter.heron.healthmgr.diagnosers;
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.logging.Logger;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_UNDER_PROVISIONING;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
public class UnderProvisioningDiagnoser extends BaseDiagnoser {
private static final Logger LOG = Logger.getLogger(SlowInstanceDiagnoser.class.getName());
@Override
- public Diagnosis diagnose(List<Symptom> symptoms) {
- List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
- Map<String, ComponentMetrics> processingRateSkewComponents =
- getProcessingRateSkewComponents(symptoms);
- Map<String, ComponentMetrics> waitQDisparityComponents = getWaitQDisparityComponents(symptoms);
-
- if (bpSymptoms.isEmpty() || !processingRateSkewComponents.isEmpty()
- || !waitQDisparityComponents.isEmpty()) {
- // Since there is no back pressure or similar processing rates
- // and buffer sizes, no action is needed
- return null;
- } else if (bpSymptoms.size() > 1) {
+ public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+ Collection<Diagnosis> diagnoses = new ArrayList<>();
+
+ SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+ SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+ if (bp.size() > 1) {
// TODO handle cases where multiple detectors create back pressure symptom
throw new IllegalStateException("Multiple back-pressure symptoms case");
}
- ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
- ComponentMetricsHelper compStats = new ComponentMetricsHelper(bpMetrics);
- compStats.computeBpStats();
- LOG.info(String.format("UNDER_PROVISIONING: %s back-pressure(%s) and similar processing rates "
- + "and buffer sizes",
- bpMetrics.getName(), compStats.getTotalBackpressure()));
+ if (bp.size() == 0) {
+ return diagnoses;
+ }
+ String bpComponent = bp.first().assignments().iterator().next();
+
+ SymptomsTable processingRateSkew = symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+ SymptomsTable waitQSkew = symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
+
+ if (waitQSkew.assignment(bpComponent).size() != 0
+ || processingRateSkew.assignment(bpComponent).size() != 0) {
+ return diagnoses;
+ }
+
+ Collection<String> assignments = Collections.singletonList(bpComponent);
+ LOG.info(String.format("UNDER_PROVISIONING: %s back-pressure and similar processing rates "
+ + "and wait queue sizes", bpComponent));
+
+ diagnoses.add(
+ new Diagnosis(DIAGNOSIS_UNDER_PROVISIONING.text(), context.checkpoint(), assignments));
+ //TODO verify large wait queue for all instances
- Symptom resultSymptom = new Symptom(SYMPTOM_UNDER_PROVISIONING.text(), bpMetrics);
- return new Diagnosis(DIAGNOSIS_UNDER_PROVISIONING.text(), resultSymptom);
+ return diagnoses;
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
index 57e18b9..3dc1ef7 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
@@ -15,36 +15,27 @@
package com.twitter.heron.healthmgr.policy;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
import javax.inject.Inject;
-import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
import com.microsoft.dhalion.events.EventHandler;
import com.microsoft.dhalion.events.EventManager;
import com.microsoft.dhalion.policy.HealthPolicyImpl;
-import com.twitter.heron.common.basics.TypeUtils;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
import com.twitter.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
import com.twitter.heron.healthmgr.detectors.BackPressureDetector;
-import com.twitter.heron.healthmgr.detectors.ProcessingRateSkewDetector;
-import com.twitter.heron.healthmgr.detectors.WaitQueueDisparityDetector;
-import com.twitter.heron.healthmgr.diagnosers.SlowInstanceDiagnoser;
import com.twitter.heron.healthmgr.resolvers.RestartContainerResolver;
+import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
import static com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey.HEALTH_POLICY_INTERVAL;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
/**
* This Policy class
* 1. detector: find out the container that has been in backpressure
- * state for long time, which we believe the container cannot recover.
+ * state for long time, which we believe the container cannot recover.
* 2. resolver: try to restart the backpressure container so as to be rescheduled.
*/
public class AutoRestartBackpressureContainerPolicy extends HealthPolicyImpl
@@ -56,44 +47,29 @@ public class AutoRestartBackpressureContainerPolicy extends HealthPolicyImpl
Logger.getLogger(AutoRestartBackpressureContainerPolicy.class.getName());
private final HealthPolicyConfig policyConfig;
- private final RestartContainerResolver restartContainerResolver;
@Inject
- AutoRestartBackpressureContainerPolicy(HealthPolicyConfig policyConfig, EventManager eventManager,
- BackPressureDetector backPressureDetector,
- ProcessingRateSkewDetector processingRateSkewDetector,
- WaitQueueDisparityDetector waitQueueDisparityDetector,
- SlowInstanceDiagnoser slowInstanceDiagnoser,
- RestartContainerResolver restartContainerResolver) {
+ AutoRestartBackpressureContainerPolicy(HealthPolicyConfig policyConfig,
+ EventManager eventManager,
+ BackPressureSensor backPressureSensor,
+ BackPressureDetector backPressureDetector,
+ RestartContainerResolver restartContainerResolver) {
this.policyConfig = policyConfig;
- this.restartContainerResolver = restartContainerResolver;
- registerDetectors(backPressureDetector, waitQueueDisparityDetector, processingRateSkewDetector);
- registerDiagnosers(slowInstanceDiagnoser);
+ registerSensors(backPressureSensor);
+ registerDetectors(backPressureDetector);
+ registerResolvers(restartContainerResolver);
- setPolicyExecutionInterval(TimeUnit.MILLISECONDS,
- TypeUtils.getInteger(policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000)));
+ setPolicyExecutionInterval(
+ Duration.ofMillis((int) policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000)));
eventManager.addEventListener(ContainerRestart.class, this);
}
@Override
- public IResolver selectResolver(List<Diagnosis> diagnosis) {
- Map<String, Diagnosis> diagnosisMap =
- diagnosis.stream().collect(Collectors.toMap(Diagnosis::getName, d -> d));
-
- if (diagnosisMap.containsKey(DIAGNOSIS_SLOW_INSTANCE.text())) {
- return restartContainerResolver;
- }
-
- LOG.warning("Unknown diagnoses. None resolver selected.");
- return null;
- }
-
- @Override
public void onEvent(ContainerRestart event) {
int interval = (int) policyConfig.getConfig(CONF_WAIT_INTERVAL_MILLIS, 180000);
LOG.info("Received container restart action event: " + event);
- setOneTimeDelay(TimeUnit.MILLISECONDS, interval);
+ setOneTimeDelay(Duration.ofMillis(interval));
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java
index 61c0048..b060b40 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java
@@ -15,16 +15,15 @@
package com.twitter.heron.healthmgr.policy;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import java.util.Collection;
import java.util.logging.Logger;
-import java.util.stream.Collectors;
import javax.inject.Inject;
import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.DiagnosisTable;
import com.microsoft.dhalion.events.EventHandler;
import com.microsoft.dhalion.events.EventManager;
import com.microsoft.dhalion.policy.HealthPolicyImpl;
@@ -34,16 +33,19 @@ import com.twitter.heron.healthmgr.common.HealthManagerEvents.TopologyUpdate;
import com.twitter.heron.healthmgr.detectors.BackPressureDetector;
import com.twitter.heron.healthmgr.detectors.LargeWaitQueueDetector;
import com.twitter.heron.healthmgr.detectors.ProcessingRateSkewDetector;
-import com.twitter.heron.healthmgr.detectors.WaitQueueDisparityDetector;
+import com.twitter.heron.healthmgr.detectors.WaitQueueSkewDetector;
import com.twitter.heron.healthmgr.diagnosers.DataSkewDiagnoser;
import com.twitter.heron.healthmgr.diagnosers.SlowInstanceDiagnoser;
import com.twitter.heron.healthmgr.diagnosers.UnderProvisioningDiagnoser;
import com.twitter.heron.healthmgr.resolvers.ScaleUpResolver;
+import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
+import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
+import com.twitter.heron.healthmgr.sensors.ExecuteCountSensor;
import static com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey.HEALTH_POLICY_INTERVAL;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_DATA_SKEW;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_DATA_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
public class DynamicResourceAllocationPolicy extends HealthPolicyImpl
implements EventHandler<TopologyUpdate> {
@@ -59,10 +61,13 @@ public class DynamicResourceAllocationPolicy extends HealthPolicyImpl
@Inject
DynamicResourceAllocationPolicy(HealthPolicyConfig policyConfig,
EventManager eventManager,
+ BackPressureSensor backPressureSensor,
+ BufferSizeSensor bufferSizeSensor,
+ ExecuteCountSensor executeCountSensor,
BackPressureDetector backPressureDetector,
LargeWaitQueueDetector largeWaitQueueDetector,
ProcessingRateSkewDetector dataSkewDetector,
- WaitQueueDisparityDetector waitQueueDisparityDetector,
+ WaitQueueSkewDetector waitQueueSkewDetector,
UnderProvisioningDiagnoser underProvisioningDiagnoser,
DataSkewDiagnoser dataSkewDiagnoser,
SlowInstanceDiagnoser slowInstanceDiagnoser,
@@ -70,26 +75,27 @@ public class DynamicResourceAllocationPolicy extends HealthPolicyImpl
this.policyConfig = policyConfig;
this.scaleUpResolver = scaleUpResolver;
+ registerSensors(backPressureSensor, bufferSizeSensor, executeCountSensor);
registerDetectors(backPressureDetector, largeWaitQueueDetector,
- waitQueueDisparityDetector, dataSkewDetector);
+ waitQueueSkewDetector, dataSkewDetector);
registerDiagnosers(underProvisioningDiagnoser, dataSkewDiagnoser, slowInstanceDiagnoser);
+ registerResolvers(scaleUpResolver);
- setPolicyExecutionInterval(TimeUnit.MILLISECONDS,
- (int) policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000));
+ setPolicyExecutionInterval(
+ Duration.ofMillis((int) policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000)));
eventManager.addEventListener(TopologyUpdate.class, this);
}
@Override
- public IResolver selectResolver(List<Diagnosis> diagnosis) {
- Map<String, Diagnosis> diagnosisMap
- = diagnosis.stream().collect(Collectors.toMap(Diagnosis::getName, d -> d));
+ public IResolver selectResolver(Collection<Diagnosis> diagnosis) {
+ DiagnosisTable diagnosisTable = DiagnosisTable.of(diagnosis);
- if (diagnosisMap.containsKey(DIAGNOSIS_DATA_SKEW.text())) {
+ if (diagnosisTable.type(DIAGNOSIS_DATA_SKEW.text()).size() > 0) {
LOG.warning("Data Skew diagnoses. This diagnosis does not have any resolver.");
- } else if (diagnosisMap.containsKey(DIAGNOSIS_SLOW_INSTANCE.text())) {
+ } else if (diagnosisTable.type(DIAGNOSIS_SLOW_INSTANCE.text()).size() > 0) {
LOG.warning("Slow Instance diagnoses. This diagnosis does not have any resolver.");
- } else if (diagnosisMap.containsKey(DIAGNOSIS_UNDER_PROVISIONING.text())) {
+ } else if (diagnosisTable.type(DIAGNOSIS_UNDER_PROVISIONING.text()).size() > 0) {
return scaleUpResolver;
}
@@ -100,6 +106,6 @@ public class DynamicResourceAllocationPolicy extends HealthPolicyImpl
public void onEvent(TopologyUpdate event) {
int interval = (int) policyConfig.getConfig(CONF_WAIT_INTERVAL_MILLIS, 180000);
LOG.info("Received topology update action event: " + event);
- setOneTimeDelay(TimeUnit.MILLISECONDS, interval);
+ setOneTimeDelay(Duration.ofMillis(interval));
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
index b7b7a31..7bc7833 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
@@ -13,7 +13,10 @@
// limitations under the License.
package com.twitter.heron.healthmgr.resolvers;
+import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.logging.Logger;
@@ -21,89 +24,85 @@ import javax.inject.Inject;
import javax.inject.Named;
import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.SymptomsTable;
import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-import com.microsoft.dhalion.resolver.Action;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
-import com.twitter.heron.healthmgr.HealthPolicyConfig;
import com.twitter.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
-import com.twitter.heron.healthmgr.common.PhysicalPlanProvider;
import com.twitter.heron.proto.scheduler.Scheduler.RestartTopologyRequest;
import com.twitter.heron.scheduler.client.ISchedulerClient;
import static com.twitter.heron.healthmgr.HealthManager.CONF_TOPOLOGY_NAME;
-import static com.twitter.heron.healthmgr.detectors.BackPressureDetector.CONF_NOISE_FILTER;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
public class RestartContainerResolver implements IResolver {
private static final Logger LOG = Logger.getLogger(RestartContainerResolver.class.getName());
- private final PhysicalPlanProvider physicalPlanProvider;
private final EventManager eventManager;
private final String topologyName;
private final ISchedulerClient schedulerClient;
- private final int noiseFilterMillis;
+ private ExecutionContext context;
@Inject
public RestartContainerResolver(@Named(CONF_TOPOLOGY_NAME) String topologyName,
- PhysicalPlanProvider physicalPlanProvider, EventManager eventManager,
- ISchedulerClient schedulerClient, HealthPolicyConfig policyConfig) {
+ EventManager eventManager,
+ ISchedulerClient schedulerClient) {
this.topologyName = topologyName;
- this.physicalPlanProvider = physicalPlanProvider;
this.eventManager = eventManager;
this.schedulerClient = schedulerClient;
- this.noiseFilterMillis = (int) policyConfig.getConfig(CONF_NOISE_FILTER, 20);
}
@Override
- public List<Action> resolve(List<Diagnosis> diagnosis) {
+ public void initialize(ExecutionContext ctxt) {
+ this.context = ctxt;
+ }
+
+ @Override
+ public Collection<Action> resolve(Collection<Diagnosis> diagnosis) {
List<Action> actions = new ArrayList<>();
- for (Diagnosis diagnoses : diagnosis) {
- Symptom bpSymptom = diagnoses.getSymptoms().get(SYMPTOM_SLOW_INSTANCE.text());
- if (bpSymptom == null || bpSymptom.getComponents().isEmpty()) {
- // nothing to fix as there is no back pressure
- continue;
- }
-
- if (bpSymptom.getComponents().size() > 1) {
- throw new UnsupportedOperationException("Multiple components with back pressure symptom");
- }
-
- // want to know which stmgr has backpressure
- String stmgrId = null;
- for (InstanceMetrics im : bpSymptom.getComponent().getMetrics().values()) {
- if (im.hasMetricAboveLimit(METRIC_BACK_PRESSURE.text(), noiseFilterMillis)) {
- String instanceId = im.getName();
- int fromIndex = instanceId.indexOf('_') + 1;
- int toIndex = instanceId.indexOf('_', fromIndex);
- stmgrId = instanceId.substring(fromIndex, toIndex);
- break;
- }
- }
+ // find all back pressure measurements reported in this execution cycle
+ Instant current = context.checkpoint();
+ Instant previous = context.previousCheckpoint();
+ SymptomsTable bpSymptoms = context.symptoms()
+ .type(SYMPTOM_INSTANCE_BACK_PRESSURE.text())
+ .between(previous, current);
+
+ if (bpSymptoms.size() == 0) {
+ LOG.fine("No back-pressure measurements found, ending as there's nothing to fix");
+ return actions;
+ }
+
+ Collection<String> allBpInstances = new HashSet<>();
+ bpSymptoms.get().forEach(symptom -> allBpInstances.addAll(symptom.assignments()));
+
+ LOG.info(String.format("%d instances caused back-pressure", allBpInstances.size()));
+
+ Collection<String> stmgrIds = new HashSet<>();
+ allBpInstances.forEach(instanceId -> {
+ LOG.info("Id of instance causing back-pressure: " + instanceId);
+ int fromIndex = instanceId.indexOf('_') + 1;
+ int toIndex = instanceId.indexOf('_', fromIndex);
+ String stmgrId = instanceId.substring(fromIndex, toIndex);
+ stmgrIds.add(stmgrId);
+ });
+
+ stmgrIds.forEach(stmgrId -> {
LOG.info("Restarting container: " + stmgrId);
boolean b = schedulerClient.restartTopology(
RestartTopologyRequest.newBuilder()
- .setContainerIndex(Integer.valueOf(stmgrId))
- .setTopologyName(topologyName)
- .build());
+ .setContainerIndex(Integer.valueOf(stmgrId))
+ .setTopologyName(topologyName)
+ .build());
LOG.info("Restarted container result: " + b);
+ });
- ContainerRestart action = new ContainerRestart();
- LOG.info("Broadcasting container restart event");
- eventManager.onEvent(action);
-
- actions.add(action);
- return actions;
- }
-
+ LOG.info("Broadcasting container restart event");
+ ContainerRestart action = new ContainerRestart(current, stmgrIds);
+ eventManager.onEvent(action);
+ actions.add(action);
return actions;
}
-
- @Override
- public void close() {
- }
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java
index 4f37489..3ed7ef2 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java
@@ -13,8 +13,11 @@
// limitations under the License.
package com.twitter.heron.healthmgr.resolvers;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -24,12 +27,12 @@ import javax.inject.Inject;
import com.google.common.annotations.VisibleForTesting;
import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.DiagnosisTable;
+import com.microsoft.dhalion.core.MeasurementsTable;
import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-import com.microsoft.dhalion.resolver.Action;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
import com.twitter.heron.api.generated.TopologyAPI.Topology;
import com.twitter.heron.common.basics.SysUtils;
@@ -46,7 +49,7 @@ import com.twitter.heron.spi.packing.PackingPlan;
import com.twitter.heron.spi.packing.PackingPlanProtoSerializer;
import com.twitter.heron.spi.utils.ReflectionUtils;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
@@ -58,6 +61,7 @@ public class ScaleUpResolver implements IResolver {
private ISchedulerClient schedulerClient;
private EventManager eventManager;
private Config config;
+ private ExecutionContext context;
@Inject
public ScaleUpResolver(TopologyProvider topologyProvider,
@@ -73,75 +77,85 @@ public class ScaleUpResolver implements IResolver {
}
@Override
- public List<Action> resolve(List<Diagnosis> diagnosis) {
- for (Diagnosis diagnoses : diagnosis) {
- Symptom bpSymptom = diagnoses.getSymptoms().get(SYMPTOM_UNDER_PROVISIONING.text());
- if (bpSymptom == null || bpSymptom.getComponents().isEmpty()) {
- // nothing to fix as there is no back pressure
- continue;
- }
+ public void initialize(ExecutionContext ctxt) {
+ this.context = ctxt;
+ }
- if (bpSymptom.getComponents().size() > 1) {
- throw new UnsupportedOperationException("Multiple components with back pressure symptom");
- }
+ @Override
+ public Collection<Action> resolve(Collection<Diagnosis> diagnosis) {
+ List<Action> actions = new ArrayList<>();
- ComponentMetrics bpComponent = bpSymptom.getComponent();
- int newParallelism = computeScaleUpFactor(bpComponent);
- Map<String, Integer> changeRequest = new HashMap<>();
- changeRequest.put(bpComponent.getName(), newParallelism);
+ DiagnosisTable table = DiagnosisTable.of(diagnosis);
+ table = table.type(DIAGNOSIS_UNDER_PROVISIONING.text());
- PackingPlan currentPackingPlan = packingPlanProvider.get();
- PackingPlan newPlan = buildNewPackingPlan(changeRequest, currentPackingPlan);
- if (newPlan == null) {
- return null;
- }
+ if (table.size() == 0) {
+ LOG.fine("No under-provisioning diagnosis present, ending as there's nothing to fix");
+ return actions;
+ }
- Scheduler.UpdateTopologyRequest updateTopologyRequest =
- Scheduler.UpdateTopologyRequest.newBuilder()
- .setCurrentPackingPlan(getSerializedPlan(currentPackingPlan))
- .setProposedPackingPlan(getSerializedPlan(newPlan))
- .build();
+ // Scale the first assigned component
+ Diagnosis diagnoses = table.first();
+ // verify diagnoses instance is valid
+ if (diagnoses.assignments().isEmpty()) {
+ LOG.warning(String.format("Diagnosis %s is missing assignments", diagnoses.id()));
+ return actions;
+ }
+ String component = diagnoses.assignments().iterator().next();
- LOG.info("Sending Updating topology request: " + updateTopologyRequest);
- if (!schedulerClient.updateTopology(updateTopologyRequest)) {
- throw new RuntimeException(String.format("Failed to update topology with Scheduler, "
- + "updateTopologyRequest=%s", updateTopologyRequest));
- }
+ int newParallelism = computeScaleUpFactor(component);
+ Map<String, Integer> changeRequest = new HashMap<>();
+ changeRequest.put(component, newParallelism);
- TopologyUpdate action = new TopologyUpdate();
- LOG.info("Broadcasting topology update event");
- eventManager.onEvent(action);
+ PackingPlan currentPackingPlan = packingPlanProvider.get();
+ PackingPlan newPlan = buildNewPackingPlan(changeRequest, currentPackingPlan);
+ if (newPlan == null) {
+ return null;
+ }
- LOG.info("Scheduler updated topology successfully.");
+ Scheduler.UpdateTopologyRequest updateTopologyRequest =
+ Scheduler.UpdateTopologyRequest.newBuilder()
+ .setCurrentPackingPlan(getSerializedPlan(currentPackingPlan))
+ .setProposedPackingPlan(getSerializedPlan(newPlan))
+ .build();
- List<Action> actions = new ArrayList<>();
- actions.add(action);
- return actions;
+ LOG.info("Sending Updating topology request: " + updateTopologyRequest);
+ if (!schedulerClient.updateTopology(updateTopologyRequest)) {
+ throw new RuntimeException(String.format("Failed to update topology with Scheduler, "
+ + "updateTopologyRequest=%s", updateTopologyRequest));
}
+ LOG.info("Scheduler updated topology successfully.");
+
+ LOG.info("Broadcasting topology update event");
+ TopologyUpdate action
+ = new TopologyUpdate(context.checkpoint(), Collections.singletonList(component));
+ eventManager.onEvent(action);
- return null;
+ actions.add(action);
+ return actions;
}
@VisibleForTesting
- int computeScaleUpFactor(ComponentMetrics componentMetrics) {
- double totalCompBpTime = 0;
- String compName = componentMetrics.getName();
- for (InstanceMetrics instanceMetrics : componentMetrics.getMetrics().values()) {
- double instanceBp = instanceMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
- LOG.info(String.format("Instance:%s, bpTime:%.0f", instanceMetrics.getName(), instanceBp));
- totalCompBpTime += instanceBp;
- }
-
+ int computeScaleUpFactor(String compName) {
+ Instant newest = context.checkpoint();
+ Instant oldest = context.previousCheckpoint();
+ MeasurementsTable table = context.measurements()
+ .component(compName)
+ .type(METRIC_BACK_PRESSURE.text())
+ .between(oldest, newest);
+
+ double totalCompBpTime = table.sum();
LOG.info(String.format("Component: %s, bpTime: %.0f", compName, totalCompBpTime));
+
if (totalCompBpTime >= 1000) {
totalCompBpTime = 999;
+ LOG.warning(String.format("Comp:%s, bpTime after filter: %.0f", compName, totalCompBpTime));
}
- LOG.warning(String.format("Comp:%s, bpTime after filter: %.0f", compName, totalCompBpTime));
double unusedCapacity = (1.0 * totalCompBpTime) / (1000 - totalCompBpTime);
+
// scale up fencing: do not scale more than 4 times the current size
unusedCapacity = unusedCapacity > 4.0 ? 4.0 : unusedCapacity;
- int parallelism = (int) Math.ceil(componentMetrics.getMetrics().size() * (1 + unusedCapacity));
+ int parallelism = (int) Math.ceil(table.uniqueInstances().size() * (1 + unusedCapacity));
LOG.info(String.format("Component's, %s, unused capacity is: %.3f. New parallelism: %d",
compName, unusedCapacity, parallelism));
return parallelism;
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
index 275084f..76829cf 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
@@ -16,16 +16,15 @@
package com.twitter.heron.healthmgr.sensors;
import java.time.Duration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.logging.Logger;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
import javax.inject.Inject;
import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
import com.twitter.heron.healthmgr.common.PackingPlanProvider;
@@ -34,8 +33,6 @@ import com.twitter.heron.healthmgr.common.TopologyProvider;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
public class BackPressureSensor extends BaseSensor {
- private static final Logger LOG = Logger.getLogger(BackPressureSensor.class.getName());
-
private final MetricsProvider metricsProvider;
private final PackingPlanProvider packingPlanProvider;
private final TopologyProvider topologyProvider;
@@ -51,72 +48,46 @@ public class BackPressureSensor extends BaseSensor {
this.metricsProvider = metricsProvider;
}
- @Override
- public Map<String, ComponentMetrics> get(String... components) {
- return get();
- }
-
/**
* Computes the average (millis/sec) back-pressure caused by instances in the configured window
*
- * @return the average value
+ * @return one measurement per bolt instance holding its average back-pressure value
*/
- public Map<String, ComponentMetrics> get() {
- Map<String, ComponentMetrics> result = new HashMap<>();
+ @Override
+ public Collection<Measurement> fetch() {
+ Collection<Measurement> result = new ArrayList<>();
+ Instant now = context.checkpoint();
String[] boltComponents = topologyProvider.getBoltNames();
- for (String boltComponent : boltComponents) {
- String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(boltComponent);
+ Duration duration = getDuration();
+ for (String component : boltComponents) {
+ String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(component);
- Duration duration = getDuration();
- Map<String, InstanceMetrics> instanceMetrics = new HashMap<>();
- for (String boltInstanceName : boltInstanceNames) {
- String metric = getMetricName() + boltInstanceName;
- Map<String, ComponentMetrics> stmgrResult = metricsProvider.getComponentMetrics(
- metric, duration, COMPONENT_STMGR);
+ for (String instance : boltInstanceNames) {
+ String metric = getMetricName() + instance;
- if (stmgrResult.get(COMPONENT_STMGR) == null) {
+ Collection<Measurement> stmgrResult
+ = metricsProvider.getMeasurements(now, duration, metric, COMPONENT_STMGR);
+ if (stmgrResult.isEmpty()) {
continue;
}
- HashMap<String, InstanceMetrics> streamManagerResult =
- stmgrResult.get(COMPONENT_STMGR).getMetrics();
-
- if (streamManagerResult.isEmpty()) {
+ MeasurementsTable table = MeasurementsTable.of(stmgrResult).component(COMPONENT_STMGR);
+ if (table.size() == 0) {
continue;
}
-
- // since a bolt instance belongs to one stream manager,
- // for tracker rest api: expect just one metrics manager instance in the result;
- // for tmaster/metricscache stat interface: expect a list
- Double valueSum = 0.0;
- for (Iterator<InstanceMetrics> it = streamManagerResult.values().iterator();
- it.hasNext();) {
- InstanceMetrics stmgrInstanceResult = it.next();
-
- Double val = stmgrInstanceResult.getMetricValueSum(metric);
- if (val == null) {
- continue;
- } else {
- valueSum += val;
- }
- }
- double averageBp = valueSum / duration.getSeconds();
+ double averageBp = table.type(metric).sum() / duration.getSeconds();
// The maximum value of averageBp should be 1000, i.e. 1000 millis of BP per second. Due to
// a bug in Heron (Issue: 1753), this value could be higher in some cases. The following
// check partially corrects the reported BP value
averageBp = averageBp > 1000 ? 1000 : averageBp;
- InstanceMetrics boltInstanceMetric
- = new InstanceMetrics(boltInstanceName, getMetricName(), averageBp);
- instanceMetrics.put(boltInstanceName, boltInstanceMetric);
+ Measurement measurement
+ = new Measurement(component, instance, getMetricName(), now, averageBp);
+ result.add(measurement);
}
-
- ComponentMetrics componentMetrics = new ComponentMetrics(boltComponent, instanceMetrics);
- result.put(boltComponent, componentMetrics);
}
-
return result;
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java
index 61e2b0b..18b175a 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java
@@ -15,8 +15,11 @@
package com.twitter.heron.healthmgr.sensors;
import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
import com.microsoft.dhalion.api.ISensor;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
import com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey;
@@ -24,12 +27,13 @@ import com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey;
public abstract class BaseSensor implements ISensor {
static final Duration DEFAULT_METRIC_DURATION = Duration.ofSeconds(300);
static final String COMPONENT_STMGR = "__stmgr__";
+ protected ExecutionContext context;
public enum MetricName {
METRIC_EXE_COUNT("__execute-count/default"),
METRIC_BACK_PRESSURE("__time_spent_back_pressure_by_compid/"),
- METRIC_BUFFER_SIZE("__connection_buffer_by_instanceid/"),
- METRIC_BUFFER_SIZE_SUFFIX("/bytes"),
+ METRIC_WAIT_Q_SIZE("__connection_buffer_by_instanceid/"),
+ METRIC_WAIT_Q_SIZE_SUFFIX("/bytes"),
METRIC_WAIT_Q_GROWTH_RATE("METRIC_WAIT_Q_GROWTH_RATE");
private String text;
@@ -78,7 +82,17 @@ public abstract class BaseSensor implements ISensor {
return value;
}
- public String getMetricName() {
+ @Override
+ public void initialize(ExecutionContext ctxt) {
+ this.context = ctxt;
+ }
+
+ @Override
+ public Collection<String> getMetricTypes() {
+ return Collections.singletonList(metricName);
+ }
+
+ String getMetricName() {
return metricName;
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java
index 2657e62..aa81b37 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java
@@ -15,24 +15,22 @@
package com.twitter.heron.healthmgr.sensors;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
import javax.inject.Inject;
import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
import com.twitter.heron.healthmgr.common.PackingPlanProvider;
import com.twitter.heron.healthmgr.common.TopologyProvider;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
public class BufferSizeSensor extends BaseSensor {
private final MetricsProvider metricsProvider;
@@ -44,80 +42,46 @@ public class BufferSizeSensor extends BaseSensor {
PackingPlanProvider packingPlanProvider,
TopologyProvider topologyProvider,
MetricsProvider metricsProvider) {
- super(policyConfig, METRIC_BUFFER_SIZE.text(), BufferSizeSensor.class.getSimpleName());
+ super(policyConfig, METRIC_WAIT_Q_SIZE.text(), BufferSizeSensor.class.getSimpleName());
this.packingPlanProvider = packingPlanProvider;
this.topologyProvider = topologyProvider;
this.metricsProvider = metricsProvider;
}
- @Override
- public Map<String, ComponentMetrics> get() {
- return get(topologyProvider.getBoltNames());
- }
-
/**
* The buffer size as provided by tracker
*
- * @return buffer size
+ * @return buffer size measurements
*/
- public Map<String, ComponentMetrics> get(String... desiredBoltNames) {
- Map<String, ComponentMetrics> result = new HashMap<>();
-
- Set<String> boltNameFilter = new HashSet<>();
- if (desiredBoltNames.length > 0) {
- boltNameFilter.addAll(Arrays.asList(desiredBoltNames));
- }
+ @Override
+ public Collection<Measurement> fetch() {
+ Collection<Measurement> result = new ArrayList<>();
+ Instant now = context.checkpoint();
String[] boltComponents = topologyProvider.getBoltNames();
- for (String boltComponent : boltComponents) {
- if (!boltNameFilter.isEmpty() && !boltNameFilter.contains(boltComponent)) {
- continue;
- }
+ Duration duration = getDuration();
- String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(boltComponent);
+ for (String component : boltComponents) {
+ String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(component);
+ for (String instance : boltInstanceNames) {
+ String metric = getMetricName() + instance + MetricName.METRIC_WAIT_Q_SIZE_SUFFIX;
- Map<String, InstanceMetrics> instanceMetrics = new HashMap<>();
- for (String boltInstanceName : boltInstanceNames) {
- String metric = getMetricName() + boltInstanceName + MetricName.METRIC_BUFFER_SIZE_SUFFIX;
-
- Map<String, ComponentMetrics> stmgrResult = metricsProvider.getComponentMetrics(
- metric,
- getDuration(),
- COMPONENT_STMGR);
-
- if (stmgrResult.get(COMPONENT_STMGR) == null) {
+ Collection<Measurement> stmgrResult
+ = metricsProvider.getMeasurements(now, duration, metric, COMPONENT_STMGR);
+ if (stmgrResult.isEmpty()) {
continue;
}
- HashMap<String, InstanceMetrics> streamManagerResult =
- stmgrResult.get(COMPONENT_STMGR).getMetrics();
-
- if (streamManagerResult.isEmpty()) {
+ MeasurementsTable table = MeasurementsTable.of(stmgrResult).component(COMPONENT_STMGR);
+ if (table.size() == 0) {
continue;
}
+ double totalSize = table.type(metric).sum();
- // since a bolt instance belongs to one stream manager, expect just one metrics
- // manager instance in the result
- Double stmgrInstanceResult = 0.0;
- for (Iterator<InstanceMetrics> it = streamManagerResult.values().iterator();
- it.hasNext();) {
- InstanceMetrics iMetrics = it.next();
- Double val = iMetrics.getMetricValueSum(metric);
- if (val == null) {
- continue;
- } else {
- stmgrInstanceResult += val;
- }
- }
-
- InstanceMetrics boltInstanceMetric =
- new InstanceMetrics(boltInstanceName, getMetricName(), stmgrInstanceResult);
-
- instanceMetrics.put(boltInstanceName, boltInstanceMetric);
+ Measurement measurement
+ = new Measurement(component, instance, getMetricName(), now, totalSize);
+ result.add(measurement);
}
-
- ComponentMetrics componentMetrics = new ComponentMetrics(boltComponent, instanceMetrics);
- result.put(boltComponent, componentMetrics);
}
return result;
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java
index 47119c1..21ee98a 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java
@@ -15,12 +15,15 @@
package com.twitter.heron.healthmgr.sensors;
-import java.util.Map;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import javax.inject.Inject;
import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
import com.twitter.heron.healthmgr.common.TopologyProvider;
@@ -30,6 +33,7 @@ import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_E
public class ExecuteCountSensor extends BaseSensor {
private final TopologyProvider topologyProvider;
private final MetricsProvider metricsProvider;
+ private Instant now;
@Inject
ExecuteCountSensor(TopologyProvider topologyProvider,
@@ -40,14 +44,10 @@ public class ExecuteCountSensor extends BaseSensor {
this.metricsProvider = metricsProvider;
}
- public Map<String, ComponentMetrics> get() {
- String[] boltNames = topologyProvider.getBoltNames();
- return get(boltNames);
- }
-
- public Map<String, ComponentMetrics> get(String... boltNames) {
- return metricsProvider.getComponentMetrics(getMetricName(),
- getDuration(),
- boltNames);
+ @Override
+ public Collection<Measurement> fetch() {
+ List<String> bolts = Arrays.asList(topologyProvider.getBoltNames());
+ now = context.checkpoint();
+ return metricsProvider.getMeasurements(now, getDuration(), getMetricTypes(), bolts);
}
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
index f3e7024..44728ae 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
@@ -18,8 +18,8 @@ package com.twitter.heron.healthmgr.sensors;
import java.net.HttpURLConnection;
import java.time.Duration;
import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -29,8 +29,7 @@ import javax.inject.Named;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.InvalidProtocolBufferException;
import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
import com.twitter.heron.proto.system.Common.StatusCode;
import com.twitter.heron.proto.tmaster.TopologyMaster;
@@ -51,7 +50,6 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
private final SchedulerStateManagerAdaptor stateManagerAdaptor;
private final String topologyName;
- private Clock clock = new Clock();
private String metricsCacheLocation;
@Inject
@@ -64,35 +62,29 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
}
@Override
- public Map<String, ComponentMetrics> getComponentMetrics(String metric,
- Instant startTime,
- Duration duration,
- String... components) {
- Map<String, ComponentMetrics> result = new HashMap<>();
- for (String component : components) {
- TopologyMaster.MetricResponse response =
- getMetricsFromMetricsCache(metric, component, startTime, duration);
-
- Map<String, InstanceMetrics> metrics = parse(response, component, metric, startTime);
- ComponentMetrics componentMetric = new ComponentMetrics(component, metrics);
- result.put(component, componentMetric);
+ public Collection<Measurement> getMeasurements(Instant startTime,
+ Duration duration,
+ Collection<String> metricNames,
+ Collection<String> components) {
+ Collection<Measurement> result = new ArrayList<>();
+ for (String metric : metricNames) {
+ for (String component : components) {
+ TopologyMaster.MetricResponse response =
+ getMetricsFromMetricsCache(metric, component, startTime, duration);
+ Collection<Measurement> measurements = parse(response, component, metric, startTime);
+ LOG.fine(String.format("%d measurements received for %s/%s",
+ measurements.size(), component, metric));
+ result.addAll(measurements);
+ }
}
return result;
}
- @Override
- public Map<String, ComponentMetrics> getComponentMetrics(String metric,
- Duration duration,
- String... components) {
- Instant start = Instant.ofEpochMilli(clock.currentTime() - duration.toMillis());
- return getComponentMetrics(metric, start, duration, components);
- }
-
@VisibleForTesting
@SuppressWarnings("unchecked")
- Map<String, InstanceMetrics> parse(
+ Collection<Measurement> parse(
TopologyMaster.MetricResponse response, String component, String metric, Instant startTime) {
- Map<String, InstanceMetrics> metricsData = new HashMap<>();
+ Collection<Measurement> metricsData = new ArrayList<>();
if (response == null || !response.getStatus().getStatus().equals(StatusCode.OK)) {
LOG.info(String.format(
@@ -109,29 +101,32 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
- // convert heron.protobuf.taskMetrics to dhalion.InstanceMetrics
+ // convert heron.protobuf.taskMetrics to dhalion.Measurement
for (TaskMetric tm : response.getMetricList()) {
String instanceId = tm.getInstanceId();
- InstanceMetrics instanceMetrics = new InstanceMetrics(instanceId);
-
for (IndividualMetric im : tm.getMetricList()) {
String metricName = im.getName();
- Map<Instant, Double> values = new HashMap<>();
// case 1
for (IntervalValue iv : im.getIntervalValuesList()) {
MetricInterval mi = iv.getInterval();
String value = iv.getValue();
- values.put(Instant.ofEpochSecond(mi.getStart()), Double.parseDouble(value));
+ Measurement measurement = new Measurement(
+ component,
+ instanceId,
+ metricName,
+ Instant.ofEpochSecond(mi.getStart()),
+ Double.parseDouble(value));
+ metricsData.add(measurement);
}
// case 2
if (im.hasValue()) {
- values.put(startTime, Double.parseDouble(im.getValue()));
- }
-
- if (!values.isEmpty()) {
- instanceMetrics.addMetric(metricName, values);
+ Measurement measurement = new Measurement(
+ component,
+ instanceId,
+ metricName,
+ startTime,
+ Double.parseDouble(im.getValue()));
+ metricsData.add(measurement);
}
}
-
- metricsData.put(instanceId, instanceMetrics);
}
return metricsData;
@@ -145,8 +140,8 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
.setComponentName(component)
.setExplicitInterval(
MetricInterval.newBuilder()
- .setStart(start.getEpochSecond())
- .setEnd(start.plus(duration).getEpochSecond())
+ .setStart(start.minus(duration).getEpochSecond())
+ .setEnd(start.getEpochSecond())
.build())
.addMetric(metric)
.build();
@@ -179,11 +174,6 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
}
}
- @VisibleForTesting
- void setClock(Clock clock) {
- this.clock = clock;
- }
-
/* returns last known location of metrics cache
*/
private synchronized String getCacheLocation() {
@@ -200,10 +190,4 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
private synchronized void resetCacheLocation() {
metricsCacheLocation = null;
}
-
- static class Clock {
- long currentTime() {
- return System.currentTimeMillis();
- }
- }
}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java
index f311842..8964a67 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java
@@ -17,7 +17,8 @@ package com.twitter.heron.healthmgr.sensors;
import java.time.Duration;
import java.time.Instant;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -34,8 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
import net.minidev.json.JSONArray;
@@ -49,8 +49,6 @@ public class TrackerMetricsProvider implements MetricsProvider {
private static final Logger LOG = Logger.getLogger(TrackerMetricsProvider.class.getName());
private final WebTarget baseTarget;
- private Clock clock = new Clock();
-
@Inject
public TrackerMetricsProvider(@Named(CONF_METRICS_SOURCE_URL) String trackerURL,
@Named(CONF_TOPOLOGY_NAME) String topologyName,
@@ -68,35 +66,30 @@ public class TrackerMetricsProvider implements MetricsProvider {
}
@Override
- public Map<String, ComponentMetrics> getComponentMetrics(String metric,
- Instant startTime,
- Duration duration,
- String... components) {
- Map<String, ComponentMetrics> result = new HashMap<>();
- for (String component : components) {
- String response = getMetricsFromTracker(metric, component, startTime, duration);
- Map<String, InstanceMetrics> metrics = parse(response, component, metric);
- ComponentMetrics componentMetric = new ComponentMetrics(component, metrics);
- result.put(component, componentMetric);
+ public Collection<Measurement> getMeasurements(Instant startTime,
+ Duration duration,
+ Collection<String> metricNames,
+ Collection<String> components) {
+ Collection<Measurement> result = new ArrayList<>();
+ for (String metric : metricNames) {
+ for (String component : components) {
+ String response = getMetricsFromTracker(metric, component, startTime, duration);
+ Collection<Measurement> measurements = parse(response, component, metric);
+ LOG.fine(String.format("%d measurements received for %s/%s",
+ measurements.size(), component, metric));
+ result.addAll(measurements);
+ }
}
return result;
}
- @Override
- public Map<String, ComponentMetrics> getComponentMetrics(String metric,
- Duration duration,
- String... components) {
- Instant start = Instant.ofEpochMilli(clock.currentTime() - duration.toMillis());
- return getComponentMetrics(metric, start, duration, components);
- }
-
@SuppressWarnings("unchecked")
- private Map<String, InstanceMetrics> parse(String response, String component, String metric) {
- Map<String, InstanceMetrics> metricsData = new HashMap<>();
+ private Collection<Measurement> parse(String response, String component, String metric) {
+ Collection<Measurement> metricsData = new ArrayList<>();
if (response == null || response.isEmpty()) {
return metricsData;
}
DocumentContext result = JsonPath.parse(response);
JSONArray jsonArray = result.read("$.." + metric);
@@ -113,14 +106,15 @@ public class TrackerMetricsProvider implements MetricsProvider {
for (String instanceName : metricsMap.keySet()) {
Map<String, String> tmpValues = (Map<String, String>) metricsMap.get(instanceName);
- Map<Instant, Double> values = new HashMap<>();
for (String timeStamp : tmpValues.keySet()) {
- values.put(Instant.ofEpochSecond(Long.parseLong(timeStamp)),
+ Measurement measurement = new Measurement(
+ component,
+ instanceName,
+ metric,
+ Instant.ofEpochSecond(Long.parseLong(timeStamp)),
Double.parseDouble(tmpValues.get(timeStamp)));
+ metricsData.add(measurement);
}
- InstanceMetrics instanceMetrics = new InstanceMetrics(instanceName);
- instanceMetrics.addMetric(metric, values);
- metricsData.put(instanceName, instanceMetrics);
}
return metricsData;
@@ -131,23 +125,12 @@ public class TrackerMetricsProvider implements MetricsProvider {
WebTarget target = baseTarget
.queryParam("metricname", metric)
.queryParam("component", component)
- .queryParam("starttime", start.getEpochSecond())
- .queryParam("endtime", start.getEpochSecond() + duration.getSeconds());
+ .queryParam("starttime", start.getEpochSecond() - duration.getSeconds())
+ .queryParam("endtime", start.getEpochSecond());
LOG.log(Level.FINE, "Tracker Query URI: {0}", target.getUri());
Response r = target.request(MediaType.APPLICATION_JSON_TYPE).get();
return r.readEntity(String.class);
}
-
- @VisibleForTesting
- void setClock(Clock clock) {
- this.clock = clock;
- }
-
- static class Clock {
- long currentTime() {
- return System.currentTimeMillis();
- }
- }
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/TestUtils.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/TestUtils.java
deleted file mode 100644
index b75c803..0000000
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/TestUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName;
-import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
-
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_PROCESSING_RATE_SKEW;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_WAIT_Q_DISPARITY;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
-
-public class TestUtils {
- public static List<Symptom> createBpSymptomList(int... bpValues) {
- return createListFromSymptom(createBPSymptom(bpValues));
- }
-
- public static Symptom createExeCountSymptom(int... exeCounts) {
- return createSymptom(SYMPTOM_PROCESSING_RATE_SKEW, METRIC_EXE_COUNT, exeCounts);
- }
-
- public static Symptom createWaitQueueDisparitySymptom(int... bufferSizes) {
- return createSymptom(SYMPTOM_WAIT_Q_DISPARITY, METRIC_BUFFER_SIZE, bufferSizes);
- }
-
- private static Symptom createBPSymptom(int... bpValues) {
- return createSymptom(SYMPTOM_BACK_PRESSURE, METRIC_BACK_PRESSURE, bpValues);
- }
-
- private static void addInstanceMetric(ComponentMetrics metrics, int i, double val, String metric) {
- InstanceMetrics instanceMetric = new InstanceMetrics("container_1_bolt_" + i, metric, val);
- metrics.addInstanceMetric(instanceMetric);
- }
-
- private static Symptom createSymptom(SymptomName symptom, MetricName metric, int... values) {
- ComponentMetrics compMetrics = new ComponentMetrics("bolt");
- for (int i = 0; i < values.length; i++) {
- addInstanceMetric(compMetrics, i, values[i], metric.text());
- }
- return new Symptom(symptom.text(), compMetrics);
- }
-
- private static List<Symptom> createListFromSymptom(Symptom symptom) {
- List<Symptom> symptoms = new ArrayList<>();
- symptoms.add(symptom);
- return symptoms;
- }
-}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelperTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelperTest.java
deleted file mode 100644
index 7df7fce..0000000
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelperTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr.common;
-
-import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import org.junit.Test;
-
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_GROWTH_RATE;
-import static org.junit.Assert.assertEquals;
-
-public class ComponentMetricsHelperTest {
-
- @Test
- public void detectsMultipleCompIncreasingBuffer() {
- ComponentMetrics compMetrics;
- InstanceMetrics instanceMetrics;
- Map<Instant, Double> bufferSizes;
-
- compMetrics = new ComponentMetrics("bolt");
-
- instanceMetrics = new InstanceMetrics("i1");
- bufferSizes = new HashMap<>();
- bufferSizes.put(Instant.ofEpochSecond(1497892210), 0.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892270), 300.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892330), 600.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892390), 900.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892450), 1200.0);
- instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
- compMetrics.addInstanceMetric(instanceMetrics);
-
- instanceMetrics = new InstanceMetrics("i2");
- bufferSizes = new HashMap<>();
- bufferSizes.put(Instant.ofEpochSecond(1497892270), 0.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892330), 180.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892390), 360.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892450), 540.0);
- instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
- compMetrics.addInstanceMetric(instanceMetrics);
-
- ComponentMetricsHelper helper = new ComponentMetricsHelper(compMetrics);
- helper.computeBufferSizeTrend();
- assertEquals(5, helper.getMaxBufferChangeRate(), 0.1);
-
- HashMap<String, InstanceMetrics> metrics = compMetrics.getMetrics();
- assertEquals(1, metrics.get("i1").getMetrics().get(METRIC_WAIT_Q_GROWTH_RATE.text()).size());
- assertEquals(5, metrics.get("i1").getMetricValueSum(METRIC_WAIT_Q_GROWTH_RATE.text()), 0.1);
- assertEquals(3, metrics.get("i2").getMetricValueSum(METRIC_WAIT_Q_GROWTH_RATE.text()), 0.1);
- }
-}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java
index 3d45ec9..2316416 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java
@@ -35,7 +35,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class PackingPlanProviderTest {
- String topologyName = "topologyName";
+ private String topologyName = "topologyName";
private EventManager eventManager = new EventManager();
@Test
@@ -67,7 +67,7 @@ public class PackingPlanProviderTest {
PackingPlan packing = provider.get();
Assert.assertEquals(1, packing.getContainers().size());
- provider.onEvent(new TopologyUpdate());
+ provider.onEvent(new TopologyUpdate(null, null));
provider.get();
verify(adaptor, times(2)).getPackingPlan(topologyName);
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java
index 3b7a538..bff793b 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java
@@ -58,7 +58,7 @@ public class TopologyProviderTest {
Assert.assertEquals(2, provider.get().getBoltsCount());
// once fetched it is cached
- provider.onEvent(new TopologyUpdate());
+ provider.onEvent(new TopologyUpdate(null, null));
provider.get();
verify(adaptor, times(2)).getPhysicalPlan(topology);
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java
index b5dd2f2..ad6c266 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java
@@ -14,51 +14,81 @@
package com.twitter.heron.healthmgr.detectors;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
import static com.twitter.heron.healthmgr.detectors.BackPressureDetector.CONF_NOISE_FILTER;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class BackPressureDetectorTest {
+ private Instant now;
+
+ @Before
+ public void setup() {
+ now = Instant.now();
+ }
@Test
public void testConfigAndFilter() {
HealthPolicyConfig config = mock(HealthPolicyConfig.class);
when(config.getConfig(CONF_NOISE_FILTER, 20)).thenReturn(50);
- ComponentMetrics compMetrics =
- new ComponentMetrics("bolt", "i1", METRIC_BACK_PRESSURE.text(), 55);
- Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
- topologyMetrics.put("bolt", compMetrics);
- BackPressureSensor sensor = mock(BackPressureSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
+ Measurement measurement1
+ = new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 55);
+ Measurement measurement2
+ = new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 3);
+ Measurement measurement3
+ = new Measurement("bolt", "i3", METRIC_BACK_PRESSURE.text(), now, 0);
+ Collection<Measurement> metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
+ metrics.add(measurement3);
+
+ BackPressureDetector detector = new BackPressureDetector(config);
+ PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(now);
+ detector.initialize(context);
+ Collection<Symptom> symptoms = detector.detect(metrics);
+
+ Assert.assertEquals(2, symptoms.size());
+ SymptomsTable compSymptom = SymptomsTable.of(symptoms).type(SYMPTOM_COMP_BACK_PRESSURE.text());
+ Assert.assertEquals(1, compSymptom.size());
+ Assert.assertEquals(1, compSymptom.get().iterator().next().assignments().size());
- BackPressureDetector detector = new BackPressureDetector(sensor, config);
- List<Symptom> symptoms = detector.detect();
+ SymptomsTable instanceSymptom
+ = SymptomsTable.of(symptoms).type(SYMPTOM_INSTANCE_BACK_PRESSURE.text());
+ Assert.assertEquals(1, instanceSymptom.size());
+ Assert.assertEquals(1, instanceSymptom.get().iterator().next().assignments().size());
- Assert.assertEquals(1, symptoms.size());
- compMetrics = new ComponentMetrics("bolt", "i1", METRIC_BACK_PRESSURE.text(), 45);
- topologyMetrics.put("bolt", compMetrics);
- sensor = mock(BackPressureSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
+ measurement1
+ = new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 45);
+ measurement2
+ = new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 3);
+ metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
- detector = new BackPressureDetector(sensor, config);
- symptoms = detector.detect();
+ detector = new BackPressureDetector(config);
+ detector.initialize(context);
+ symptoms = detector.detect(metrics);
Assert.assertEquals(0, symptoms.size());
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java
index 8508e74..26b48bf 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java
@@ -15,21 +15,19 @@
package com.twitter.heron.healthmgr.detectors;
import java.time.Instant;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
import org.junit.Test;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
import static com.twitter.heron.healthmgr.detectors.GrowingWaitQueueDetector.CONF_LIMIT;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -40,52 +38,65 @@ public class GrowingWaitQueueDetectorTest {
HealthPolicyConfig config = mock(HealthPolicyConfig.class);
when(config.getConfig(CONF_LIMIT, 10.0)).thenReturn(5.0);
- ComponentMetrics compMetrics;
- InstanceMetrics instanceMetrics;
- Map<Instant, Double> bufferSizes;
- Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-
- instanceMetrics = new InstanceMetrics("i1");
- bufferSizes = new HashMap<>();
- bufferSizes.put(Instant.ofEpochSecond(1497892222), 0.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892270), 300.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892330), 700.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892390), 1000.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892450), 1300.0);
- instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
- compMetrics = new ComponentMetrics("bolt");
- compMetrics.addInstanceMetric(instanceMetrics);
-
- topologyMetrics.put("bolt", compMetrics);
-
- BufferSizeSensor sensor = mock(BufferSizeSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
-
- GrowingWaitQueueDetector detector = new GrowingWaitQueueDetector(sensor, config);
- List<Symptom> symptoms = detector.detect();
+ Measurement measurement1
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892222), 0.0);
+ Measurement measurement2
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892270), 300.0);
+ Measurement measurement3
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892330), 700.0);
+ Measurement measurement4
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892390), 1000.0);
+ Measurement measurement5
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892450), 1300.0);
+
+
+ Collection<Measurement> metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
+ metrics.add(measurement3);
+ metrics.add(measurement4);
+ metrics.add(measurement5);
+
+ GrowingWaitQueueDetector detector = new GrowingWaitQueueDetector(config);
+ PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(Instant.now());
+ detector.initialize(context);
+ Collection<Symptom> symptoms = detector.detect(metrics);
assertEquals(1, symptoms.size());
-
- instanceMetrics = new InstanceMetrics("i1");
- bufferSizes = new HashMap<>();
- bufferSizes.put(Instant.ofEpochSecond(1497892222), 0.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892270), 200.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892330), 400.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892390), 600.0);
- bufferSizes.put(Instant.ofEpochSecond(1497892450), 800.0);
- instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
- compMetrics = new ComponentMetrics("bolt");
- compMetrics.addInstanceMetric(instanceMetrics);
-
- topologyMetrics.put("bolt", compMetrics);
-
- sensor = mock(BufferSizeSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
-
- detector = new GrowingWaitQueueDetector(sensor, config);
- symptoms = detector.detect();
+ assertEquals(1, symptoms.iterator().next().assignments().size());
+
+ measurement1
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892222), 0.0);
+ measurement2
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892270), 200.0);
+ measurement3
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892330), 400.0);
+ measurement4
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892390), 600.0);
+ measurement5
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892450), 800.0);
+
+ metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
+ metrics.add(measurement3);
+ metrics.add(measurement4);
+ metrics.add(measurement5);
+
+ detector = new GrowingWaitQueueDetector(config);
+ detector.initialize(context);
+ symptoms = detector.detect(metrics);
assertEquals(0, symptoms.size());
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java
index a081c9e..1e1bb6a 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java
@@ -14,20 +14,20 @@
package com.twitter.heron.healthmgr.detectors;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
import org.junit.Test;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
import static com.twitter.heron.healthmgr.detectors.LargeWaitQueueDetector.CONF_SIZE_LIMIT;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -38,28 +38,41 @@ public class LargeWaitQueueDetectorTest {
HealthPolicyConfig config = mock(HealthPolicyConfig.class);
when(config.getConfig(CONF_SIZE_LIMIT, 1000)).thenReturn(20);
- ComponentMetrics compMetrics
- = new ComponentMetrics("bolt", "i1", METRIC_BUFFER_SIZE.text(), 21);
+ Measurement measurement1
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892222), 21);
+ Measurement measurement2
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892322), 21);
- Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
- topologyMetrics.put("bolt", compMetrics);
+ Collection<Measurement> metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
- BufferSizeSensor sensor = mock(BufferSizeSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
-
- LargeWaitQueueDetector detector = new LargeWaitQueueDetector(sensor, config);
- List<Symptom> symptoms = detector.detect();
+ LargeWaitQueueDetector detector = new LargeWaitQueueDetector(config);
+ PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(Instant.now());
+ detector.initialize(context);
+ Collection<Symptom> symptoms = detector.detect(metrics);
assertEquals(1, symptoms.size());
+ assertEquals(1, symptoms.iterator().next().assignments().size());
+
- compMetrics = new ComponentMetrics("bolt", "i1", METRIC_BUFFER_SIZE.text(), 19);
- topologyMetrics.put("bolt", compMetrics);
+ measurement1
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892222), 11);
+ measurement2
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892322), 10);
- sensor = mock(BufferSizeSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
+ metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
- detector = new LargeWaitQueueDetector(sensor, config);
- symptoms = detector.detect();
+ detector = new LargeWaitQueueDetector(config);
+ detector.initialize(context);
+ symptoms = detector.detect(metrics);
assertEquals(0, symptoms.size());
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java
index edf8115..f97dcd2 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java
@@ -14,59 +14,111 @@
package com.twitter.heron.healthmgr.detectors;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
import org.junit.Test;
import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.ExecuteCountSensor;
import static com.twitter.heron.healthmgr.detectors.ProcessingRateSkewDetector.CONF_SKEW_RATIO;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ProcessingRateSkewDetectorTest {
+
@Test
- public void testConfigAndFilter() {
+ public void testGetMaxMin() {
HealthPolicyConfig config = mock(HealthPolicyConfig.class);
when(config.getConfig(CONF_SKEW_RATIO, 1.5)).thenReturn(2.5);
- ComponentMetrics compMetrics = new ComponentMetrics("bolt");
- compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
- compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 200));
+ Measurement measurement1
+ = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 1000);
+ Measurement measurement2
+ = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892122), 3000);
+ Measurement measurement3
+ = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 200.0);
+ Measurement measurement4
+ = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 400.0);
- Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
- topologyMetrics.put("bolt", compMetrics);
+ Collection<Measurement> metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
+ metrics.add(measurement3);
+ metrics.add(measurement4);
- ExecuteCountSensor sensor = mock(ExecuteCountSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
- when(sensor.getMetricName()).thenReturn(METRIC_EXE_COUNT.text());
+ MeasurementsTable metricsTable = MeasurementsTable.of(metrics);
- ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(sensor, config);
- List<Symptom> symptoms = detector.detect();
- assertEquals(1, symptoms.size());
+ ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(config);
- compMetrics = new ComponentMetrics("bolt");
- compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
- compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 500));
- topologyMetrics.put("bolt", compMetrics);
+ assertEquals(2000, (int) detector.getMaxOfAverage(metricsTable));
+ assertEquals(300, (int) detector.getMinOfAverage(metricsTable));
- sensor = mock(ExecuteCountSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
+ }
- detector = new ProcessingRateSkewDetector(sensor, config);
- symptoms = detector.detect();
+ @Test
+ public void testConfigAndFilter() {
+ HealthPolicyConfig config = mock(HealthPolicyConfig.class);
+ when(config.getConfig(CONF_SKEW_RATIO, 1.5)).thenReturn(2.5);
+
+ Measurement measurement1
+ = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 1000);
+ Measurement measurement2
+ = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 200.0);
+
+ Collection<Measurement> metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
+
+ ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(config);
+ PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(Instant.now());
+ detector.initialize(context);
+
+ Collection<Symptom> symptoms = detector.detect(metrics);
+
+ assertEquals(3, symptoms.size());
+ SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+ assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_PROCESSING_RATE_SKEW).size());
+ assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_PROCESSING_RATE_SKEW).size());
+ assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i1").size());
+ assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i2").size());
+
+ measurement1
+ = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 1000);
+ measurement2
+ = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 500.0);
+
+ metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
+
+ detector = new ProcessingRateSkewDetector(config);
+ detector.initialize(context);
+ symptoms = detector.detect(metrics);
assertEquals(0, symptoms.size());
}
@@ -76,43 +128,60 @@ public class ProcessingRateSkewDetectorTest {
HealthPolicyConfig config = mock(HealthPolicyConfig.class);
when(config.getConfig(CONF_SKEW_RATIO, 1.5)).thenReturn(2.5);
- ComponentMetrics compMetrics1 = new ComponentMetrics("bolt-1");
- compMetrics1.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
- compMetrics1.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 200));
-
- ComponentMetrics compMetrics2 = new ComponentMetrics("bolt-2");
- compMetrics2.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
- compMetrics2.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 200));
-
- ComponentMetrics compMetrics3 = new ComponentMetrics("bolt-3");
- compMetrics3.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
- compMetrics3.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 500));
-
- Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
- topologyMetrics.put("bolt-1", compMetrics1);
- topologyMetrics.put("bolt-2", compMetrics2);
- topologyMetrics.put("bolt-3", compMetrics3);
-
- ExecuteCountSensor sensor = mock(ExecuteCountSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
- when(sensor.getMetricName()).thenReturn(METRIC_EXE_COUNT.text());
-
- ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(sensor, config);
- List<Symptom> symptoms = detector.detect();
-
- assertEquals(2, symptoms.size());
- for (Symptom symptom : symptoms) {
- if (symptom.getComponent().getName().equals("bolt-1")) {
- compMetrics1 = null;
- } else if (symptom.getComponent().getName().equals("bolt-2")) {
- compMetrics2 = null;
- } else if (symptom.getComponent().getName().equals("bolt-3")) {
- compMetrics3 = null;
- }
- }
-
- assertNull(compMetrics1);
- assertNull(compMetrics2);
- assertNotNull(compMetrics3);
+ Measurement measurement1
+ = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 1000);
+ Measurement measurement2
+ = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 200.0);
+
+
+ Measurement measurement3
+ = new Measurement("bolt2", "i3", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 1000);
+ Measurement measurement4
+ = new Measurement("bolt2", "i4", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 200.0);
+
+
+ Measurement measurement5
+ = new Measurement("bolt3", "i5", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 1000);
+ Measurement measurement6
+ = new Measurement("bolt3", "i6", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+ (1497892222), 500.0);
+
+
+ Collection<Measurement> metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
+ metrics.add(measurement3);
+ metrics.add(measurement4);
+ metrics.add(measurement5);
+ metrics.add(measurement6);
+
+ ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(config);
+ PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(Instant.now());
+ detector.initialize(context);
+
+ Collection<Symptom> symptoms = detector.detect(metrics);
+
+ assertEquals(6, symptoms.size());
+
+ SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+ assertEquals(2, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_PROCESSING_RATE_SKEW).size());
+ assertEquals(2, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_PROCESSING_RATE_SKEW).size());
+ assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i1").size());
+ assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i3").size());
+ assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i2").size());
+ assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i4").size());
+
}
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetectorTest.java
deleted file mode 100644
index a22e303..0000000
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetectorTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr.detectors;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import org.junit.Test;
-
-import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
-
-import static com.twitter.heron.healthmgr.detectors.WaitQueueDisparityDetector.CONF_DISPARITY_RATIO;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class WaitQueueDisparityDetectorTest {
- @Test
- public void testConfigAndFilter() {
- HealthPolicyConfig config = mock(HealthPolicyConfig.class);
- when(config.getConfig(CONF_DISPARITY_RATIO, 20.0)).thenReturn(15.0);
-
- ComponentMetrics compMetrics = new ComponentMetrics("bolt");
- compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BUFFER_SIZE.text(), 1501));
- compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BUFFER_SIZE.text(), 100));
-
- Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
- topologyMetrics.put("bolt", compMetrics);
-
- BufferSizeSensor sensor = mock(BufferSizeSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
- when(sensor.getMetricName()).thenReturn(METRIC_BUFFER_SIZE.text());
-
- WaitQueueDisparityDetector detector = new WaitQueueDisparityDetector(sensor, config);
- List<Symptom> symptoms = detector.detect();
-
- assertEquals(1, symptoms.size());
-
- compMetrics = new ComponentMetrics("bolt");
- compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BUFFER_SIZE.text(), 1500));
- compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BUFFER_SIZE.text(), 110));
- topologyMetrics.put("bolt", compMetrics);
-
- sensor = mock(BufferSizeSensor.class);
- when(sensor.get()).thenReturn(topologyMetrics);
-
- detector = new WaitQueueDisparityDetector(sensor, config);
- symptoms = detector.detect();
-
- assertEquals(0, symptoms.size());
- }
-}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetectorTest.java
new file mode 100644
index 0000000..e73d1c6
--- /dev/null
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetectorTest.java
@@ -0,0 +1,88 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.healthmgr.detectors;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
+
+import org.junit.Test;
+
+import com.twitter.heron.healthmgr.HealthPolicyConfig;
+
+import static com.twitter.heron.healthmgr.detectors.WaitQueueSkewDetector.CONF_SKEW_RATIO;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class WaitQueueSkewDetectorTest {
+ @Test
+ public void testConfigAndFilter() {
+ HealthPolicyConfig config = mock(HealthPolicyConfig.class);
+ when(config.getConfig(CONF_SKEW_RATIO, 20.0)).thenReturn(15.0);
+
+ Measurement measurement1
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892222), 1501);
+ Measurement measurement2
+ = new Measurement("bolt", "i2", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892222), 100.0);
+
+ Collection<Measurement> metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
+
+ WaitQueueSkewDetector detector = new WaitQueueSkewDetector(config);
+ PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(Instant.now());
+ detector.initialize(context);
+ Collection<Symptom> symptoms = detector.detect(metrics);
+
+ assertEquals(3, symptoms.size());
+ SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+ assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_WAIT_Q_SIZE_SKEW).size());
+ assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_WAIT_Q_SIZE_SKEW).size());
+ assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_WAIT_Q_SIZE_SKEW).assignment("i1").size());
+ assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+ .SYMPTOM_WAIT_Q_SIZE_SKEW).assignment("i2").size());
+
+
+ measurement1
+ = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892222), 1500);
+ measurement2
+ = new Measurement("bolt", "i2", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+ (1497892222), 110.0);
+
+ metrics = new ArrayList<>();
+ metrics.add(measurement1);
+ metrics.add(measurement2);
+
+ detector = new WaitQueueSkewDetector(config);
+ detector.initialize(context);
+ symptoms = detector.detect(metrics);
+
+ assertEquals(0, symptoms.size());
+ }
+}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java
index 8a1b481..3602548 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java
@@ -14,62 +14,122 @@
package com.twitter.heron.healthmgr.diagnosers;
-import java.util.List;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.api.IDiagnoser;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
+import org.junit.Before;
import org.junit.Test;
-import com.twitter.heron.healthmgr.TestUtils;
+import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_DATA_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_DATA_SKEW;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class DataSkewDiagnoserTest {
+ private final String comp = "comp";
+ private Instant now = Instant.now();
+ private Collection<Measurement> measurements = new ArrayList<>();
+ private ExecutionContext context;
+ private IDiagnoser diagnoser;
+
+ @Before
+ public void initTestData() {
+ now = Instant.now();
+ measurements = new ArrayList<>();
+
+ context = mock(ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(now);
+
+ diagnoser = new DataSkewDiagnoser();
+ diagnoser.initialize(context);
+ }
+
@Test
public void failsIfNoDataSkewSymptom() {
- List<Symptom> symptoms = TestUtils.createBpSymptomList(123);
- Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
- assertNull(result);
+ Symptom symptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), Instant.now(), null);
+ Collection<Symptom> symptoms = Collections.singletonList(symptom);
+ Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+ assertEquals(0, result.size());
}
@Test
public void diagnosis1DataSkewInstance() {
- List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
- symptoms.add(TestUtils.createExeCountSymptom(5000, 2000, 2000));
- symptoms.add(TestUtils.createWaitQueueDisparitySymptom(10000, 500, 500));
-
- Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
- assertNotNull(result);
- assertEquals(DIAGNOSIS_DATA_SKEW.text(), result.getName());
- assertEquals(1, result.getSymptoms().size());
- Symptom symptom = result.getSymptoms().values().iterator().next();
-
- assertEquals(123, symptom.getComponent()
- .getMetricValueSum("container_1_bolt_0", METRIC_BACK_PRESSURE.text()).intValue());
+ addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+ addMeasurements(METRIC_EXE_COUNT, 5000, 2000, 2000);
+ addMeasurements(METRIC_WAIT_Q_SIZE, 10000, 500, 500);
+ when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+ Collection<String> assign = Collections.singleton(comp);
+ Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+ Symptom skewSymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+ Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+ Collection<Symptom> symptoms = Arrays.asList(bpSymptom, skewSymptom, qDisparitySymptom);
+
+ Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+ assertEquals(1, result.size());
+ Diagnosis diagnoses = result.iterator().next();
+ assertEquals(DIAGNOSIS_DATA_SKEW.text(), diagnoses.type());
+ assertEquals(1, diagnoses.assignments().size());
+ assertEquals("i1", diagnoses.assignments().iterator().next());
+ // TODO
+// assertEquals(1, diagnoses.symptoms().size());
}
@Test
public void diagnosisNoDataSkewLowBufferSize() {
- List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
- symptoms.add(TestUtils.createExeCountSymptom(5000, 2000, 2000));
- symptoms.add(TestUtils.createWaitQueueDisparitySymptom(1, 500, 500));
+ addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+ addMeasurements(METRIC_EXE_COUNT, 5000, 2000, 2000);
+ addMeasurements(METRIC_WAIT_Q_SIZE, 1, 500, 500);
+ when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+ Collection<String> assign = Collections.singleton(comp);
+ Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+ Symptom skewSymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+ Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
- Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
- assertNull(result);
+ Collection<Symptom> symptoms = Arrays.asList(bpSymptom, skewSymptom, qDisparitySymptom);
+ Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+ assertEquals(0, result.size());
}
@Test
public void diagnosisNoDataSkewLowRate() {
- List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
- symptoms.add(TestUtils.createExeCountSymptom(100, 2000, 2000));
- symptoms.add(TestUtils.createWaitQueueDisparitySymptom(10000, 500, 500));
+ addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+ addMeasurements(METRIC_EXE_COUNT, 100, 2000, 2000);
+ addMeasurements(METRIC_WAIT_Q_SIZE, 10000, 500, 500);
+ when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+ Collection<String> assign = Collections.singleton(comp);
+ Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+ Symptom skewSymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+ Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+
+ Collection<Symptom> symptoms = Arrays.asList(bpSymptom, skewSymptom, qDisparitySymptom);
+ Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+ assertEquals(0, result.size());
+ }
- Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
- assertNull(result);
+ private void addMeasurements(MetricName metricExeCount, int i1, int i2, int i3) {
+ measurements.add(new Measurement(comp, "i1", metricExeCount.text(), now, i1));
+ measurements.add(new Measurement(comp, "i2", metricExeCount.text(), now, i2));
+ measurements.add(new Measurement(comp, "i3", metricExeCount.text(), now, i3));
}
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java
index 50aab9d..cb6a3cb 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java
@@ -14,47 +14,99 @@
package com.twitter.heron.healthmgr.diagnosers;
-import java.util.List;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.api.IDiagnoser;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
+import org.junit.Before;
import org.junit.Test;
-import com.twitter.heron.healthmgr.TestUtils;
+import com.twitter.heron.healthmgr.sensors.BaseSensor;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class SlowInstanceDiagnoserTest {
+ private final String comp = "comp";
+ private IDiagnoser diagnoser;
+ private Instant now = Instant.now();
+ private Collection<Measurement> measurements = new ArrayList<>();
+ private ExecutionContext context;
+
+ @Before
+ public void initTestData() {
+ now = Instant.now();
+ measurements = new ArrayList<>();
+
+ context = mock(ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(now);
+
+ diagnoser = new SlowInstanceDiagnoser();
+ diagnoser.initialize(context);
+ }
+
@Test
- public void failsIfNoBufferSizeDiaparity() {
- SlowInstanceDiagnoser diagnoser = new SlowInstanceDiagnoser();
- Diagnosis result = diagnoser.diagnose(TestUtils.createBpSymptomList(123));
- assertNull(result);
+ public void failsIfNoBufferSizeDisparity() {
+ Symptom symptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), Instant.now(), null);
+ Collection<Symptom> symptoms = Collections.singletonList(symptom);
+
+ Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+ assertEquals(0, result.size());
}
@Test
public void diagnosis1of3SlowInstances() {
- List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
- symptoms.add(TestUtils.createWaitQueueDisparitySymptom(1000, 20, 20));
-
- Diagnosis result = new SlowInstanceDiagnoser().diagnose(symptoms);
- assertEquals(1, result.getSymptoms().size());
- ComponentMetrics data = result.getSymptoms().values().iterator().next().getComponent();
- assertEquals(123,
- data.getMetricValueSum("container_1_bolt_0",METRIC_BACK_PRESSURE.text())
- .intValue());
+ addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+ addMeasurements(METRIC_WAIT_Q_SIZE, 1000, 20, 20);
+ when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+ Collection<String> assign = Collections.singleton(comp);
+ Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+ Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+ Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom);
+
+ Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+
+ assertEquals(1, result.size());
+ Diagnosis diagnoses = result.iterator().next();
+ assertEquals(DIAGNOSIS_SLOW_INSTANCE.text(), diagnoses.type());
+ assertEquals(1, diagnoses.assignments().size());
+ assertEquals("i1", diagnoses.assignments().iterator().next());
+ // TODO
+// assertEquals(1, diagnoses.symptoms().size());
}
@Test
public void failIfInstanceWithBpHasSmallBuffer() {
- List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
- symptoms.add(TestUtils.createWaitQueueDisparitySymptom(100, 500, 500));
+ Collection<String> assign = Collections.singleton(comp);
+ Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+ Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+ Symptom exeDisparitySymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+ Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom, exeDisparitySymptom);
+
+ Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+ assertEquals(0, result.size());
+ }
- Diagnosis result = new SlowInstanceDiagnoser().diagnose(symptoms);
- assertNull(result);
+ private void addMeasurements(BaseSensor.MetricName metricExeCount, int i1, int i2, int i3) {
+ measurements.add(new Measurement(comp, "i1", metricExeCount.text(), now, i1));
+ measurements.add(new Measurement(comp, "i2", metricExeCount.text(), now, i2));
+ measurements.add(new Measurement(comp, "i3", metricExeCount.text(), now, i3));
}
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java
index 0355308..e180ae6 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java
@@ -14,51 +14,83 @@
package com.twitter.heron.healthmgr.diagnosers;
-import java.util.List;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.api.IDiagnoser;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
+import org.junit.Before;
import org.junit.Test;
-import com.twitter.heron.healthmgr.TestUtils;
-
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class UnderProvisioningDiagnoserTest {
+ private final String comp = "comp";
+ private IDiagnoser diagnoser;
+ private Instant now = Instant.now();
+ private ExecutionContext context;
+
+ @Before
+ public void initTestData() {
+ now = Instant.now();
+
+ context = mock(ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(now);
+
+ diagnoser = new UnderProvisioningDiagnoser();
+ diagnoser.initialize(context);
+ }
+
@Test
public void diagnosisWhen1Of1InstanceInBP() {
- List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0);
- //symptoms.add(TestUtils.createLargeWaitQSymptom(5000));
- Diagnosis result = new UnderProvisioningDiagnoser().diagnose(symptoms);
+ Collection<String> assign = Collections.singleton(comp);
+ Symptom symptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+ Collection<Symptom> symptoms = Collections.singletonList(symptom);
+ Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
validateDiagnosis(result);
}
@Test
public void diagnosisFailsNotSimilarQueueSizes() {
- List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
- symptoms.add(TestUtils.createWaitQueueDisparitySymptom(100, 500, 500));
- Diagnosis result = new UnderProvisioningDiagnoser().diagnose(symptoms);
- assertNull(result);
+ Collection<String> assign = Collections.singleton(comp);
+ Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+ Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+ Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom);
+
+ Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+ assertEquals(0, result.size());
}
@Test
public void diagnosisFailsNotSimilarProcessingRates() {
- List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
- symptoms.add(TestUtils.createExeCountSymptom(100, 500, 500));
+ // TODO BP instance should be same as the one with high processing rate
+ Collection<String> assign = Collections.singleton(comp);
+ Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+ Symptom qDisparitySymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+ Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom);
- Diagnosis result = new UnderProvisioningDiagnoser().diagnose(symptoms);
- assertNull(result);
+ Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+ assertEquals(0, result.size());
}
- private void validateDiagnosis(Diagnosis result) {
- assertEquals(1, result.getSymptoms().size());
- ComponentMetrics data = result.getSymptoms().values().iterator().next().getComponent();
- assertEquals(123,
- data.getMetricValueSum("container_1_bolt_0", METRIC_BACK_PRESSURE.text())
- .intValue());
+ private void validateDiagnosis(Collection<Diagnosis> result) {
+ assertEquals(1, result.size());
+ Diagnosis diagnoses = result.iterator().next();
+ assertEquals(DIAGNOSIS_UNDER_PROVISIONING.text(), diagnoses.type());
+ assertEquals(1, diagnoses.assignments().size());
+ assertEquals(comp, diagnoses.assignments().iterator().next());
+ // TODO
+// Assert.assertEquals(1, result.getSymptoms().size());
}
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java
index 91c634f..6bceaf1 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java
@@ -14,21 +14,26 @@
package com.twitter.heron.healthmgr.resolvers;
+import java.time.Instant;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-import com.microsoft.dhalion.resolver.Action;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
import org.junit.Test;
+import org.mockito.Mockito;
import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.common.utils.topology.TopologyTests;
import com.twitter.heron.healthmgr.common.PackingPlanProvider;
import com.twitter.heron.healthmgr.common.TopologyProvider;
import com.twitter.heron.packing.roundrobin.RoundRobinPacking;
@@ -38,9 +43,8 @@ import com.twitter.heron.spi.common.Config;
import com.twitter.heron.spi.common.Key;
import com.twitter.heron.spi.packing.IRepacking;
import com.twitter.heron.spi.packing.PackingPlan;
-import com.twitter.heron.common.utils.topology.TopologyTests;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
@@ -67,20 +71,26 @@ public class ScaleUpResolverTest {
ISchedulerClient scheduler = mock(ISchedulerClient.class);
when(scheduler.updateTopology(any(UpdateTopologyRequest.class))).thenReturn(true);
- ComponentMetrics metrics
- = new ComponentMetrics("bolt", "i1", METRIC_BACK_PRESSURE.text(), 123);
- Symptom symptom = new Symptom(SYMPTOM_UNDER_PROVISIONING.text(), metrics);
- List<Diagnosis> diagnosis = new ArrayList<>();
- diagnosis.add(new Diagnosis("test", symptom));
+ Instant now = Instant.now();
+ Collections.singletonList(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 123));
+
+ List<String> assignments = Collections.singletonList("bolt");
+ Diagnosis diagnoses =
+ new Diagnosis(DIAGNOSIS_UNDER_PROVISIONING.text(), now, assignments, null);
+ List<Diagnosis> diagnosis = Collections.singletonList(diagnoses);
+
+ ExecutionContext context = mock(ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(now);
ScaleUpResolver resolver
= new ScaleUpResolver(null, packingPlanProvider, scheduler, eventManager, null);
+ resolver.initialize(context);
ScaleUpResolver spyResolver = spy(resolver);
- doReturn(2).when(spyResolver).computeScaleUpFactor(metrics);
+ doReturn(2).when(spyResolver).computeScaleUpFactor("bolt");
doReturn(currentPlan).when(spyResolver).buildNewPackingPlan(any(HashMap.class), eq(currentPlan));
- List<Action> result = spyResolver.resolve(diagnosis);
+ Collection<Action> result = spyResolver.resolve(diagnosis);
verify(scheduler, times(1)).updateTopology(any(UpdateTopologyRequest.class));
assertEquals(1, result.size());
}
@@ -141,28 +151,35 @@ public class ScaleUpResolverTest {
@Test
public void testScaleUpFactorComputation() {
- ScaleUpResolver resolver = new ScaleUpResolver(null, null, null, eventManager, null);
+ Instant now = Instant.now();
+ Collection<Measurement> result = new ArrayList<>();
- ComponentMetrics metrics = new ComponentMetrics("bolt");
- metrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BACK_PRESSURE.text(), 500));
- metrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BACK_PRESSURE.text(), 0));
+ ExecutionContext context = Mockito.mock(ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(now);
+ when(context.previousCheckpoint()).thenReturn(now);
- int result = resolver.computeScaleUpFactor(metrics);
- assertEquals(4, result);
-
- metrics = new ComponentMetrics("bolt");
- metrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BACK_PRESSURE.text(), 750));
- metrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BACK_PRESSURE.text(), 0));
-
- result = resolver.computeScaleUpFactor(metrics);
- assertEquals(8, result);
-
- metrics = new ComponentMetrics("bolt");
- metrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BACK_PRESSURE.text(), 400));
- metrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BACK_PRESSURE.text(), 100));
- metrics.addInstanceMetric(new InstanceMetrics("i3", METRIC_BACK_PRESSURE.text(), 0));
-
- result = resolver.computeScaleUpFactor(metrics);
- assertEquals(6, result);
+ ScaleUpResolver resolver = new ScaleUpResolver(null, null, null, eventManager, null);
+ resolver.initialize(context);
+
+ result.add(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 500));
+ result.add(new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 0));
+ when(context.measurements()).thenReturn(MeasurementsTable.of(result));
+ int factor = resolver.computeScaleUpFactor("bolt");
+ assertEquals(4, factor);
+
+ result.clear();
+ result.add(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 750));
+ result.add(new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 0));
+ when(context.measurements()).thenReturn(MeasurementsTable.of(result));
+ factor = resolver.computeScaleUpFactor("bolt");
+ assertEquals(8, factor);
+
+ result.clear();
+ result.add(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 400));
+ result.add(new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 100));
+ result.add(new Measurement("bolt", "i3", METRIC_BACK_PRESSURE.text(), now, 0));
+ when(context.measurements()).thenReturn(MeasurementsTable.of(result));
+ factor = resolver.computeScaleUpFactor("bolt");
+ assertEquals(6, factor);
}
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java
index 75256f0..9091175 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java
@@ -14,19 +14,27 @@
package com.twitter.heron.healthmgr.sensors;
-import java.util.Map;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
import org.junit.Test;
+import org.mockito.Mockito;
import com.twitter.heron.healthmgr.common.PackingPlanProvider;
import com.twitter.heron.healthmgr.common.TopologyProvider;
-import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.DEFAULT_METRIC_DURATION;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -49,27 +57,44 @@ public class BackPressureSensorTest {
MetricsProvider metricsProvider = mock(MetricsProvider.class);
for (String boltId : boltIds) {
- String metric = MetricName.METRIC_BACK_PRESSURE + boltId;
+ String metric = METRIC_BACK_PRESSURE + boltId;
// the back pressure sensor will return average bp per second, so multiply by duration
- BufferSizeSensorTest.registerStMgrInstanceMetricResponse(metricsProvider,
+ registerStMgrInstanceMetricResponse(metricsProvider,
metric,
boltId.length() * DEFAULT_METRIC_DURATION.getSeconds());
}
+
BackPressureSensor backPressureSensor =
new BackPressureSensor(packingPlanProvider, topologyProvider, null, metricsProvider);
- Map<String, ComponentMetrics> componentMetrics = backPressureSensor.get();
- assertEquals(2, componentMetrics.size());
+ ExecutionContext context = mock(ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(Instant.now());
+ backPressureSensor.initialize(context);
+
+ Collection<Measurement> componentMetrics = backPressureSensor.fetch();
+ assertEquals(3, componentMetrics.size());
+ MeasurementsTable table = MeasurementsTable.of(componentMetrics);
+ assertEquals(1, table.component("bolt-1").size());
+ assertEquals(boltIds[0].length(), table.component("bolt-1").instance(boltIds[0])
+ .type(METRIC_BACK_PRESSURE.text()).sum(), 0.01);
+
+ assertEquals(2, table.component("bolt-2").size());
+ assertEquals(boltIds[1].length(), table.component("bolt-2").instance(boltIds[1])
+ .type(METRIC_BACK_PRESSURE.text()).sum(), 0.01);
+ assertEquals(boltIds[2].length(), table.component("bolt-2").instance(boltIds[2])
+ .type(METRIC_BACK_PRESSURE.text()).sum(), 0.01);
+ }
- assertEquals(1, componentMetrics.get("bolt-1").getMetrics().size());
- assertEquals(boltIds[0].length(), componentMetrics.get("bolt-1").getMetrics(boltIds[0])
- .getMetricValueSum(MetricName.METRIC_BACK_PRESSURE.text()).intValue());
+ static void registerStMgrInstanceMetricResponse(MetricsProvider metricsProvider,
+ String metric,
+ long value) {
+ Instant instant = Instant.ofEpochSecond(10);
+ Measurement measurement = new Measurement("__stmgr__", "stmgr-1", metric, instant, value);
+ Collection<Measurement> result = Collections.singletonList(measurement);
- assertEquals(2, componentMetrics.get("bolt-2").getMetrics().size());
- assertEquals(boltIds[1].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[1])
- .getMetricValueSum(MetricName.METRIC_BACK_PRESSURE.text()).intValue());
- assertEquals(boltIds[2].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[2])
- .getMetricValueSum(MetricName.METRIC_BACK_PRESSURE.text()).intValue());
+ when(metricsProvider.getMeasurements(
+ any(Instant.class), eq(DEFAULT_METRIC_DURATION), eq(metric), eq("__stmgr__")))
+ .thenReturn(result);
}
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java
index 22f62fe..b395d9c 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java
@@ -14,12 +14,13 @@
package com.twitter.heron.healthmgr.sensors;
-import java.util.HashMap;
-import java.util.Map;
+import java.time.Instant;
+import java.util.Collection;
import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
import org.junit.Test;
@@ -27,7 +28,7 @@ import com.twitter.heron.healthmgr.common.PackingPlanProvider;
import com.twitter.heron.healthmgr.common.TopologyProvider;
import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.DEFAULT_METRIC_DURATION;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -51,39 +52,31 @@ public class BufferSizeSensorTest {
MetricsProvider metricsProvider = mock(MetricsProvider.class);
for (String boltId : boltIds) {
- String metric = MetricName.METRIC_BUFFER_SIZE
- + boltId + MetricName.METRIC_BUFFER_SIZE_SUFFIX;
- registerStMgrInstanceMetricResponse(metricsProvider, metric, boltId.length());
+ String metric = METRIC_WAIT_Q_SIZE
+ + boltId + MetricName.METRIC_WAIT_Q_SIZE_SUFFIX;
+ BackPressureSensorTest
+ .registerStMgrInstanceMetricResponse(metricsProvider, metric, boltId.length());
}
BufferSizeSensor bufferSizeSensor =
new BufferSizeSensor(null, packingPlanProvider, topologyProvider, metricsProvider);
- Map<String, ComponentMetrics> componentMetrics = bufferSizeSensor.get();
- assertEquals(2, componentMetrics.size());
+ PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(Instant.now());
+ bufferSizeSensor.initialize(context);
- assertEquals(1, componentMetrics.get("bolt-1").getMetrics().size());
- assertEquals(boltIds[0].length(), componentMetrics.get("bolt-1").getMetrics(boltIds[0])
- .getMetricValueSum(MetricName.METRIC_BUFFER_SIZE.text()).intValue());
+ Collection<Measurement> componentMetrics = bufferSizeSensor.fetch();
+ assertEquals(3, componentMetrics.size());
- assertEquals(2, componentMetrics.get("bolt-2").getMetrics().size());
- assertEquals(boltIds[1].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[1])
- .getMetricValueSum(MetricName.METRIC_BUFFER_SIZE.text()).intValue());
- assertEquals(boltIds[2].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[2])
- .getMetricValueSum(MetricName.METRIC_BUFFER_SIZE.text()).intValue());
- }
+ MeasurementsTable table = MeasurementsTable.of(componentMetrics);
+ assertEquals(1, table.component("bolt-1").size());
+ assertEquals(boltIds[0].length(), table.component("bolt-1").instance(boltIds[0])
+ .type(METRIC_WAIT_Q_SIZE.text()).sum(), 0.01);
- static void registerStMgrInstanceMetricResponse(MetricsProvider metricsProvider,
- String metric,
- long value) {
- Map<String, ComponentMetrics> result = new HashMap<>();
- ComponentMetrics metrics = new ComponentMetrics("__stmgr__");
- InstanceMetrics instanceMetrics = new InstanceMetrics("stmgr-1");
- instanceMetrics.addMetric(metric, value);
- metrics.addInstanceMetric(instanceMetrics);
- result.put("__stmgr__", metrics);
-
- when(metricsProvider.getComponentMetrics(metric, DEFAULT_METRIC_DURATION, "__stmgr__"))
- .thenReturn(result);
+ assertEquals(2, table.component("bolt-2").size());
+ assertEquals(boltIds[1].length(), table.component("bolt-2").instance(boltIds[1])
+ .type(METRIC_WAIT_Q_SIZE.text()).sum(), 0.01);
+ assertEquals(boltIds[2].length(), table.component("bolt-2").instance(boltIds[2])
+ .type(METRIC_WAIT_Q_SIZE.text()).sum(), 0.01);
}
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java
index 2f5a6cc..88fcc03 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java
@@ -14,12 +14,16 @@
package com.twitter.heron.healthmgr.sensors;
-import java.util.HashMap;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
import org.junit.Test;
@@ -28,54 +32,48 @@ import com.twitter.heron.healthmgr.common.TopologyProvider;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.DEFAULT_METRIC_DURATION;
import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class ExecuteCountSensorTest {
@Test
public void providesBoltExecutionCountMetrics() {
+ Instant now = Instant.now();
+ String metric = METRIC_EXE_COUNT.text();
TopologyProvider topologyProvider = mock(TopologyProvider.class);
when(topologyProvider.getBoltNames()).thenReturn(new String[]{"bolt-1", "bolt-2"});
MetricsProvider metricsProvider = mock(MetricsProvider.class);
- Map<String, ComponentMetrics> result = new HashMap<>();
+ Collection<Measurement> result = new ArrayList<>();
+ result.add(new Measurement("bolt-1", "container_1_bolt-1_1", metric, now, 123));
+ result.add(new Measurement("bolt-1", "container_1_bolt-1_2", metric, now, 345));
+ result.add(new Measurement("bolt-2", "container_1_bolt-2_3", metric, now, 321));
+ result.add(new Measurement("bolt-2", "container_1_bolt-2_4", metric, now, 543));
- ComponentMetrics metrics = new ComponentMetrics("bolt-1");
- metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-1_1", 123));
- metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-1_2", 345));
- result.put("bolt-1", metrics);
-
- metrics = new ComponentMetrics("bolt-2");
- metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-2_3", 321));
- metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-2_4", 543));
- result.put("bolt-2", metrics);
-
- when(metricsProvider
- .getComponentMetrics(METRIC_EXE_COUNT.text(), DEFAULT_METRIC_DURATION, "bolt-1", "bolt-2"))
+ Collection<String> comps = Arrays.asList("bolt-1", "bolt-2");
+ when(metricsProvider.getMeasurements(
+ any(Instant.class), eq(DEFAULT_METRIC_DURATION), eq(Collections.singletonList(metric)), eq(comps)))
.thenReturn(result);
ExecuteCountSensor executeCountSensor
= new ExecuteCountSensor(topologyProvider, null, metricsProvider);
- Map<String, ComponentMetrics> componentMetrics = executeCountSensor.get();
- assertEquals(2, componentMetrics.size());
- assertEquals(123, componentMetrics.get("bolt-1")
- .getMetrics("container_1_bolt-1_1")
- .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
- assertEquals(345, componentMetrics.get("bolt-1")
- .getMetrics("container_1_bolt-1_2")
- .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
- assertEquals(321, componentMetrics.get("bolt-2")
- .getMetrics("container_1_bolt-2_3")
- .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
- assertEquals(543, componentMetrics.get("bolt-2")
- .getMetrics("container_1_bolt-2_4")
- .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
- }
+ PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+ when(context.checkpoint()).thenReturn(now);
+ executeCountSensor.initialize(context);
- private InstanceMetrics createTestInstanceMetric(String name, int value) {
- InstanceMetrics instanceMetrics = new InstanceMetrics(name);
- instanceMetrics.addMetric(METRIC_EXE_COUNT.text(), value);
- return instanceMetrics;
+ Collection<Measurement> componentMetrics = executeCountSensor.fetch();
+ assertEquals(4, componentMetrics.size());
+ MeasurementsTable table = MeasurementsTable.of(componentMetrics);
+ assertEquals(123, table.component("bolt-1").instance("container_1_bolt-1_1")
+ .type(metric).sum(), 0.01);
+ assertEquals(345, table.component("bolt-1").instance("container_1_bolt-1_2")
+ .type(metric).sum(), 0.01);
+ assertEquals(321, table.component("bolt-2").instance("container_1_bolt-2_3")
+ .type(metric).sum(), 0.01);
+ assertEquals(543, table.component("bolt-2").instance("container_1_bolt-2_4")
+ .type(metric).sum(), 0.01);
}
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
index ea087c3..60f7ef6 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
@@ -17,11 +17,16 @@ package com.twitter.heron.healthmgr.sensors;
import java.time.Duration;
import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+
+import org.junit.Test;
+import org.mockito.Mockito;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
import com.twitter.heron.proto.system.Common.Status;
import com.twitter.heron.proto.system.Common.StatusCode;
import com.twitter.heron.proto.tmaster.TopologyMaster;
@@ -32,18 +37,14 @@ import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.TaskMetric;
import com.twitter.heron.proto.tmaster.TopologyMaster.MetricsCacheLocation;
import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
-import org.junit.Test;
-import org.mockito.Mockito;
-
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
public class MetricsCacheMetricsProviderTest {
@Test
- public void provides1Comp2InstanceMetricsFromeMetricsCache() {
+ public void provides1Comp2InstanceMetricsFromMetricsCache() {
MetricsCacheMetricsProvider spyMetricsProvider = createMetricsProviderSpy();
String metric = "count";
@@ -83,18 +84,20 @@ public class MetricsCacheMetricsProviderTest {
doReturn(response).when(spyMetricsProvider)
.getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
- Map<String, ComponentMetrics> metrics =
- spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
-
- assertEquals(1, metrics.size());
- assertNotNull(metrics.get(comp));
- assertEquals(2, metrics.get(comp).getMetrics().size());
-
- HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
- assertEquals(104,
- componentMetrics.get("container_1_bolt_1").getMetricValueSum(metric).intValue());
- assertEquals(17,
- componentMetrics.get("container_1_bolt_2").getMetricValueSum(metric).intValue());
+ Collection<Measurement> metrics =
+ spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+ Duration.ofSeconds(60),
+ metric,
+ comp);
+
+ MeasurementsTable table = MeasurementsTable.of(metrics);
+ assertEquals(4, table.component(comp).size());
+ assertEquals(2, table.uniqueInstances().size());
+ assertEquals(1, table.uniqueTypes().size());
+ assertEquals(1, table.instance("container_1_bolt_1").size());
+ assertEquals(104, table.instance("container_1_bolt_1").sum(), 0.01);
+ assertEquals(3, table.instance("container_1_bolt_2").size());
+ assertEquals(17, table.instance("container_1_bolt_2").sum(), 0.01);
}
@Test
@@ -146,19 +149,22 @@ public class MetricsCacheMetricsProviderTest {
doReturn(response2).when(spyMetricsProvider)
.getMetricsFromMetricsCache(metric, comp2, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
- Map<String, ComponentMetrics> metrics
- = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp1, comp2);
-
- assertEquals(2, metrics.size());
- assertNotNull(metrics.get(comp1));
- assertEquals(1, metrics.get(comp1).getMetrics().size());
- assertEquals(104,
- metrics.get(comp1).getMetricValueSum("container_1_bolt-1_2", metric).intValue());
-
- assertNotNull(metrics.get(comp2));
- assertEquals(1, metrics.get(comp2).getMetrics().size());
- assertEquals(17,
- metrics.get(comp2).getMetricValueSum("container_1_bolt-2_1", metric).intValue());
+ Collection<Measurement> metrics =
+ spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+ Duration.ofSeconds(60),
+ Collections.singletonList(metric),
+ Arrays.asList(comp1, comp2));
+
+ assertEquals(4, metrics.size());
+ MeasurementsTable table = MeasurementsTable.of(metrics);
+ assertEquals(2, table.uniqueComponents().size());
+ assertEquals(1, table.component(comp1).size());
+ assertEquals(104, table.instance("container_1_bolt-1_2").sum(), 0.01);
+
+ assertEquals(3, table.component(comp2).size());
+ assertEquals(1, table.uniqueTypes().size());
+ assertEquals(3, table.type(metric).instance("container_1_bolt-2_1").size());
+ assertEquals(17, table.instance("container_1_bolt-2_1").sum(), 0.01);
}
@Test
@@ -182,15 +188,16 @@ public class MetricsCacheMetricsProviderTest {
doReturn(response).when(spyMetricsProvider)
.getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
- Map<String, ComponentMetrics> metrics
- = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
+ Collection<Measurement> metrics =
+ spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+ Duration.ofSeconds(60),
+ metric,
+ comp);
assertEquals(1, metrics.size());
- assertNotNull(metrics.get(comp));
- assertEquals(1, metrics.get(comp).getMetrics().size());
-
- HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
- assertEquals(601, componentMetrics.get("stmgr-1").getMetricValueSum(metric).intValue());
+ MeasurementsTable table = MeasurementsTable.of(metrics);
+ assertEquals(1, table.component(comp).size());
+ assertEquals(601, table.instance("stmgr-1").type(metric).sum(), 0.01);
}
@Test
@@ -205,12 +212,13 @@ public class MetricsCacheMetricsProviderTest {
doReturn(response).when(spyMetricsProvider)
.getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
- Map<String, ComponentMetrics> metrics
- = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
+ Collection<Measurement> metrics =
+ spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+ Duration.ofSeconds(60),
+ metric,
+ comp);
- assertEquals(1, metrics.size());
- assertNotNull(metrics.get(comp));
- assertEquals(0, metrics.get(comp).getMetrics().size());
+ assertEquals(0, metrics.size());
}
private MetricsCacheMetricsProvider createMetricsProviderSpy() {
@@ -228,9 +236,7 @@ public class MetricsCacheMetricsProviderTest {
MetricsCacheMetricsProvider metricsProvider
= new MetricsCacheMetricsProvider(stateMgr, "testTopo");
- MetricsCacheMetricsProvider spyMetricsProvider = spy(metricsProvider);
- spyMetricsProvider.setClock(new TestClock(70000));
- return spyMetricsProvider;
+ return spy(metricsProvider);
}
@Test
@@ -274,44 +280,24 @@ public class MetricsCacheMetricsProviderTest {
doReturn(response).when(spyMetricsProvider)
.getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
- Map<String, ComponentMetrics> metrics =
- spyMetricsProvider
- .getComponentMetrics(metric, Instant.ofEpochSecond(10), Duration.ofSeconds(60), comp);
-
- assertEquals(1, metrics.size());
- ComponentMetrics componentMetrics = metrics.get(comp);
- assertNotNull(componentMetrics);
- assertEquals(2, componentMetrics.getMetrics().size());
-
- InstanceMetrics instanceMetrics = componentMetrics.getMetrics("container_1_bolt_1");
- assertNotNull(instanceMetrics);
- assertEquals(1, instanceMetrics.getMetrics().size());
-
- Map<Instant, Double> metricValues = instanceMetrics.getMetrics().get(metric);
- assertEquals(1, metricValues.size());
- assertEquals(104, metricValues.get(Instant.ofEpochSecond(1497481288)).intValue());
-
- instanceMetrics = componentMetrics.getMetrics("container_1_bolt_2");
- assertNotNull(instanceMetrics);
- assertEquals(1, instanceMetrics.getMetrics().size());
-
- metricValues = instanceMetrics.getMetrics().get(metric);
- assertEquals(3, metricValues.size());
- assertEquals(12, metricValues.get(Instant.ofEpochSecond(1497481228L)).intValue());
- assertEquals(2, metricValues.get(Instant.ofEpochSecond(1497481348L)).intValue());
- assertEquals(3, metricValues.get(Instant.ofEpochSecond(1497481168L)).intValue());
- }
-
- private class TestClock extends MetricsCacheMetricsProvider.Clock {
- long timeStamp;
-
- TestClock(long timeStamp) {
- this.timeStamp = timeStamp;
- }
-
- @Override
- long currentTime() {
- return timeStamp;
- }
+ Collection<Measurement> metrics =
+ spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+ Duration.ofSeconds(60),
+ metric,
+ comp);
+
+ assertEquals(4, metrics.size());
+ MeasurementsTable table = MeasurementsTable.of(metrics);
+ assertEquals(4, table.component(comp).size());
+
+ MeasurementsTable result = table.instance("container_1_bolt_1");
+ assertEquals(1, result.size());
+ assertEquals(104, result.instant(Instant.ofEpochSecond(1497481288)).sum(), 0.01);
+
+ result = table.instance("container_1_bolt_2");
+ assertEquals(3, result.size());
+ assertEquals(12, result.instant(Instant.ofEpochSecond(1497481228L)).sum(), 0.01);
+ assertEquals(2, result.instant(Instant.ofEpochSecond(1497481348L)).sum(), 0.01);
+ assertEquals(3, result.instant(Instant.ofEpochSecond(1497481168L)).sum(), 0.01);
}
}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java
index 01c71d2..2e7625f 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java
@@ -17,16 +17,16 @@ package com.twitter.heron.healthmgr.sensors;
import java.time.Duration;
import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@@ -48,18 +48,21 @@ public class TrackerMetricsProviderTest {
doReturn(response).when(spyMetricsProvider)
.getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
- Map<String, ComponentMetrics> metrics =
- spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
-
- assertEquals(1, metrics.size());
- assertNotNull(metrics.get(comp));
- assertEquals(2, metrics.get(comp).getMetrics().size());
-
- HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
- assertEquals(104,
- componentMetrics.get("container_1_bolt_1").getMetricValueSum(metric).intValue());
- assertEquals(17,
- componentMetrics.get("container_1_bolt_2").getMetricValueSum(metric).intValue());
+ Collection<Measurement> metrics =
+ spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+ Duration.ofSeconds(60),
+ metric,
+ comp);
+
+ assertEquals(4, metrics.size());
+ MeasurementsTable table = MeasurementsTable.of(metrics);
+ assertEquals(4, table.component(comp).size());
+ assertEquals(2, table.uniqueInstances().size());
+ assertEquals(1, table.uniqueTypes().size());
+ assertEquals(1, table.instance("container_1_bolt_1").size());
+ assertEquals(104, table.instance("container_1_bolt_1").sum(), 0.01);
+ assertEquals(3, table.instance("container_1_bolt_2").size());
+ assertEquals(17, table.instance("container_1_bolt_2").sum(), 0.01);
}
@Test
@@ -88,19 +91,22 @@ public class TrackerMetricsProviderTest {
doReturn(response2).when(spyMetricsProvider)
.getMetricsFromTracker(metric, comp2, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
- Map<String, ComponentMetrics> metrics
- = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp1, comp2);
-
- assertEquals(2, metrics.size());
- assertNotNull(metrics.get(comp1));
- assertEquals(1, metrics.get(comp1).getMetrics().size());
- assertEquals(104,
- metrics.get(comp1).getMetricValueSum("container_1_bolt-1_2", metric).intValue());
-
- assertNotNull(metrics.get(comp2));
- assertEquals(1, metrics.get(comp2).getMetrics().size());
- assertEquals(17,
- metrics.get(comp2).getMetricValueSum("container_1_bolt-2_1", metric).intValue());
+ Collection<Measurement> metrics =
+ spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+ Duration.ofSeconds(60),
+ Collections.singletonList(metric),
+ Arrays.asList(comp1, comp2));
+
+ assertEquals(4, metrics.size());
+ MeasurementsTable table = MeasurementsTable.of(metrics);
+ assertEquals(2, table.uniqueComponents().size());
+ assertEquals(1, table.component(comp1).size());
+ assertEquals(104, table.instance("container_1_bolt-1_2").sum(), 0.01);
+
+ assertEquals(3, table.component(comp2).size());
+ assertEquals(1, table.uniqueTypes().size());
+ assertEquals(3, table.type(metric).instance("container_1_bolt-2_1").size());
+ assertEquals(17, table.instance("container_1_bolt-2_1").sum(), 0.01);
}
@Test
@@ -118,15 +124,17 @@ public class TrackerMetricsProviderTest {
doReturn(response).when(spyMetricsProvider)
.getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
- Map<String, ComponentMetrics> metrics
- = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
- assertEquals(1, metrics.size());
- assertNotNull(metrics.get(comp));
- assertEquals(1, metrics.get(comp).getMetrics().size());
+ Collection<Measurement> metrics =
+ spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+ Duration.ofSeconds(60),
+ metric,
+ comp);
- HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
- assertEquals(601, componentMetrics.get("stmgr-1").getMetricValueSum(metric).intValue());
+ assertEquals(1, metrics.size());
+ MeasurementsTable table = MeasurementsTable.of(metrics);
+ assertEquals(1, table.component(comp).size());
+ assertEquals(601, table.instance("stmgr-1").type(metric).sum(), 0.01);
}
@Test
@@ -141,12 +149,14 @@ public class TrackerMetricsProviderTest {
doReturn(response).when(spyMetricsProvider)
.getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
- Map<String, ComponentMetrics> metrics
- = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
- assertEquals(1, metrics.size());
- assertNotNull(metrics.get(comp));
- assertEquals(0, metrics.get(comp).getMetrics().size());
+ Collection<Measurement> metrics =
+ spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+ Duration.ofSeconds(60),
+ metric,
+ comp);
+
+ assertEquals(0, metrics.size());
}
private TrackerMetricsProvider createMetricsProviderSpy() {
@@ -154,7 +164,6 @@ public class TrackerMetricsProviderTest {
= new TrackerMetricsProvider("127.0.0.1", "topology", "dev", "env");
TrackerMetricsProvider spyMetricsProvider = spy(metricsProvider);
- spyMetricsProvider.setClock(new TestClock(70000));
return spyMetricsProvider;
}
@@ -175,44 +184,24 @@ public class TrackerMetricsProviderTest {
doReturn(response).when(spyMetricsProvider)
.getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
- Map<String, ComponentMetrics> metrics =
- spyMetricsProvider
- .getComponentMetrics(metric, Instant.ofEpochSecond(10), Duration.ofSeconds(60), comp);
-
- assertEquals(1, metrics.size());
- ComponentMetrics componentMetrics = metrics.get(comp);
- assertNotNull(componentMetrics);
- assertEquals(2, componentMetrics.getMetrics().size());
-
- InstanceMetrics instanceMetrics = componentMetrics.getMetrics("container_1_bolt_1");
- assertNotNull(instanceMetrics);
- assertEquals(1, instanceMetrics.getMetrics().size());
-
- Map<Instant, Double> metricValues = instanceMetrics.getMetrics().get(metric);
- assertEquals(1, metricValues.size());
- assertEquals(104, metricValues.get(Instant.ofEpochSecond(1497481288)).intValue());
-
- instanceMetrics = componentMetrics.getMetrics("container_1_bolt_2");
- assertNotNull(instanceMetrics);
- assertEquals(1, instanceMetrics.getMetrics().size());
-
- metricValues = instanceMetrics.getMetrics().get(metric);
- assertEquals(3, metricValues.size());
- assertEquals(12, metricValues.get(Instant.ofEpochSecond(1497481228L)).intValue());
- assertEquals(2, metricValues.get(Instant.ofEpochSecond(1497481348L)).intValue());
- assertEquals(3, metricValues.get(Instant.ofEpochSecond(1497481168L)).intValue());
- }
-
- private class TestClock extends TrackerMetricsProvider.Clock {
- long timeStamp;
-
- TestClock(long timeStamp) {
- this.timeStamp = timeStamp;
- }
-
- @Override
- long currentTime() {
- return timeStamp;
- }
+ Collection<Measurement> metrics =
+ spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+ Duration.ofSeconds(60),
+ metric,
+ comp);
+
+ assertEquals(4, metrics.size());
+ MeasurementsTable table = MeasurementsTable.of(metrics);
+ assertEquals(4, table.component(comp).size());
+
+ MeasurementsTable result = table.instance("container_1_bolt_1");
+ assertEquals(1, result.size());
+ assertEquals(104, result.instant(Instant.ofEpochSecond(1497481288)).sum(), 0.01);
+
+ result = table.instance("container_1_bolt_2");
+ assertEquals(3, result.size());
+ assertEquals(12, result.instant(Instant.ofEpochSecond(1497481228L)).sum(), 0.01);
+ assertEquals(2, result.instant(Instant.ofEpochSecond(1497481348L)).sum(), 0.01);
+ assertEquals(3, result.instant(Instant.ofEpochSecond(1497481168L)).sum(), 0.01);
}
}
--
To stop receiving notification emails like this one, please contact
ashvin@apache.org.