You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by as...@apache.org on 2018/04/09 18:09:44 UTC

[incubator-heron] branch master updated: Update Dhalion dependency version (#2821)

This is an automated email from the ASF dual-hosted git repository.

ashvin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new cc4b2d7  Update Dhalion dependency version (#2821)
cc4b2d7 is described below

commit cc4b2d7d016e138999ab5476e178e76eac922b78
Author: Ashvin <as...@users.noreply.github.com>
AuthorDate: Mon Apr 9 11:09:41 2018 -0700

    Update Dhalion dependency version (#2821)
    
    * Refactor healthmgr for dhalion-v2 compatibility
    
    * Update health manager's metrics collection for v2
    
    * Added detectors, diagnosers
    
    * Make health resolvers dhalion-v2 compatible
    
    * Make health diagnosers dhalion-v2 compatible
    
    * Make health policy dhalion-v2 api compatible
    
    * Update MetricsCache provider as per dhalion-v2 api
    
    * Modified skew detector and tests
    
    * Tag facts with execution timestamp
    
    * Update Dhalion dependency version
    
    * Address review comments in PR #2821
    
    * Fix checkstyle and unittest errors in HealthMgr
---
 WORKSPACE                                          |  17 +-
 heron/executor/src/python/heron_executor.py        |   2 +-
 .../tests/python/heron_executor_unittest.py        |   2 +-
 heron/healthmgr/src/java/BUILD                     |   3 +
 .../healthmgr/common/ComponentMetricsHelper.java   | 111 -----------
 .../healthmgr/common/HealthManagerEvents.java      |  13 +-
 .../healthmgr/common/PhysicalPlanProvider.java     |   6 +-
 .../healthmgr/detectors/BackPressureDetector.java  |  59 +++---
 .../heron/healthmgr/detectors/BaseDetector.java    |  17 +-
 .../detectors/GrowingWaitQueueDetector.java        |  79 +++++---
 .../detectors/LargeWaitQueueDetector.java          |  64 ++++---
 .../detectors/ProcessingRateSkewDetector.java      |  11 +-
 .../heron/healthmgr/detectors/SkewDetector.java    | 101 +++++++---
 ...ityDetector.java => WaitQueueSkewDetector.java} |  15 +-
 .../heron/healthmgr/diagnosers/BaseDiagnoser.java  |  60 +-----
 .../healthmgr/diagnosers/DataSkewDiagnoser.java    |  95 +++++-----
 .../diagnosers/SlowInstanceDiagnoser.java          |  93 +++++-----
 .../diagnosers/UnderProvisioningDiagnoser.java     |  66 ++++---
 .../AutoRestartBackpressureContainerPolicy.java    |  52 ++----
 .../policy/DynamicResourceAllocationPolicy.java    |  46 +++--
 .../resolvers/RestartContainerResolver.java        | 105 ++++++-----
 .../heron/healthmgr/resolvers/ScaleUpResolver.java | 122 +++++++------
 .../healthmgr/sensors/BackPressureSensor.java      |  77 +++-----
 .../heron/healthmgr/sensors/BaseSensor.java        |  20 +-
 .../heron/healthmgr/sensors/BufferSizeSensor.java  |  90 +++------
 .../healthmgr/sensors/ExecuteCountSensor.java      |  22 +--
 .../sensors/MetricsCacheMetricsProvider.java       |  86 ++++-----
 .../healthmgr/sensors/TrackerMetricsProvider.java  |  73 +++-----
 .../com/twitter/heron/healthmgr/TestUtils.java     |  69 -------
 .../common/ComponentMetricsHelperTest.java         |  70 -------
 .../healthmgr/common/PackingPlanProviderTest.java  |   4 +-
 .../healthmgr/common/TopologyProviderTest.java     |   2 +-
 .../detectors/BackPressureDetectorTest.java        |  73 +++++---
 .../detectors/GrowingWaitQueueDetectorTest.java    | 114 ++++++------
 .../detectors/LargeWaitQueueDetectorTest.java      |  56 +++---
 .../detectors/ProcessingRateSkewDetectorTest.java  | 203 ++++++++++++++-------
 .../detectors/WaitQueueDisparityDetectorTest.java  |  71 -------
 .../detectors/WaitQueueSkewDetectorTest.java       |  88 +++++++++
 .../diagnosers/DataSkewDiagnoserTest.java          | 124 +++++++++----
 .../diagnosers/SlowInstanceDiagnoserTest.java      |  98 +++++++---
 .../diagnosers/UnderProvisioningDiagnoserTest.java |  82 ++++++---
 .../healthmgr/resolvers/ScaleUpResolverTest.java   |  87 +++++----
 .../healthmgr/sensors/BackPressureSensorTest.java  |  55 ++++--
 .../healthmgr/sensors/BufferSizeSensorTest.java    |  55 +++---
 .../healthmgr/sensors/ExecuteCountSensorTest.java  |  70 ++++---
 .../sensors/MetricsCacheMetricsProviderTest.java   | 162 ++++++++--------
 .../sensors/TrackerMetricsProviderTest.java        | 153 +++++++---------
 47 files changed, 1592 insertions(+), 1551 deletions(-)

diff --git a/WORKSPACE b/WORKSPACE
index c0c40bb..72b582f 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -460,7 +460,7 @@ maven_jar(
 
 maven_jar(
   name = "com_microsoft_dhalion",
-  artifact = "com.microsoft.dhalion:dhalion:0.0.1_2",
+  artifact = "com.microsoft.dhalion:dhalion:0.2.1",
 )
 
 maven_jar(
@@ -468,6 +468,21 @@ maven_jar(
   artifact = "org.apache.commons:commons-math3:3.6.1"
 )
 
+maven_jar(
+  name = "tech_tablesaw",
+  artifact = "tech.tablesaw:tablesaw-core:0.11.4"
+)
+
+maven_jar(
+  name = "it_unimi_dsi_fastutil",
+  artifact = "it.unimi.dsi:fastutil:8.1.1"
+)
+
+maven_jar(
+  name = "org_roaringbitmap",
+  artifact = "org.roaringbitmap:RoaringBitmap:0.6.51"
+)
+
 # Google Cloud
 maven_jar(
   name = "google_api_services_storage",
diff --git a/heron/executor/src/python/heron_executor.py b/heron/executor/src/python/heron_executor.py
index 04db9b8..3f36e5d 100755
--- a/heron/executor/src/python/heron_executor.py
+++ b/heron/executor/src/python/heron_executor.py
@@ -498,7 +498,7 @@ class HeronExecutor(object):
                      "--cluster", self.cluster,
                      "--role", self.role,
                      "--environment", self.environment,
-                     "--topology_name", self.topology_name, "--verbose"]
+                     "--topology_name", self.topology_name]
 
     return healthmgr_cmd
 
diff --git a/heron/executor/tests/python/heron_executor_unittest.py b/heron/executor/tests/python/heron_executor_unittest.py
index f6155c0..5b661e7 100644
--- a/heron/executor/tests/python/heron_executor_unittest.py
+++ b/heron/executor/tests/python/heron_executor_unittest.py
@@ -131,7 +131,7 @@ class HeronExecutorTest(unittest.TestCase):
              "-Xloggc:log-files/gc.healthmgr.log -Djava.net.preferIPv4Stack=true " \
              "-cp scheduler_classpath:healthmgr_classpath " \
              "com.twitter.heron.healthmgr.HealthManager --cluster cluster --role role " \
-             "--environment environ --topology_name topname --verbose"
+             "--environment environ --topology_name topname"
 
   def get_expected_instance_command(component_name, instance_id, container_id):
     instance_name = "container_%d_%s_%d" % (container_id, component_name, instance_id)
diff --git a/heron/healthmgr/src/java/BUILD b/heron/healthmgr/src/java/BUILD
index 2587c8f..c081a08 100644
--- a/heron/healthmgr/src/java/BUILD
+++ b/heron/healthmgr/src/java/BUILD
@@ -42,6 +42,9 @@ healthmgr_deps_files = [
     "@com_microsoft_dhalion//jar",
     "@aopalliance_aopalliance//jar",
     "@org_apache_commons_commons_math3//jar",
+    "@tech_tablesaw//jar",
+    "@it_unimi_dsi_fastutil//jar",
+    "@org_roaringbitmap//jar",
 ]
 
 filegroup(
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelper.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelper.java
deleted file mode 100644
index 6a85188..0000000
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr.common;
-
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import org.apache.commons.math3.stat.regression.SimpleRegression;
-
-import com.twitter.heron.healthmgr.sensors.BaseSensor;
-
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_GROWTH_RATE;
-
-
-/**
- * A helper class to compute and hold metrics derived from component metrics
- */
-public class ComponentMetricsHelper {
-  private final ComponentMetrics componentMetrics;
-
-  private List<InstanceMetrics> boltsWithBackpressure = new ArrayList<>();
-  private double maxBufferChangeRate = 0;
-  private double totalBackpressure = 0;
-
-  public ComponentMetricsHelper(ComponentMetrics compMetrics) {
-    this.componentMetrics = compMetrics;
-  }
-
-  public void computeBpStats() {
-    for (InstanceMetrics instanceMetrics : componentMetrics.getMetrics().values()) {
-      Double bpValue = instanceMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
-      if (bpValue != null && bpValue > 0) {
-        boltsWithBackpressure.add(instanceMetrics);
-        totalBackpressure += bpValue;
-      }
-    }
-  }
-
-  public void computeBufferSizeTrend() {
-    for (InstanceMetrics instanceMetrics : componentMetrics.getMetrics().values()) {
-      Map<Instant, Double> bufferMetrics
-          = instanceMetrics.getMetrics().get(METRIC_BUFFER_SIZE.text());
-      if (bufferMetrics == null || bufferMetrics.size() < 3) {
-        // missing of insufficient data for creating a trend line
-        continue;
-      }
-
-      SimpleRegression simpleRegression = new SimpleRegression(true);
-      for (Instant timestamp : bufferMetrics.keySet()) {
-        simpleRegression.addData(timestamp.getEpochSecond(), bufferMetrics.get(timestamp));
-      }
-
-      double slope = simpleRegression.getSlope();
-      instanceMetrics.addMetric(METRIC_WAIT_Q_GROWTH_RATE.text(), slope);
-
-      if (maxBufferChangeRate < slope) {
-        maxBufferChangeRate = slope;
-      }
-    }
-  }
-
-  public MetricsStats computeMinMaxStats(BaseSensor.MetricName metric) {
-    return computeMinMaxStats(metric.text());
-  }
-
-  public MetricsStats computeMinMaxStats(String metric) {
-    double metricMax = 0;
-    double metricMin = Double.MAX_VALUE;
-    for (InstanceMetrics instance : componentMetrics.getMetrics().values()) {
-
-      Double metricValue = instance.getMetricValueSum(metric);
-      if (metricValue == null) {
-        continue;
-      }
-      metricMax = metricMax < metricValue ? metricValue : metricMax;
-      metricMin = metricMin > metricValue ? metricValue : metricMin;
-    }
-    return new MetricsStats(metricMin, metricMax);
-  }
-
-  public double getTotalBackpressure() {
-    return totalBackpressure;
-  }
-
-  public double getMaxBufferChangeRate() {
-    return maxBufferChangeRate;
-  }
-
-  public List<InstanceMetrics> getBoltsWithBackpressure() {
-    return boltsWithBackpressure;
-  }
-}
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java
index 5a207ee..7c81b5b 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/HealthManagerEvents.java
@@ -14,7 +14,10 @@
 
 package com.twitter.heron.healthmgr.common;
 
-import com.microsoft.dhalion.resolver.Action;
+import java.time.Instant;
+import java.util.Collection;
+
+import com.microsoft.dhalion.core.Action;
 
 public class HealthManagerEvents {
 
@@ -22,8 +25,8 @@ public class HealthManagerEvents {
    * This event is created when a resolver executes topology update action
    */
   public static class TopologyUpdate extends Action {
-    public TopologyUpdate() {
-      super(TopologyUpdate.class.getSimpleName());
+    public TopologyUpdate(Instant timestamp, Collection<String> assignments) {
+      super(TopologyUpdate.class.getSimpleName(), timestamp, assignments, null);
     }
   }
 
@@ -31,8 +34,8 @@ public class HealthManagerEvents {
    * This event is created when a resolver executes restart container action
    */
   public static class ContainerRestart extends Action {
-    public ContainerRestart() {
-      super(ContainerRestart.class.getSimpleName());
+    public ContainerRestart(Instant timestamp, Collection<String> assignments) {
+      super(ContainerRestart.class.getSimpleName(), timestamp, assignments, null);
     }
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java
index b23bc2a..12f12f0 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/common/PhysicalPlanProvider.java
@@ -54,7 +54,7 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
       @Override
       public synchronized void onEvent(TopologyUpdate event) {
         LOG.info(
-            "Received topology update event, invalidating cached PhysicalPlan: " + event.getName());
+            "Received topology update event, invalidating cached PhysicalPlan: " + event.type());
         physicalPlan = null;
       }
     });
@@ -64,8 +64,8 @@ public class PhysicalPlanProvider implements Provider<PhysicalPlan> {
        */
       @Override
       public synchronized void onEvent(ContainerRestart event) {
-        LOG.info("Received conatiner restart event, invalidating cached PhysicalPlan: "
-            + event.getName());
+        LOG.info("Received container restart event, invalidating cached PhysicalPlan: "
+            + event.type());
         physicalPlan = null;
       }
     });
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java
index fd3b50b..74845a8 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BackPressureDetector.java
@@ -15,34 +15,33 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
 
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 
-public class BackPressureDetector implements IDetector {
-  public static final String CONF_NOISE_FILTER = "BackPressureDetector.noiseFilterMillis";
+public class BackPressureDetector extends BaseDetector {
+  static final String CONF_NOISE_FILTER = "BackPressureDetector.noiseFilterMillis";
 
   private static final Logger LOG = Logger.getLogger(BackPressureDetector.class.getName());
-  private final BackPressureSensor bpSensor;
   private final int noiseFilterMillis;
 
   @Inject
-  BackPressureDetector(BackPressureSensor bpSensor,
-                       HealthPolicyConfig policyConfig) {
-    this.bpSensor = bpSensor;
+  BackPressureDetector(HealthPolicyConfig policyConfig) {
     noiseFilterMillis = (int) policyConfig.getConfig(CONF_NOISE_FILTER, 20);
   }
 
@@ -50,23 +49,33 @@ public class BackPressureDetector implements IDetector {
    * Detects all components initiating backpressure above the configured limit. Normally there
    * will be only one component
    *
-   * @return A collection of all components causing backpressure.
+   * @return A collection of symptoms, each one corresponding to a component with backpressure.
    */
   @Override
-  public List<Symptom> detect() {
-    ArrayList<Symptom> result = new ArrayList<>();
+  public Collection<Symptom> detect(Collection<Measurement> measurements) {
+    Collection<Symptom> result = new ArrayList<>();
+    Instant now = context.checkpoint();
 
-    Map<String, ComponentMetrics> backpressureMetrics = bpSensor.get();
-    for (ComponentMetrics compMetrics : backpressureMetrics.values()) {
-      ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
-      compStats.computeBpStats();
-      if (compStats.getTotalBackpressure() > noiseFilterMillis) {
-        LOG.info(String.format("Detected back pressure for %s, total back pressure is %f",
-            compMetrics.getName(), compStats.getTotalBackpressure()));
-        result.add(new Symptom(SYMPTOM_BACK_PRESSURE.text(), compMetrics));
+    MeasurementsTable bpMetrics
+        = MeasurementsTable.of(measurements).type(METRIC_BACK_PRESSURE.text());
+    for (String component : bpMetrics.uniqueComponents()) {
+      double compBackPressure = bpMetrics.component(component).sum();
+      if (compBackPressure > noiseFilterMillis) {
+        LOG.info(String.format("Detected component back-pressure for %s, total back pressure is %f",
+            component, compBackPressure));
+        List<String> addresses = Collections.singletonList(component);
+        result.add(new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, addresses));
+      }
+    }
+    for (String instance : bpMetrics.uniqueInstances()) {
+      double totalBP = bpMetrics.instance(instance).sum();
+      if (totalBP > noiseFilterMillis) {
+        LOG.info(String.format("Detected instance back-pressure for %s, total back pressure is %f",
+            instance, totalBP));
+        List<String> addresses = Collections.singletonList(instance);
+        result.add(new Symptom(SYMPTOM_INSTANCE_BACK_PRESSURE.text(), now, addresses));
       }
     }
-
     return result;
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java
index 9af1ef7..334857e 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/BaseDetector.java
@@ -15,18 +15,22 @@
 package com.twitter.heron.healthmgr.detectors;
 
 import com.microsoft.dhalion.api.IDetector;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
 public abstract class BaseDetector implements IDetector {
-  public enum SymptomName {
-    SYMPTOM_BACK_PRESSURE(BackPressureDetector.class.getSimpleName()),
+  protected ExecutionContext context;
+
+  public enum SymptomType {
+    SYMPTOM_COMP_BACK_PRESSURE(BackPressureDetector.class.getSimpleName() + "Component"),
+    SYMPTOM_INSTANCE_BACK_PRESSURE(BackPressureDetector.class.getSimpleName() + "Instance"),
     SYMPTOM_GROWING_WAIT_Q(GrowingWaitQueueDetector.class.getSimpleName()),
     SYMPTOM_LARGE_WAIT_Q(LargeWaitQueueDetector.class.getSimpleName()),
     SYMPTOM_PROCESSING_RATE_SKEW(ProcessingRateSkewDetector.class.getSimpleName()),
-    SYMPTOM_WAIT_Q_DISPARITY(WaitQueueDisparityDetector.class.getSimpleName());
+    SYMPTOM_WAIT_Q_SIZE_SKEW(WaitQueueSkewDetector.class.getSimpleName());
 
     private String text;
 
-    SymptomName(String name) {
+    SymptomType(String name) {
       this.text = name;
     }
 
@@ -39,4 +43,9 @@ public abstract class BaseDetector implements IDetector {
       return text();
     }
   }
+
+  @Override
+  public void initialize(ExecutionContext ctxt) {
+    this.context = ctxt;
+  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
index 0cf305e..855cd12 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetector.java
@@ -16,34 +16,33 @@
 package com.twitter.heron.healthmgr.detectors;
 
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+
+import org.apache.commons.math3.stat.regression.SimpleRegression;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
 
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_GROWING_WAIT_Q;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_GROWING_WAIT_Q;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 
-public class GrowingWaitQueueDetector implements IDetector {
-  static final String CONF_LIMIT = GrowingWaitQueueDetector.class.getSimpleName() + ".limit";
+public class GrowingWaitQueueDetector extends BaseDetector {
+  static final String CONF_LIMIT
+      = GrowingWaitQueueDetector.class.getSimpleName() + ".limit";
 
   private static final Logger LOG = Logger.getLogger(GrowingWaitQueueDetector.class.getName());
-  private final BufferSizeSensor pendingBufferSensor;
   private final double rateLimit;
 
   @Inject
-  GrowingWaitQueueDetector(BufferSizeSensor pendingBufferSensor,
-                           HealthPolicyConfig policyConfig) {
-    this.pendingBufferSensor = pendingBufferSensor;
+  GrowingWaitQueueDetector(HealthPolicyConfig policyConfig) {
     rateLimit = (double) policyConfig.getConfig(CONF_LIMIT, 10.0);
   }
 
@@ -51,23 +50,53 @@ public class GrowingWaitQueueDetector implements IDetector {
    * Detects all components unable to keep up with input load, hence having a growing pending buffer
    * or wait queue
    *
-   * @return A collection of all components executing slower than input rate.
+   * @return A collection of symptoms, each one corresponding to a component executing slower
+   * than input rate.
    */
   @Override
-  public List<Symptom> detect() {
-    ArrayList<Symptom> result = new ArrayList<>();
-
-    Map<String, ComponentMetrics> bufferSizes = pendingBufferSensor.get();
-    for (ComponentMetrics compMetrics : bufferSizes.values()) {
-      ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
-      compStats.computeBufferSizeTrend();
-      if (compStats.getMaxBufferChangeRate() > rateLimit) {
+  public Collection<Symptom> detect(Collection<Measurement> measurements) {
+    Collection<Symptom> result = new ArrayList<>();
+
+    MeasurementsTable waitQueueMetrics
+        = MeasurementsTable.of(measurements).type(METRIC_WAIT_Q_SIZE.text());
+
+    for (String component : waitQueueMetrics.uniqueComponents()) {
+      double maxSlope = computeWaitQueueSizeTrend(waitQueueMetrics.component(component));
+      if (maxSlope > rateLimit) {
         LOG.info(String.format("Detected growing wait queues for %s, max rate %f",
-            compMetrics.getName(), compStats.getMaxBufferChangeRate()));
-        result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), compMetrics));
+            component, maxSlope));
+        Collection<String> addresses = Collections.singletonList(component);
+        result.add(new Symptom(SYMPTOM_GROWING_WAIT_Q.text(), context.checkpoint(), addresses));
       }
     }
 
     return result;
   }
+
+
+  private double computeWaitQueueSizeTrend(MeasurementsTable metrics) {
+    double maxSlope = 0;
+    for (String instance : metrics.uniqueInstances()) {
+
+      if (metrics.instance(instance) == null || metrics.instance(instance).size() < 3) {
+        // missing or insufficient data for creating a trend line
+        continue;
+      }
+
+      Collection<Measurement> measurements
+          = metrics.instance(instance).sort(false, MeasurementsTable.SortKey.TIME_STAMP).get();
+      SimpleRegression simpleRegression = new SimpleRegression(true);
+
+      for (Measurement m : measurements) {
+        simpleRegression.addData(m.instant().getEpochSecond(), m.value());
+      }
+
+      double slope = simpleRegression.getSlope();
+
+      if (maxSlope < slope) {
+        maxSlope = slope;
+      }
+    }
+    return maxSlope;
+  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java
index 9a720e0..bd5a93c 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetector.java
@@ -16,59 +16,65 @@
 package com.twitter.heron.healthmgr.detectors;
 
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.logging.Logger;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IDetector;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
 
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_LARGE_WAIT_Q;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_LARGE_WAIT_Q;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
-public class LargeWaitQueueDetector implements IDetector {
+public class LargeWaitQueueDetector extends BaseDetector {
   static final String CONF_SIZE_LIMIT = "LargeWaitQueueDetector.limit";
 
   private static final Logger LOG = Logger.getLogger(LargeWaitQueueDetector.class.getName());
-  private final BufferSizeSensor pendingBufferSensor;
   private final int sizeLimit;
 
   @Inject
-  LargeWaitQueueDetector(BufferSizeSensor pendingBufferSensor,
-                         HealthPolicyConfig policyConfig) {
-    this.pendingBufferSensor = pendingBufferSensor;
+  LargeWaitQueueDetector(HealthPolicyConfig policyConfig) {
     sizeLimit = (int) policyConfig.getConfig(CONF_SIZE_LIMIT, 1000);
   }
 
   /**
-   * Detects all components unable to keep up with input load, hence having a large pending buffer
-   * or wait queue
+   * Detects all components having a large pending buffer or wait queue
    *
-   * @return A collection of all components executing slower than input rate.
+   * @return A collection of symptoms each one corresponding to components with
+   * large wait queues.
    */
   @Override
-  public List<Symptom> detect() {
-    ArrayList<Symptom> result = new ArrayList<>();
-
-    Map<String, ComponentMetrics> bufferSizes = pendingBufferSensor.get();
-    for (ComponentMetrics compMetrics : bufferSizes.values()) {
-      ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
-      MetricsStats stats = compStats.computeMinMaxStats(METRIC_BUFFER_SIZE.text());
-      if (stats.getMetricMin() > sizeLimit) {
-        LOG.info(String.format("Detected large wait queues for %s, smallest queue is %f",
-            compMetrics.getName(), stats.getMetricMin()));
-        result.add(new Symptom(SYMPTOM_LARGE_WAIT_Q.text(), compMetrics));
+  public Collection<Symptom> detect(Collection<Measurement> measurements) {
+
+    Collection<Symptom> result = new ArrayList<>();
+
+    MeasurementsTable waitQueueMetrics
+        = MeasurementsTable.of(measurements).type(METRIC_WAIT_Q_SIZE.text());
+    for (String component : waitQueueMetrics.uniqueComponents()) {
+      Set<String> addresses = new HashSet<>();
+      MeasurementsTable instanceMetrics = waitQueueMetrics.component(component);
+      for (String instance : instanceMetrics.uniqueInstances()) {
+        double avgWaitQSize = instanceMetrics.instance(instance).mean();
+        if (avgWaitQSize > sizeLimit) {
+          LOG.info(String.format("Detected large wait queues for instance"
+              + "%s, smallest queue is + %f", instance, avgWaitQSize));
+          addresses.add(instance);
+        }
+      }
+      if (addresses.size() > 0) {
+        result.add(new Symptom(SYMPTOM_LARGE_WAIT_Q.text(), context.checkpoint(), addresses));
       }
     }
 
     return result;
   }
 }
+
+
+
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java
index 78c0ca7..f32c7be 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetector.java
@@ -18,16 +18,15 @@ package com.twitter.heron.healthmgr.detectors;
 import javax.inject.Inject;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.ExecuteCountSensor;
+import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
 public class ProcessingRateSkewDetector extends SkewDetector {
   public static final String CONF_SKEW_RATIO = "ProcessingRateSkewDetector.skewRatio";
 
   @Inject
-  ProcessingRateSkewDetector(ExecuteCountSensor exeCountSensor,
-                             HealthPolicyConfig policyConfig) {
-    super(exeCountSensor,
-        (double) policyConfig.getConfig(CONF_SKEW_RATIO, 1.5),
-        BaseDetector.SymptomName.SYMPTOM_PROCESSING_RATE_SKEW);
+  ProcessingRateSkewDetector(HealthPolicyConfig policyConfig) {
+    super((double) policyConfig.getConfig(CONF_SKEW_RATIO, 1.5),
+        BaseSensor.MetricName.METRIC_EXE_COUNT,
+        BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW);
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
index fa26c32..473435d 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/SkewDetector.java
@@ -15,53 +15,100 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
+import java.time.Instant;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.logging.Logger;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.google.common.annotations.VisibleForTesting;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
 import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
 public class SkewDetector extends BaseDetector {
-  private static final Logger LOG = Logger.getLogger(SkewDetector.class.getName());
-  private final BaseSensor sensor;
   private final double skewRatio;
-  private final BaseDetector.SymptomName symptomName;
+  private final String metricName;
+  private final BaseDetector.SymptomType symptomType;
 
   @Inject
-  SkewDetector(BaseSensor sensor, double skewRatio, BaseDetector.SymptomName symptom) {
-    this.sensor = sensor;
+  SkewDetector(double skewRatio, BaseSensor.MetricName metricName, BaseDetector.SymptomType
+      symptomType) {
     this.skewRatio = skewRatio;
-    this.symptomName = symptom;
+    this.metricName = metricName.text();
+    this.symptomType = symptomType;
   }
 
   /**
-   * Detects components experiencing data skew, instances with vastly different execute counts.
+   * Detects components experiencing skew on a specific metric
    *
-   * @return A collection of affected components
+   * @return Up to three symptoms per skewed component -- one for the component itself, and one
+   * each for its instances with positive and negative skew
    */
   @Override
-  public List<Symptom> detect() {
-    ArrayList<Symptom> result = new ArrayList<>();
-
-    Map<String, ComponentMetrics> metrics = sensor.get();
-    for (ComponentMetrics compMetrics : metrics.values()) {
-      ComponentMetricsHelper compStats = new ComponentMetricsHelper(compMetrics);
-      MetricsStats stats = compStats.computeMinMaxStats(sensor.getMetricName());
-      if (stats.getMetricMax() > skewRatio * stats.getMetricMin()) {
-        LOG.info(String.format("Detected skew for %s, min = %f, max = %f",
-            compMetrics.getName(), stats.getMetricMin(), stats.getMetricMax()));
-        result.add(new Symptom(symptomName.text(), compMetrics));
+  public Collection<Symptom> detect(Collection<Measurement> measurements) {
+    Collection<Symptom> result = new ArrayList<>();
+
+    MeasurementsTable metrics = MeasurementsTable.of(measurements).type(metricName);
+    Instant now = context.checkpoint();
+    for (String component : metrics.uniqueComponents()) {
+      Set<String> addresses = new HashSet<>();
+      Set<String> positiveAddresses = new HashSet<>();
+      Set<String> negativeAddresses = new HashSet<>();
+
+      double componentMax = getMaxOfAverage(metrics.component(component));
+      double componentMin = getMinOfAverage(metrics.component(component));
+      if (componentMax > skewRatio * componentMin) {
+        //there is skew
+        addresses.add(component);
+        result.add(new Symptom(symptomType.text(), now, addresses));
+
+        for (String instance : metrics.component(component).uniqueInstances()) {
+          if (metrics.instance(instance).mean() >= 0.90 * componentMax) {
+            positiveAddresses.add(instance);
+          }
+          if (metrics.instance(instance).mean() <= 1.10 * componentMin) {
+            negativeAddresses.add(instance);
+          }
+        }
+
+        if (!positiveAddresses.isEmpty()) {
+          result.add(new Symptom("POSITIVE " + symptomType.text(), now, positiveAddresses));
+        }
+        if (!negativeAddresses.isEmpty()) {
+          result.add(new Symptom("NEGATIVE " + symptomType.text(), now, negativeAddresses));
+        }
       }
-    }
 
+    }
     return result;
   }
+
+  @VisibleForTesting
+  double getMaxOfAverage(MeasurementsTable table) {
+    double max = 0;
+    for (String instance : table.uniqueInstances()) {
+      double instanceMean = table.instance(instance).mean();
+      if (instanceMean > max) {
+        max = instanceMean;
+      }
+    }
+    return max;
+  }
+
+  @VisibleForTesting
+  double getMinOfAverage(MeasurementsTable table) {
+    double min = Double.MAX_VALUE;
+    for (String instance : table.uniqueInstances()) {
+      double instanceMean = table.instance(instance).mean();
+      if (instanceMean < min) {
+        min = instanceMean;
+      }
+    }
+    return min;
+  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetector.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetector.java
similarity index 59%
rename from heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetector.java
rename to heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetector.java
index c8e62d2..133e342 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetector.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetector.java
@@ -18,16 +18,15 @@ package com.twitter.heron.healthmgr.detectors;
 import javax.inject.Inject;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
+import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
-public class WaitQueueDisparityDetector extends SkewDetector {
-  public static final String CONF_DISPARITY_RATIO = "WaitQueueDisparityDetector.disparityRatio";
+public class WaitQueueSkewDetector extends SkewDetector {
+  static final String CONF_SKEW_RATIO = "WaitQueueSkewDetector.skewRatio";
 
   @Inject
-  WaitQueueDisparityDetector(BufferSizeSensor pendingBufferSensor,
-                             HealthPolicyConfig policyConfig) {
-    super(pendingBufferSensor,
-        (double) policyConfig.getConfig(CONF_DISPARITY_RATIO, 20.0),
-        BaseDetector.SymptomName.SYMPTOM_WAIT_Q_DISPARITY);
+  WaitQueueSkewDetector(HealthPolicyConfig policyConfig) {
+    super((double) policyConfig.getConfig(CONF_SKEW_RATIO, 20.0),
+        BaseSensor.MetricName.METRIC_WAIT_Q_SIZE,
+        BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW);
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java
index 1075984..b56e488 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/BaseDiagnoser.java
@@ -14,26 +14,19 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import com.microsoft.dhalion.api.IDiagnoser;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-
-import com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_PROCESSING_RATE_SKEW;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_WAIT_Q_DISPARITY;
 
 public abstract class BaseDiagnoser implements IDiagnoser {
-  public enum DiagnosisName {
-    SYMPTOM_UNDER_PROVISIONING("SYMPTOM_UNDER_PROVISIONING"),
-    SYMPTOM_DATA_SKEW("SYMPTOM_DATA_SKEW"),
-    SYMPTOM_SLOW_INSTANCE("SYMPTOM_SLOW_INSTANCE"),
+  protected ExecutionContext context;
+
+  @Override
+  public void initialize(ExecutionContext ctxt) {
+    this.context = ctxt;
+  }
+
+  public enum DiagnosisType {
 
     DIAGNOSIS_UNDER_PROVISIONING(UnderProvisioningDiagnoser.class.getSimpleName()),
     DIAGNOSIS_SLOW_INSTANCE(SlowInstanceDiagnoser.class.getSimpleName()),
@@ -41,7 +34,7 @@ public abstract class BaseDiagnoser implements IDiagnoser {
 
     private String text;
 
-    DiagnosisName(String name) {
+    DiagnosisType(String name) {
       this.text = name;
     }
 
@@ -54,37 +47,4 @@ public abstract class BaseDiagnoser implements IDiagnoser {
       return text();
     }
   }
-
-  List<Symptom> getBackPressureSymptoms(List<Symptom> symptoms) {
-    return getFilteredSymptoms(symptoms, SYMPTOM_BACK_PRESSURE);
-  }
-
-  Map<String, ComponentMetrics> getProcessingRateSkewComponents(List<Symptom> symptoms) {
-    return getFilteredComponents(symptoms, SYMPTOM_PROCESSING_RATE_SKEW);
-  }
-
-  Map<String, ComponentMetrics> getWaitQDisparityComponents(List<Symptom> symptoms) {
-    return getFilteredComponents(symptoms, SYMPTOM_WAIT_Q_DISPARITY);
-  }
-
-  private List<Symptom> getFilteredSymptoms(List<Symptom> symptoms, SymptomName type) {
-    List<Symptom> result = new ArrayList<>();
-    for (Symptom symptom : symptoms) {
-      if (symptom.getName().equals(type.text())) {
-        result.add(symptom);
-      }
-    }
-    return result;
-  }
-
-  private Map<String, ComponentMetrics> getFilteredComponents(List<Symptom> symptoms,
-                                                              SymptomName type) {
-    Map<String, ComponentMetrics> result = new HashMap<>();
-    for (Symptom symptom : symptoms) {
-      if (symptom.getName().equals(type.text())) {
-        result.putAll(symptom.getComponents());
-      }
-    }
-    return result;
-  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java
index 0082f2d..564dcfb 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoser.java
@@ -15,73 +15,74 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.logging.Logger;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_DATA_SKEW;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_DATA_SKEW;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_DATA_SKEW;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 public class DataSkewDiagnoser extends BaseDiagnoser {
   private static final Logger LOG = Logger.getLogger(DataSkewDiagnoser.class.getName());
 
   @Override
-  public Diagnosis diagnose(List<Symptom> symptoms) {
-    List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
-    Map<String, ComponentMetrics> processingRateSkewComponents =
-        getProcessingRateSkewComponents(symptoms);
-    Map<String, ComponentMetrics> waitQDisparityComponents = getWaitQDisparityComponents(symptoms);
+  public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+    Collection<Diagnosis> diagnoses = new ArrayList<>();
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
 
-    if (bpSymptoms.isEmpty() || processingRateSkewComponents.isEmpty()
-        || waitQDisparityComponents.isEmpty()) {
-      // Since there is no back pressure or disparate execute count, no action is needed
-      return null;
-    } else if (bpSymptoms.size() > 1) {
+    SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+    if (bp.size() > 1) {
       // TODO handle cases where multiple detectors create back pressure symptom
       throw new IllegalStateException("Multiple back-pressure symptoms case");
     }
-    ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
+    if (bp.size() == 0) {
+      return diagnoses;
+    }
+    String bpComponent = bp.first().assignments().iterator().next();
+
+    SymptomsTable processingRateSkew = symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+    SymptomsTable waitQSkew = symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
 
     // verify data skew, larger queue size and back pressure for the same component exists
-    ComponentMetrics exeCountMetrics = processingRateSkewComponents.get(bpMetrics.getName());
-    ComponentMetrics pendingBufferMetrics = waitQDisparityComponents.get(bpMetrics.getName());
-    if (exeCountMetrics == null || pendingBufferMetrics == null) {
-      // no processing rate skew and buffer size skew
-      // for the component with back pressure. This is not a data skew case
-      return null;
+    if (waitQSkew.assignment(bpComponent).size() == 0
+        || processingRateSkew.assignment(bpComponent).size() == 0) {
+      return diagnoses;
     }
 
-    ComponentMetrics mergedData = ComponentMetrics.merge(bpMetrics,
-        ComponentMetrics.merge(exeCountMetrics, pendingBufferMetrics));
-    ComponentMetricsHelper compStats = new ComponentMetricsHelper(mergedData);
-    compStats.computeBpStats();
-    MetricsStats exeStats = compStats.computeMinMaxStats(METRIC_EXE_COUNT);
-    MetricsStats bufferStats = compStats.computeMinMaxStats(METRIC_BUFFER_SIZE);
+    Collection<String> assignments = new ArrayList<>();
 
-    Symptom resultSymptom = null;
-    for (InstanceMetrics boltMetrics : compStats.getBoltsWithBackpressure()) {
-      double exeCount = boltMetrics.getMetricValueSum(METRIC_EXE_COUNT.text());
-      double bufferSize = boltMetrics.getMetricValueSum(METRIC_BUFFER_SIZE.text());
-      double bpValue = boltMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
-      if (exeStats.getMetricMax() < 1.10 * exeCount
-          && bufferStats.getMetricMax() < 2 * bufferSize) {
-        LOG.info(String.format("DataSkew: %s back-pressure(%s), high execution count: %s and "
-            + "high buffer size %s", boltMetrics.getName(), bpValue, exeCount, bufferSize));
-        resultSymptom = new Symptom(SYMPTOM_DATA_SKEW.text(), mergedData);
+    Instant newest = context.checkpoint();
+    Instant oldest = context.previousCheckpoint();
+    MeasurementsTable measurements = context.measurements()
+        .between(oldest, newest)
+        .component(bpComponent);
+
+    for (String instance : measurements.uniqueInstances()) {
+      MeasurementsTable instanceMeasurements = measurements.instance(instance);
+      double waitQSize = instanceMeasurements.type(METRIC_WAIT_Q_SIZE.text()).mean();
+      double processingRate = instanceMeasurements.type(METRIC_EXE_COUNT.text()).mean();
+      if ((measurements.type(METRIC_WAIT_Q_SIZE.text()).max() < waitQSize * 2)
+          && (measurements.type(METRIC_EXE_COUNT.text()).max() < 1.10 * processingRate)) {
+        assignments.add(instance);
+        LOG.info(String.format("DataSkew: %s back-pressure, high execution count: %s and "
+            + "high buffer size %s", instance, processingRate, waitQSize));
       }
     }
 
-    return resultSymptom != null ? new Diagnosis(DIAGNOSIS_DATA_SKEW.text(), resultSymptom) : null;
+    if (assignments.size() > 0) {
+      diagnoses.add(new Diagnosis(DIAGNOSIS_DATA_SKEW.text(), context.checkpoint(), assignments));
+    }
+
+    return diagnoses;
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
index af1ae4c..673c3b6 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoser.java
@@ -14,69 +14,74 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.logging.Logger;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-import com.twitter.heron.healthmgr.common.MetricsStats;
-
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 public class SlowInstanceDiagnoser extends BaseDiagnoser {
   private static final Logger LOG = Logger.getLogger(SlowInstanceDiagnoser.class.getName());
 
   @Override
-  public Diagnosis diagnose(List<Symptom> symptoms) {
-    List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
-    Map<String, ComponentMetrics> processingRateSkewComponents =
-        getProcessingRateSkewComponents(symptoms);
-    Map<String, ComponentMetrics> waitQDisparityComponents = getWaitQDisparityComponents(symptoms);
+  public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+    Collection<Diagnosis> diagnoses = new ArrayList<>();
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
 
-    if (bpSymptoms.isEmpty() || waitQDisparityComponents.isEmpty()
-        || !processingRateSkewComponents.isEmpty()) {
-      // Since there is no back pressure or disparate wait count or similar
-      // execution count, no action is needed
-      return null;
-    } else if (bpSymptoms.size() > 1) {
+    SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+    if (bp.size() > 1) {
       // TODO handle cases where multiple detectors create back pressure symptom
       throw new IllegalStateException("Multiple back-pressure symptoms case");
     }
-    ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
+    if (bp.size() == 0) {
+      return diagnoses;
+    }
+    String bpComponent = bp.first().assignments().iterator().next();
 
-    // verify wait Q disparity and back pressure for the same component exists
-    ComponentMetrics pendingBufferMetrics = waitQDisparityComponents.get(bpMetrics.getName());
-    if (pendingBufferMetrics == null) {
-      // no wait Q disparity for the component with back pressure. There is no slow instance
-      return null;
+    SymptomsTable processingRateSkew = symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+    SymptomsTable waitQSkew = symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
+    // verify wait Q disparity, similar processing rates and back pressure for the same component
+    // exist
+    if (waitQSkew.assignment(bpComponent).size() == 0
+        || processingRateSkew.assignment(bpComponent).size() > 0) {
+      // TODO in a short window rate skew could exist
+      return diagnoses;
     }
 
-    ComponentMetrics mergedData = ComponentMetrics.merge(bpMetrics, pendingBufferMetrics);
-    ComponentMetricsHelper compStats = new ComponentMetricsHelper(mergedData);
-    compStats.computeBpStats();
-    MetricsStats bufferStats = compStats.computeMinMaxStats(METRIC_BUFFER_SIZE);
+    Collection<String> assignments = new ArrayList<>();
+
+    Instant newest = context.checkpoint();
+    Instant oldest = context.previousCheckpoint();
+    MeasurementsTable measurements = context.measurements()
+        .between(oldest, newest)
+        .component(bpComponent);
 
-    Symptom resultSymptom = null;
-    for (InstanceMetrics boltMetrics : compStats.getBoltsWithBackpressure()) {
-      double bufferSize = boltMetrics.getMetricValueSum(METRIC_BUFFER_SIZE.text());
-      double bpValue = boltMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
-      if (bufferStats.getMetricMax() < bufferSize * 2) {
-        LOG.info(String.format("SLOW: %s back-pressure(%s) and high buffer size: %s "
+    for (String instance : measurements.uniqueInstances()) {
+      MeasurementsTable instanceMeasurements = measurements.instance(instance);
+      double waitQSize = instanceMeasurements.type(METRIC_WAIT_Q_SIZE.text()).mean();
+      if (measurements.type(METRIC_WAIT_Q_SIZE.text()).max() < waitQSize * 2) {
+        assignments.add(instance);
+        LOG.info(String.format("SLOW: %s back-pressure and high buffer size: %s "
                 + "and similar processing rates",
-            boltMetrics.getName(), bpValue, bufferSize));
-        resultSymptom = new Symptom(SYMPTOM_SLOW_INSTANCE.text(), mergedData);
+            instance, waitQSize));
       }
     }
 
-    return resultSymptom != null
-        ? new Diagnosis(DIAGNOSIS_SLOW_INSTANCE.text(), resultSymptom) : null;
+    if (assignments.size() > 0) {
+      Instant now = context.checkpoint();
+      diagnoses.add(new Diagnosis(DIAGNOSIS_SLOW_INSTANCE.text(), now, assignments));
+    }
+
+    return diagnoses;
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java
index 033002f..445a188 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoser.java
@@ -15,48 +15,56 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.logging.Logger;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
 
-import com.twitter.heron.healthmgr.common.ComponentMetricsHelper;
-
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_UNDER_PROVISIONING;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
 
 public class UnderProvisioningDiagnoser extends BaseDiagnoser {
   private static final Logger LOG = Logger.getLogger(SlowInstanceDiagnoser.class.getName());
 
   @Override
-  public Diagnosis diagnose(List<Symptom> symptoms) {
-    List<Symptom> bpSymptoms = getBackPressureSymptoms(symptoms);
-    Map<String, ComponentMetrics> processingRateSkewComponents =
-        getProcessingRateSkewComponents(symptoms);
-    Map<String, ComponentMetrics> waitQDisparityComponents = getWaitQDisparityComponents(symptoms);
-
-    if (bpSymptoms.isEmpty() || !processingRateSkewComponents.isEmpty()
-        || !waitQDisparityComponents.isEmpty()) {
-      // Since there is no back pressure or similar processing rates
-      // and buffer sizes, no action is needed
-      return null;
-    } else if (bpSymptoms.size() > 1) {
+  public Collection<Diagnosis> diagnose(Collection<Symptom> symptoms) {
+    Collection<Diagnosis> diagnoses = new ArrayList<>();
+
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+    SymptomsTable bp = symptomsTable.type(SYMPTOM_COMP_BACK_PRESSURE.text());
+    if (bp.size() > 1) {
       // TODO handle cases where multiple detectors create back pressure symptom
       throw new IllegalStateException("Multiple back-pressure symptoms case");
     }
-    ComponentMetrics bpMetrics = bpSymptoms.iterator().next().getComponent();
 
-    ComponentMetricsHelper compStats = new ComponentMetricsHelper(bpMetrics);
-    compStats.computeBpStats();
-    LOG.info(String.format("UNDER_PROVISIONING: %s back-pressure(%s) and similar processing rates "
-            + "and buffer sizes",
-        bpMetrics.getName(), compStats.getTotalBackpressure()));
+    if (bp.size() == 0) {
+      return diagnoses;
+    }
+    String bpComponent = bp.first().assignments().iterator().next();
+
+    SymptomsTable processingRateSkew = symptomsTable.type(SYMPTOM_PROCESSING_RATE_SKEW.text());
+    SymptomsTable waitQSkew = symptomsTable.type(SYMPTOM_WAIT_Q_SIZE_SKEW.text());
+
+    if (waitQSkew.assignment(bpComponent).size() != 0
+        || processingRateSkew.assignment(bpComponent).size() != 0) {
+      return diagnoses;
+    }
+
+    Collection<String> assignments = Collections.singletonList(bpComponent);
+    LOG.info(String.format("UNDER_PROVISIONING: %s back-pressure and similar processing rates "
+        + "and wait queue sizes", bpComponent));
+
+    diagnoses.add(
+        new Diagnosis(DIAGNOSIS_UNDER_PROVISIONING.text(), context.checkpoint(), assignments));
 
+    //TODO verify large wait queue for all instances
 
-    Symptom resultSymptom = new Symptom(SYMPTOM_UNDER_PROVISIONING.text(), bpMetrics);
-    return new Diagnosis(DIAGNOSIS_UNDER_PROVISIONING.text(), resultSymptom);
+    return diagnoses;
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
index 57e18b9..3dc1ef7 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/AutoRestartBackpressureContainerPolicy.java
@@ -15,36 +15,27 @@
 
 package com.twitter.heron.healthmgr.policy;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
 import java.util.logging.Logger;
-import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
-import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
 import com.microsoft.dhalion.events.EventHandler;
 import com.microsoft.dhalion.events.EventManager;
 import com.microsoft.dhalion.policy.HealthPolicyImpl;
 
-import com.twitter.heron.common.basics.TypeUtils;
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
 import com.twitter.heron.healthmgr.detectors.BackPressureDetector;
-import com.twitter.heron.healthmgr.detectors.ProcessingRateSkewDetector;
-import com.twitter.heron.healthmgr.detectors.WaitQueueDisparityDetector;
-import com.twitter.heron.healthmgr.diagnosers.SlowInstanceDiagnoser;
 import com.twitter.heron.healthmgr.resolvers.RestartContainerResolver;
+import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
 
 import static com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey.HEALTH_POLICY_INTERVAL;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
 
 /**
  * This Policy class
  * 1. detector: find out the container that has been in backpressure
- *              state for long time, which we believe the container cannot recover.
+ * state for a long time, which we believe means the container cannot recover.
  * 2. resolver: try to restart the backpressure container so as to be rescheduled.
  */
 public class AutoRestartBackpressureContainerPolicy extends HealthPolicyImpl
@@ -56,44 +47,29 @@ public class AutoRestartBackpressureContainerPolicy extends HealthPolicyImpl
       Logger.getLogger(AutoRestartBackpressureContainerPolicy.class.getName());
 
   private final HealthPolicyConfig policyConfig;
-  private final RestartContainerResolver restartContainerResolver;
 
   @Inject
-  AutoRestartBackpressureContainerPolicy(HealthPolicyConfig policyConfig, EventManager eventManager,
-      BackPressureDetector backPressureDetector,
-      ProcessingRateSkewDetector processingRateSkewDetector,
-      WaitQueueDisparityDetector waitQueueDisparityDetector,
-      SlowInstanceDiagnoser slowInstanceDiagnoser,
-      RestartContainerResolver restartContainerResolver) {
+  AutoRestartBackpressureContainerPolicy(HealthPolicyConfig policyConfig,
+                                         EventManager eventManager,
+                                         BackPressureSensor backPressureSensor,
+                                         BackPressureDetector backPressureDetector,
+                                         RestartContainerResolver restartContainerResolver) {
     this.policyConfig = policyConfig;
-    this.restartContainerResolver = restartContainerResolver;
 
-    registerDetectors(backPressureDetector, waitQueueDisparityDetector, processingRateSkewDetector);
-    registerDiagnosers(slowInstanceDiagnoser);
+    registerSensors(backPressureSensor);
+    registerDetectors(backPressureDetector);
+    registerResolvers(restartContainerResolver);
 
-    setPolicyExecutionInterval(TimeUnit.MILLISECONDS,
-        TypeUtils.getInteger(policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000)));
+    setPolicyExecutionInterval(
+        Duration.ofMillis((int) policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000)));
 
     eventManager.addEventListener(ContainerRestart.class, this);
   }
 
   @Override
-  public IResolver selectResolver(List<Diagnosis> diagnosis) {
-    Map<String, Diagnosis> diagnosisMap =
-        diagnosis.stream().collect(Collectors.toMap(Diagnosis::getName, d -> d));
-
-    if (diagnosisMap.containsKey(DIAGNOSIS_SLOW_INSTANCE.text())) {
-      return restartContainerResolver;
-    }
-
-    LOG.warning("Unknown diagnoses. None resolver selected.");
-    return null;
-  }
-
-  @Override
   public void onEvent(ContainerRestart event) {
     int interval = (int) policyConfig.getConfig(CONF_WAIT_INTERVAL_MILLIS, 180000);
     LOG.info("Received container restart action event: " + event);
-    setOneTimeDelay(TimeUnit.MILLISECONDS, interval);
+    setOneTimeDelay(Duration.ofMillis(interval));
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java
index 61c0048..b060b40 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/policy/DynamicResourceAllocationPolicy.java
@@ -15,16 +15,15 @@
 
 package com.twitter.heron.healthmgr.policy;
 
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
+import java.util.Collection;
 import java.util.logging.Logger;
-import java.util.stream.Collectors;
 
 import javax.inject.Inject;
 
 import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.DiagnosisTable;
 import com.microsoft.dhalion.events.EventHandler;
 import com.microsoft.dhalion.events.EventManager;
 import com.microsoft.dhalion.policy.HealthPolicyImpl;
@@ -34,16 +33,19 @@ import com.twitter.heron.healthmgr.common.HealthManagerEvents.TopologyUpdate;
 import com.twitter.heron.healthmgr.detectors.BackPressureDetector;
 import com.twitter.heron.healthmgr.detectors.LargeWaitQueueDetector;
 import com.twitter.heron.healthmgr.detectors.ProcessingRateSkewDetector;
-import com.twitter.heron.healthmgr.detectors.WaitQueueDisparityDetector;
+import com.twitter.heron.healthmgr.detectors.WaitQueueSkewDetector;
 import com.twitter.heron.healthmgr.diagnosers.DataSkewDiagnoser;
 import com.twitter.heron.healthmgr.diagnosers.SlowInstanceDiagnoser;
 import com.twitter.heron.healthmgr.diagnosers.UnderProvisioningDiagnoser;
 import com.twitter.heron.healthmgr.resolvers.ScaleUpResolver;
+import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
+import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
+import com.twitter.heron.healthmgr.sensors.ExecuteCountSensor;
 
 import static com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey.HEALTH_POLICY_INTERVAL;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_DATA_SKEW;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_DATA_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
 
 public class DynamicResourceAllocationPolicy extends HealthPolicyImpl
     implements EventHandler<TopologyUpdate> {
@@ -59,10 +61,13 @@ public class DynamicResourceAllocationPolicy extends HealthPolicyImpl
   @Inject
   DynamicResourceAllocationPolicy(HealthPolicyConfig policyConfig,
                                   EventManager eventManager,
+                                  BackPressureSensor backPressureSensor,
+                                  BufferSizeSensor bufferSizeSensor,
+                                  ExecuteCountSensor executeCountSensor,
                                   BackPressureDetector backPressureDetector,
                                   LargeWaitQueueDetector largeWaitQueueDetector,
                                   ProcessingRateSkewDetector dataSkewDetector,
-                                  WaitQueueDisparityDetector waitQueueDisparityDetector,
+                                  WaitQueueSkewDetector waitQueueSkewDetector,
                                   UnderProvisioningDiagnoser underProvisioningDiagnoser,
                                   DataSkewDiagnoser dataSkewDiagnoser,
                                   SlowInstanceDiagnoser slowInstanceDiagnoser,
@@ -70,26 +75,27 @@ public class DynamicResourceAllocationPolicy extends HealthPolicyImpl
     this.policyConfig = policyConfig;
     this.scaleUpResolver = scaleUpResolver;
 
+    registerSensors(backPressureSensor, bufferSizeSensor, executeCountSensor);
     registerDetectors(backPressureDetector, largeWaitQueueDetector,
-        waitQueueDisparityDetector, dataSkewDetector);
+        waitQueueSkewDetector, dataSkewDetector);
     registerDiagnosers(underProvisioningDiagnoser, dataSkewDiagnoser, slowInstanceDiagnoser);
+    registerResolvers(scaleUpResolver);
 
-    setPolicyExecutionInterval(TimeUnit.MILLISECONDS,
-        (int) policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000));
+    setPolicyExecutionInterval(
+        Duration.ofMillis((int) policyConfig.getConfig(HEALTH_POLICY_INTERVAL.key(), 60000)));
 
     eventManager.addEventListener(TopologyUpdate.class, this);
   }
 
   @Override
-  public IResolver selectResolver(List<Diagnosis> diagnosis) {
-    Map<String, Diagnosis> diagnosisMap
-        = diagnosis.stream().collect(Collectors.toMap(Diagnosis::getName, d -> d));
+  public IResolver selectResolver(Collection<Diagnosis> diagnosis) {
+    DiagnosisTable diagnosisTable = DiagnosisTable.of(diagnosis);
 
-    if (diagnosisMap.containsKey(DIAGNOSIS_DATA_SKEW.text())) {
+    if (diagnosisTable.type(DIAGNOSIS_DATA_SKEW.text()).size() > 0) {
       LOG.warning("Data Skew diagnoses. This diagnosis does not have any resolver.");
-    } else if (diagnosisMap.containsKey(DIAGNOSIS_SLOW_INSTANCE.text())) {
+    } else if (diagnosisTable.type(DIAGNOSIS_SLOW_INSTANCE.text()).size() > 0) {
       LOG.warning("Slow Instance diagnoses. This diagnosis does not have any resolver.");
-    } else if (diagnosisMap.containsKey(DIAGNOSIS_UNDER_PROVISIONING.text())) {
+    } else if (diagnosisTable.type(DIAGNOSIS_UNDER_PROVISIONING.text()).size() > 0) {
       return scaleUpResolver;
     }
 
@@ -100,6 +106,6 @@ public class DynamicResourceAllocationPolicy extends HealthPolicyImpl
   public void onEvent(TopologyUpdate event) {
     int interval = (int) policyConfig.getConfig(CONF_WAIT_INTERVAL_MILLIS, 180000);
     LOG.info("Received topology update action event: " + event);
-    setOneTimeDelay(TimeUnit.MILLISECONDS, interval);
+    setOneTimeDelay(Duration.ofMillis(interval));
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
index b7b7a31..7bc7833 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/RestartContainerResolver.java
@@ -13,7 +13,10 @@
 // limitations under the License.
 package com.twitter.heron.healthmgr.resolvers;
 
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.logging.Logger;
 
@@ -21,89 +24,85 @@ import javax.inject.Inject;
 import javax.inject.Named;
 
 import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.SymptomsTable;
 import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-import com.microsoft.dhalion.resolver.Action;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
-import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.common.HealthManagerEvents.ContainerRestart;
-import com.twitter.heron.healthmgr.common.PhysicalPlanProvider;
 import com.twitter.heron.proto.scheduler.Scheduler.RestartTopologyRequest;
 import com.twitter.heron.scheduler.client.ISchedulerClient;
 
 import static com.twitter.heron.healthmgr.HealthManager.CONF_TOPOLOGY_NAME;
-import static com.twitter.heron.healthmgr.detectors.BackPressureDetector.CONF_NOISE_FILTER;
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_SLOW_INSTANCE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
 
 public class RestartContainerResolver implements IResolver {
   private static final Logger LOG = Logger.getLogger(RestartContainerResolver.class.getName());
 
-  private final PhysicalPlanProvider physicalPlanProvider;
   private final EventManager eventManager;
   private final String topologyName;
   private final ISchedulerClient schedulerClient;
-  private final int noiseFilterMillis;
+  private ExecutionContext context;
 
   @Inject
   public RestartContainerResolver(@Named(CONF_TOPOLOGY_NAME) String topologyName,
-      PhysicalPlanProvider physicalPlanProvider, EventManager eventManager,
-      ISchedulerClient schedulerClient, HealthPolicyConfig policyConfig) {
+                                  EventManager eventManager,
+                                  ISchedulerClient schedulerClient) {
     this.topologyName = topologyName;
-    this.physicalPlanProvider = physicalPlanProvider;
     this.eventManager = eventManager;
     this.schedulerClient = schedulerClient;
-    this.noiseFilterMillis = (int) policyConfig.getConfig(CONF_NOISE_FILTER, 20);
   }
 
   @Override
-  public List<Action> resolve(List<Diagnosis> diagnosis) {
+  public void initialize(ExecutionContext ctxt) {
+    this.context = ctxt;
+  }
+
+  @Override
+  public Collection<Action> resolve(Collection<Diagnosis> diagnosis) {
     List<Action> actions = new ArrayList<>();
 
-    for (Diagnosis diagnoses : diagnosis) {
-      Symptom bpSymptom = diagnoses.getSymptoms().get(SYMPTOM_SLOW_INSTANCE.text());
-      if (bpSymptom == null || bpSymptom.getComponents().isEmpty()) {
-        // nothing to fix as there is no back pressure
-        continue;
-      }
-
-      if (bpSymptom.getComponents().size() > 1) {
-        throw new UnsupportedOperationException("Multiple components with back pressure symptom");
-      }
-
-      // want to know which stmgr has backpressure
-      String stmgrId = null;
-      for (InstanceMetrics im : bpSymptom.getComponent().getMetrics().values()) {
-        if (im.hasMetricAboveLimit(METRIC_BACK_PRESSURE.text(), noiseFilterMillis)) {
-          String instanceId = im.getName();
-          int fromIndex = instanceId.indexOf('_') + 1;
-          int toIndex = instanceId.indexOf('_', fromIndex);
-          stmgrId = instanceId.substring(fromIndex, toIndex);
-          break;
-        }
-      }
+    // find all back pressure symptoms reported in this execution cycle
+    Instant current = context.checkpoint();
+    Instant previous = context.previousCheckpoint();
+    SymptomsTable bpSymptoms = context.symptoms()
+        .type(SYMPTOM_INSTANCE_BACK_PRESSURE.text())
+        .between(previous, current);
+
+    if (bpSymptoms.size() == 0) {
+      LOG.fine("No back-pressure measurements found, ending as there's nothing to fix");
+      return actions;
+    }
+
+    Collection<String> allBpInstances = new HashSet<>();
+    bpSymptoms.get().forEach(symptom -> allBpInstances.addAll(symptom.assignments()));
+
+    LOG.info(String.format("%d instances caused back-pressure", allBpInstances.size()));
+
+    Collection<String> stmgrIds = new HashSet<>();
+    allBpInstances.forEach(instanceId -> {
+      LOG.info("Id of instance causing back-pressure: " + instanceId);
+      int fromIndex = instanceId.indexOf('_') + 1;
+      int toIndex = instanceId.indexOf('_', fromIndex);
+      String stmgrId = instanceId.substring(fromIndex, toIndex);
+      stmgrIds.add(stmgrId);
+    });
+
+    stmgrIds.forEach(stmgrId -> {
       LOG.info("Restarting container: " + stmgrId);
       boolean b = schedulerClient.restartTopology(
           RestartTopologyRequest.newBuilder()
-          .setContainerIndex(Integer.valueOf(stmgrId))
-          .setTopologyName(topologyName)
-          .build());
+              .setContainerIndex(Integer.valueOf(stmgrId))
+              .setTopologyName(topologyName)
+              .build());
       LOG.info("Restarted container result: " + b);
+    });
 
-      ContainerRestart action = new ContainerRestart();
-      LOG.info("Broadcasting container restart event");
-      eventManager.onEvent(action);
-
-      actions.add(action);
-      return actions;
-    }
-
+    LOG.info("Broadcasting container restart event");
+    ContainerRestart action = new ContainerRestart(current, stmgrIds);
+    eventManager.onEvent(action);
+    actions.add(action);
     return actions;
   }
-
-  @Override
-  public void close() {
-  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java
index 4f37489..3ed7ef2 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolver.java
@@ -13,8 +13,11 @@
 // limitations under the License.
 package com.twitter.heron.healthmgr.resolvers;
 
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -24,12 +27,12 @@ import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.microsoft.dhalion.api.IResolver;
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.DiagnosisTable;
+import com.microsoft.dhalion.core.MeasurementsTable;
 import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-import com.microsoft.dhalion.resolver.Action;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
 import com.twitter.heron.api.generated.TopologyAPI.Topology;
 import com.twitter.heron.common.basics.SysUtils;
@@ -46,7 +49,7 @@ import com.twitter.heron.spi.packing.PackingPlan;
 import com.twitter.heron.spi.packing.PackingPlanProtoSerializer;
 import com.twitter.heron.spi.utils.ReflectionUtils;
 
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 
 
@@ -58,6 +61,7 @@ public class ScaleUpResolver implements IResolver {
   private ISchedulerClient schedulerClient;
   private EventManager eventManager;
   private Config config;
+  private ExecutionContext context;
 
   @Inject
   public ScaleUpResolver(TopologyProvider topologyProvider,
@@ -73,75 +77,85 @@ public class ScaleUpResolver implements IResolver {
   }
 
   @Override
-  public List<Action> resolve(List<Diagnosis> diagnosis) {
-    for (Diagnosis diagnoses : diagnosis) {
-      Symptom bpSymptom = diagnoses.getSymptoms().get(SYMPTOM_UNDER_PROVISIONING.text());
-      if (bpSymptom == null || bpSymptom.getComponents().isEmpty()) {
-        // nothing to fix as there is no back pressure
-        continue;
-      }
+  public void initialize(ExecutionContext ctxt) {
+    this.context = ctxt;
+  }
 
-      if (bpSymptom.getComponents().size() > 1) {
-        throw new UnsupportedOperationException("Multiple components with back pressure symptom");
-      }
+  @Override
+  public Collection<Action> resolve(Collection<Diagnosis> diagnosis) {
+    List<Action> actions = new ArrayList<>();
 
-      ComponentMetrics bpComponent = bpSymptom.getComponent();
-      int newParallelism = computeScaleUpFactor(bpComponent);
-      Map<String, Integer> changeRequest = new HashMap<>();
-      changeRequest.put(bpComponent.getName(), newParallelism);
+    DiagnosisTable table = DiagnosisTable.of(diagnosis);
+    table = table.type(DIAGNOSIS_UNDER_PROVISIONING.text());
 
-      PackingPlan currentPackingPlan = packingPlanProvider.get();
-      PackingPlan newPlan = buildNewPackingPlan(changeRequest, currentPackingPlan);
-      if (newPlan == null) {
-        return null;
-      }
+    if (table.size() == 0) {
+      LOG.fine("No under-previsioning diagnosis present, ending as there's nothing to fix");
+      return actions;
+    }
 
-      Scheduler.UpdateTopologyRequest updateTopologyRequest =
-          Scheduler.UpdateTopologyRequest.newBuilder()
-              .setCurrentPackingPlan(getSerializedPlan(currentPackingPlan))
-              .setProposedPackingPlan(getSerializedPlan(newPlan))
-              .build();
+    // Scale the first assigned component
+    Diagnosis diagnoses = table.first();
+    // verify diagnoses instance is valid
+    if (diagnoses.assignments().isEmpty()) {
+      LOG.warning(String.format("Diagnosis %s is missing assignments", diagnoses.id()));
+      return actions;
+    }
+    String component = diagnoses.assignments().iterator().next();
 
-      LOG.info("Sending Updating topology request: " + updateTopologyRequest);
-      if (!schedulerClient.updateTopology(updateTopologyRequest)) {
-        throw new RuntimeException(String.format("Failed to update topology with Scheduler, "
-            + "updateTopologyRequest=%s", updateTopologyRequest));
-      }
+    int newParallelism = computeScaleUpFactor(component);
+    Map<String, Integer> changeRequest = new HashMap<>();
+    changeRequest.put(component, newParallelism);
 
-      TopologyUpdate action = new TopologyUpdate();
-      LOG.info("Broadcasting topology update event");
-      eventManager.onEvent(action);
+    PackingPlan currentPackingPlan = packingPlanProvider.get();
+    PackingPlan newPlan = buildNewPackingPlan(changeRequest, currentPackingPlan);
+    if (newPlan == null) {
+      return null;
+    }
 
-      LOG.info("Scheduler updated topology successfully.");
+    Scheduler.UpdateTopologyRequest updateTopologyRequest =
+        Scheduler.UpdateTopologyRequest.newBuilder()
+            .setCurrentPackingPlan(getSerializedPlan(currentPackingPlan))
+            .setProposedPackingPlan(getSerializedPlan(newPlan))
+            .build();
 
-      List<Action> actions = new ArrayList<>();
-      actions.add(action);
-      return actions;
+    LOG.info("Sending Updating topology request: " + updateTopologyRequest);
+    if (!schedulerClient.updateTopology(updateTopologyRequest)) {
+      throw new RuntimeException(String.format("Failed to update topology with Scheduler, "
+          + "updateTopologyRequest=%s", updateTopologyRequest));
     }
+    LOG.info("Scheduler updated topology successfully.");
+
+    LOG.info("Broadcasting topology update event");
+    TopologyUpdate action
+        = new TopologyUpdate(context.checkpoint(), Collections.singletonList(component));
+    eventManager.onEvent(action);
 
-    return null;
+    actions.add(action);
+    return actions;
   }
 
   @VisibleForTesting
-  int computeScaleUpFactor(ComponentMetrics componentMetrics) {
-    double totalCompBpTime = 0;
-    String compName = componentMetrics.getName();
-    for (InstanceMetrics instanceMetrics : componentMetrics.getMetrics().values()) {
-      double instanceBp = instanceMetrics.getMetricValueSum(METRIC_BACK_PRESSURE.text());
-      LOG.info(String.format("Instance:%s, bpTime:%.0f", instanceMetrics.getName(), instanceBp));
-      totalCompBpTime += instanceBp;
-    }
-
+  int computeScaleUpFactor(String compName) {
+    Instant newest = context.checkpoint();
+    Instant oldest = context.previousCheckpoint();
+    MeasurementsTable table = context.measurements()
+        .component(compName)
+        .type(METRIC_BACK_PRESSURE.text())
+        .between(oldest, newest);
+
+    double totalCompBpTime = table.sum();
     LOG.info(String.format("Component: %s, bpTime: %.0f", compName, totalCompBpTime));
+
     if (totalCompBpTime >= 1000) {
       totalCompBpTime = 999;
+      LOG.warning(String.format("Comp:%s, bpTime after filter: %.0f", compName, totalCompBpTime));
     }
-    LOG.warning(String.format("Comp:%s, bpTime after filter: %.0f", compName, totalCompBpTime));
 
     double unusedCapacity = (1.0 * totalCompBpTime) / (1000 - totalCompBpTime);
+
     // scale up fencing: do not scale more than 4 times the current size
     unusedCapacity = unusedCapacity > 4.0 ? 4.0 : unusedCapacity;
-    int parallelism = (int) Math.ceil(componentMetrics.getMetrics().size() * (1 + unusedCapacity));
+    int parallelism = (int) Math.ceil(table.uniqueInstances().size() * (1 + unusedCapacity));
     LOG.info(String.format("Component's, %s, unused capacity is: %.3f. New parallelism: %d",
         compName, unusedCapacity, parallelism));
     return parallelism;
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
index 275084f..76829cf 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BackPressureSensor.java
@@ -16,16 +16,15 @@
 package com.twitter.heron.healthmgr.sensors;
 
 import java.time.Duration;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.logging.Logger;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import javax.inject.Inject;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.common.PackingPlanProvider;
@@ -34,8 +33,6 @@ import com.twitter.heron.healthmgr.common.TopologyProvider;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 
 public class BackPressureSensor extends BaseSensor {
-  private static final Logger LOG = Logger.getLogger(BackPressureSensor.class.getName());
-
   private final MetricsProvider metricsProvider;
   private final PackingPlanProvider packingPlanProvider;
   private final TopologyProvider topologyProvider;
@@ -51,72 +48,46 @@ public class BackPressureSensor extends BaseSensor {
     this.metricsProvider = metricsProvider;
   }
 
-  @Override
-  public Map<String, ComponentMetrics> get(String... components) {
-    return get();
-  }
-
   /**
    * Computes the average (millis/sec) back-pressure caused by instances in the configured window
    *
-   * @return the average value
+   * @return measurements holding the average back-pressure of each bolt instance
    */
-  public Map<String, ComponentMetrics> get() {
-    Map<String, ComponentMetrics> result = new HashMap<>();
+  @Override
+  public Collection<Measurement> fetch() {
+    Collection<Measurement> result = new ArrayList<>();
+    Instant now = context.checkpoint();
 
     String[] boltComponents = topologyProvider.getBoltNames();
-    for (String boltComponent : boltComponents) {
-      String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(boltComponent);
+    Duration duration = getDuration();
+    for (String component : boltComponents) {
+      String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(component);
 
-      Duration duration = getDuration();
-      Map<String, InstanceMetrics> instanceMetrics = new HashMap<>();
-      for (String boltInstanceName : boltInstanceNames) {
-        String metric = getMetricName() + boltInstanceName;
-        Map<String, ComponentMetrics> stmgrResult = metricsProvider.getComponentMetrics(
-            metric, duration, COMPONENT_STMGR);
+      for (String instance : boltInstanceNames) {
+        String metric = getMetricName() + instance;
 
-        if (stmgrResult.get(COMPONENT_STMGR) == null) {
+        Collection<Measurement> stmgrResult
+            = metricsProvider.getMeasurements(now, duration, metric, COMPONENT_STMGR);
+        if (stmgrResult.isEmpty()) {
           continue;
         }
 
-        HashMap<String, InstanceMetrics> streamManagerResult =
-            stmgrResult.get(COMPONENT_STMGR).getMetrics();
-
-        if (streamManagerResult.isEmpty()) {
+        MeasurementsTable table = MeasurementsTable.of(stmgrResult).component(COMPONENT_STMGR);
+        if (table.size() == 0) {
           continue;
         }
-
-        // since a bolt instance belongs to one stream manager,
-        // for tracker rest api: expect just one metrics manager instance in the result;
-        // for tmaster/metricscache stat interface: expect a list
-        Double valueSum = 0.0;
-        for (Iterator<InstanceMetrics> it = streamManagerResult.values().iterator();
-            it.hasNext();) {
-          InstanceMetrics stmgrInstanceResult = it.next();
-
-          Double val = stmgrInstanceResult.getMetricValueSum(metric);
-          if (val == null) {
-            continue;
-          } else {
-            valueSum += val;
-          }
-        }
-        double averageBp = valueSum / duration.getSeconds();
+        double averageBp = table.type(metric).sum() / duration.getSeconds();
 
         // The maximum value of averageBp should be 1000, i.e. 1000 millis of BP per second. Due to
         // a bug in Heron (Issue: 1753), this value could be higher in some cases. The following
         // check partially corrects the reported BP value
         averageBp = averageBp > 1000 ? 1000 : averageBp;
-        InstanceMetrics boltInstanceMetric
-            = new InstanceMetrics(boltInstanceName, getMetricName(), averageBp);
 
-        instanceMetrics.put(boltInstanceName, boltInstanceMetric);
+        Measurement measurement
+            = new Measurement(component, instance, getMetricName(), now, averageBp);
+        result.add(measurement);
       }
-
-      ComponentMetrics componentMetrics = new ComponentMetrics(boltComponent, instanceMetrics);
-      result.put(boltComponent, componentMetrics);
     }
-
     return result;
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java
index 61e2b0b..18b175a 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BaseSensor.java
@@ -15,8 +15,11 @@
 package com.twitter.heron.healthmgr.sensors;
 
 import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
 
 import com.microsoft.dhalion.api.ISensor;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey;
@@ -24,12 +27,13 @@ import com.twitter.heron.healthmgr.HealthPolicyConfigReader.PolicyConfigKey;
 public abstract class BaseSensor implements ISensor {
   static final Duration DEFAULT_METRIC_DURATION = Duration.ofSeconds(300);
   static final String COMPONENT_STMGR = "__stmgr__";
+  protected ExecutionContext context;
 
   public enum MetricName {
     METRIC_EXE_COUNT("__execute-count/default"),
     METRIC_BACK_PRESSURE("__time_spent_back_pressure_by_compid/"),
-    METRIC_BUFFER_SIZE("__connection_buffer_by_instanceid/"),
-    METRIC_BUFFER_SIZE_SUFFIX("/bytes"),
+    METRIC_WAIT_Q_SIZE("__connection_buffer_by_instanceid/"),
+    METRIC_WAIT_Q_SIZE_SUFFIX("/bytes"),
     METRIC_WAIT_Q_GROWTH_RATE("METRIC_WAIT_Q_GROWTH_RATE");
 
     private String text;
@@ -78,7 +82,17 @@ public abstract class BaseSensor implements ISensor {
     return value;
   }
 
-  public String getMetricName() {
+  @Override
+  public void initialize(ExecutionContext ctxt) {
+    this.context = ctxt;
+  }
+
+  @Override
+  public Collection<String> getMetricTypes() {
+    return Collections.singletonList(metricName);
+  }
+
+  String getMetricName() {
     return metricName;
   }
 
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java
index 2657e62..aa81b37 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensor.java
@@ -15,24 +15,22 @@
 
 package com.twitter.heron.healthmgr.sensors;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 
 import javax.inject.Inject;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.common.PackingPlanProvider;
 import com.twitter.heron.healthmgr.common.TopologyProvider;
 
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 
 public class BufferSizeSensor extends BaseSensor {
   private final MetricsProvider metricsProvider;
@@ -44,80 +42,46 @@ public class BufferSizeSensor extends BaseSensor {
                           PackingPlanProvider packingPlanProvider,
                           TopologyProvider topologyProvider,
                           MetricsProvider metricsProvider) {
-    super(policyConfig, METRIC_BUFFER_SIZE.text(), BufferSizeSensor.class.getSimpleName());
+    super(policyConfig, METRIC_WAIT_Q_SIZE.text(), BufferSizeSensor.class.getSimpleName());
     this.packingPlanProvider = packingPlanProvider;
     this.topologyProvider = topologyProvider;
     this.metricsProvider = metricsProvider;
   }
 
-  @Override
-  public Map<String, ComponentMetrics> get() {
-    return get(topologyProvider.getBoltNames());
-  }
-
   /**
    * The buffer size as provided by tracker
    *
-   * @return buffer size
+   * @return buffer size measurements
    */
-  public Map<String, ComponentMetrics> get(String... desiredBoltNames) {
-    Map<String, ComponentMetrics> result = new HashMap<>();
-
-    Set<String> boltNameFilter = new HashSet<>();
-    if (desiredBoltNames.length > 0) {
-      boltNameFilter.addAll(Arrays.asList(desiredBoltNames));
-    }
+  @Override
+  public Collection<Measurement> fetch() {
+    Collection<Measurement> result = new ArrayList<>();
+    Instant now = context.checkpoint();
 
     String[] boltComponents = topologyProvider.getBoltNames();
-    for (String boltComponent : boltComponents) {
-      if (!boltNameFilter.isEmpty() && !boltNameFilter.contains(boltComponent)) {
-        continue;
-      }
+    Duration duration = getDuration();
 
-      String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(boltComponent);
+    for (String component : boltComponents) {
+      String[] boltInstanceNames = packingPlanProvider.getBoltInstanceNames(component);
+      for (String instance : boltInstanceNames) {
+        String metric = getMetricName() + instance + MetricName.METRIC_WAIT_Q_SIZE_SUFFIX;
 
-      Map<String, InstanceMetrics> instanceMetrics = new HashMap<>();
-      for (String boltInstanceName : boltInstanceNames) {
-        String metric = getMetricName() + boltInstanceName + MetricName.METRIC_BUFFER_SIZE_SUFFIX;
-
-        Map<String, ComponentMetrics> stmgrResult = metricsProvider.getComponentMetrics(
-            metric,
-            getDuration(),
-            COMPONENT_STMGR);
-
-        if (stmgrResult.get(COMPONENT_STMGR) == null) {
+        Collection<Measurement> stmgrResult
+            = metricsProvider.getMeasurements(now, duration, metric, COMPONENT_STMGR);
+        if (stmgrResult.isEmpty()) {
           continue;
         }
 
-        HashMap<String, InstanceMetrics> streamManagerResult =
-            stmgrResult.get(COMPONENT_STMGR).getMetrics();
-
-        if (streamManagerResult.isEmpty()) {
+        MeasurementsTable table = MeasurementsTable.of(stmgrResult).component(COMPONENT_STMGR);
+        if (table.size() == 0) {
           continue;
         }
+        double totalSize = table.type(metric).sum();
 
-        // since a bolt instance belongs to one stream manager, expect just one metrics
-        // manager instance in the result
-        Double stmgrInstanceResult = 0.0;
-        for (Iterator<InstanceMetrics> it = streamManagerResult.values().iterator();
-            it.hasNext();) {
-          InstanceMetrics iMetrics = it.next();
-          Double val = iMetrics.getMetricValueSum(metric);
-          if (val == null) {
-            continue;
-          } else {
-            stmgrInstanceResult += val;
-          }
-        }
-
-        InstanceMetrics boltInstanceMetric =
-            new InstanceMetrics(boltInstanceName, getMetricName(), stmgrInstanceResult);
-
-        instanceMetrics.put(boltInstanceName, boltInstanceMetric);
+        Measurement measurement
+            = new Measurement(component, instance, getMetricName(), now, totalSize);
+        result.add(measurement);
       }
-
-      ComponentMetrics componentMetrics = new ComponentMetrics(boltComponent, instanceMetrics);
-      result.put(boltComponent, componentMetrics);
     }
 
     return result;
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java
index 47119c1..21ee98a 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensor.java
@@ -15,12 +15,15 @@
 
 package com.twitter.heron.healthmgr.sensors;
 
-import java.util.Map;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 
 import javax.inject.Inject;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
 import com.twitter.heron.healthmgr.common.TopologyProvider;
@@ -30,6 +33,7 @@ import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_E
 public class ExecuteCountSensor extends BaseSensor {
   private final TopologyProvider topologyProvider;
   private final MetricsProvider metricsProvider;
+  private Instant now;
 
   @Inject
   ExecuteCountSensor(TopologyProvider topologyProvider,
@@ -40,14 +44,10 @@ public class ExecuteCountSensor extends BaseSensor {
     this.metricsProvider = metricsProvider;
   }
 
-  public Map<String, ComponentMetrics> get() {
-    String[] boltNames = topologyProvider.getBoltNames();
-    return get(boltNames);
-  }
-
-  public Map<String, ComponentMetrics> get(String... boltNames) {
-    return metricsProvider.getComponentMetrics(getMetricName(),
-        getDuration(),
-        boltNames);
+  @Override
+  public Collection<Measurement> fetch() {
+    List<String> bolts = Arrays.asList(topologyProvider.getBoltNames());
+    now = context.checkpoint();
+    return metricsProvider.getMeasurements(now, getDuration(), getMetricTypes(), bolts);
   }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
index f3e7024..44728ae 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProvider.java
@@ -18,8 +18,8 @@ package com.twitter.heron.healthmgr.sensors;
 import java.net.HttpURLConnection;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -29,8 +29,7 @@ import javax.inject.Named;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
 
 import com.twitter.heron.proto.system.Common.StatusCode;
 import com.twitter.heron.proto.tmaster.TopologyMaster;
@@ -51,7 +50,6 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
   private final SchedulerStateManagerAdaptor stateManagerAdaptor;
   private final String topologyName;
 
-  private Clock clock = new Clock();
   private String metricsCacheLocation;
 
   @Inject
@@ -64,35 +62,29 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
   }
 
   @Override
-  public Map<String, ComponentMetrics> getComponentMetrics(String metric,
-                                                           Instant startTime,
-                                                           Duration duration,
-                                                           String... components) {
-    Map<String, ComponentMetrics> result = new HashMap<>();
-    for (String component : components) {
-      TopologyMaster.MetricResponse response =
-          getMetricsFromMetricsCache(metric, component, startTime, duration);
-
-      Map<String, InstanceMetrics> metrics = parse(response, component, metric, startTime);
-      ComponentMetrics componentMetric = new ComponentMetrics(component, metrics);
-      result.put(component, componentMetric);
+  public Collection<Measurement> getMeasurements(Instant startTime,
+                                                 Duration duration,
+                                                 Collection<String> metricNames,
+                                                 Collection<String> components) {
+    Collection<Measurement> result = new ArrayList<>();
+    for (String metric : metricNames) {
+      for (String component : components) {
+        TopologyMaster.MetricResponse response =
+            getMetricsFromMetricsCache(metric, component, startTime, duration);
+        Collection<Measurement> measurements = parse(response, component, metric, startTime);
+        LOG.fine(String.format("%d measurements received for %s/%s",
+            measurements.size(), component, metric));
+        result.addAll(measurements);
+      }
     }
     return result;
   }
 
-  @Override
-  public Map<String, ComponentMetrics> getComponentMetrics(String metric,
-                                                           Duration duration,
-                                                           String... components) {
-    Instant start = Instant.ofEpochMilli(clock.currentTime() - duration.toMillis());
-    return getComponentMetrics(metric, start, duration, components);
-  }
-
   @VisibleForTesting
   @SuppressWarnings("unchecked")
-  Map<String, InstanceMetrics> parse(
+  Collection<Measurement> parse(
       TopologyMaster.MetricResponse response, String component, String metric, Instant startTime) {
-    Map<String, InstanceMetrics> metricsData = new HashMap<>();
+    Collection<Measurement> metricsData = new ArrayList<>();
 
     if (response == null || !response.getStatus().getStatus().equals(StatusCode.OK)) {
       LOG.info(String.format(
@@ -109,29 +101,32 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
-    // convert heron.protobuf.taskMetrics to dhalion.InstanceMetrics
+    // convert heron.protobuf.taskMetrics to dhalion.Measurement
     for (TaskMetric tm : response.getMetricList()) {
       String instanceId = tm.getInstanceId();
-      InstanceMetrics instanceMetrics = new InstanceMetrics(instanceId);
-
       for (IndividualMetric im : tm.getMetricList()) {
         String metricName = im.getName();
-        Map<Instant, Double> values = new HashMap<>();
 
         // case 1
         for (IntervalValue iv : im.getIntervalValuesList()) {
           MetricInterval mi = iv.getInterval();
           String value = iv.getValue();
-          values.put(Instant.ofEpochSecond(mi.getStart()), Double.parseDouble(value));
+          Measurement measurement = new Measurement(
+              component,
+              instanceId,
+              metricName,
+              Instant.ofEpochSecond(mi.getStart()),
+              Double.parseDouble(value));
+          metricsData.add(measurement);
         }
         // case 2
         if (im.hasValue()) {
-          values.put(startTime, Double.parseDouble(im.getValue()));
-        }
-
-        if (!values.isEmpty()) {
-          instanceMetrics.addMetric(metricName, values);
+          Measurement measurement = new Measurement(
+              component,
+              instanceId,
+              metricName,
+              startTime,
+              Double.parseDouble(im.getValue()));
+          metricsData.add(measurement);
         }
       }
-
-      metricsData.put(instanceId, instanceMetrics);
     }
 
     return metricsData;
@@ -145,8 +140,8 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
         .setComponentName(component)
         .setExplicitInterval(
             MetricInterval.newBuilder()
-                .setStart(start.getEpochSecond())
-                .setEnd(start.plus(duration).getEpochSecond())
+                .setStart(start.minus(duration).getEpochSecond())
+                .setEnd(start.getEpochSecond())
                 .build())
         .addMetric(metric)
         .build();
@@ -179,11 +174,6 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
     }
   }
 
-  @VisibleForTesting
-  void setClock(Clock clock) {
-    this.clock = clock;
-  }
-
   /* returns last known location of metrics cache
    */
   private synchronized String getCacheLocation() {
@@ -200,10 +190,4 @@ public class MetricsCacheMetricsProvider implements MetricsProvider {
   private synchronized void resetCacheLocation() {
     metricsCacheLocation = null;
   }
-
-  static class Clock {
-    long currentTime() {
-      return System.currentTimeMillis();
-    }
-  }
 }
diff --git a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java
index f311842..8964a67 100644
--- a/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java
+++ b/heron/healthmgr/src/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProvider.java
@@ -17,7 +17,8 @@ package com.twitter.heron.healthmgr.sensors;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -34,8 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.jayway.jsonpath.DocumentContext;
 import com.jayway.jsonpath.JsonPath;
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
 
 import net.minidev.json.JSONArray;
 
@@ -49,8 +49,6 @@ public class TrackerMetricsProvider implements MetricsProvider {
   private static final Logger LOG = Logger.getLogger(TrackerMetricsProvider.class.getName());
   private final WebTarget baseTarget;
 
-  private Clock clock = new Clock();
-
   @Inject
   public TrackerMetricsProvider(@Named(CONF_METRICS_SOURCE_URL) String trackerURL,
                                 @Named(CONF_TOPOLOGY_NAME) String topologyName,
@@ -68,35 +66,30 @@ public class TrackerMetricsProvider implements MetricsProvider {
   }
 
   @Override
-  public Map<String, ComponentMetrics> getComponentMetrics(String metric,
-                                                           Instant startTime,
-                                                           Duration duration,
-                                                           String... components) {
-    Map<String, ComponentMetrics> result = new HashMap<>();
-    for (String component : components) {
-      String response = getMetricsFromTracker(metric, component, startTime, duration);
-      Map<String, InstanceMetrics> metrics = parse(response, component, metric);
-      ComponentMetrics componentMetric = new ComponentMetrics(component, metrics);
-      result.put(component, componentMetric);
+  public Collection<Measurement> getMeasurements(Instant startTime,
+                                                 Duration duration,
+                                                 Collection<String> metricNames,
+                                                 Collection<String> components) {
+    Collection<Measurement> result = new ArrayList<>();
+    for (String metric : metricNames) {
+      for (String component : components) {
+        String response = getMetricsFromTracker(metric, component, startTime, duration);
+        Collection<Measurement> measurements = parse(response, component, metric);
+        LOG.fine(String.format("%d measurements received for %s/%s",
+            measurements.size(), component, metric));
+        result.addAll(measurements);
+      }
     }
     return result;
   }
 
-  @Override
-  public Map<String, ComponentMetrics> getComponentMetrics(String metric,
-                                                           Duration duration,
-                                                           String... components) {
-    Instant start = Instant.ofEpochMilli(clock.currentTime() - duration.toMillis());
-    return getComponentMetrics(metric, start, duration, components);
-  }
-
   @SuppressWarnings("unchecked")
-  private Map<String, InstanceMetrics> parse(String response, String component, String metric) {
-    Map<String, InstanceMetrics> metricsData = new HashMap<>();
+  private Collection<Measurement> parse(String response, String component, String metric) {
+    Collection<Measurement> metricsData = new ArrayList<>();
 
     if (response == null || response.isEmpty()) {
       return metricsData;
     }
 
     DocumentContext result = JsonPath.parse(response);
     JSONArray jsonArray = result.read("$.." + metric);
@@ -113,14 +106,15 @@ public class TrackerMetricsProvider implements MetricsProvider {
 
     for (String instanceName : metricsMap.keySet()) {
       Map<String, String> tmpValues = (Map<String, String>) metricsMap.get(instanceName);
-      Map<Instant, Double> values = new HashMap<>();
       for (String timeStamp : tmpValues.keySet()) {
-        values.put(Instant.ofEpochSecond(Long.parseLong(timeStamp)),
+        Measurement measurement = new Measurement(
+            component,
+            instanceName,
+            metric,
+            Instant.ofEpochSecond(Long.parseLong(timeStamp)),
             Double.parseDouble(tmpValues.get(timeStamp)));
+        metricsData.add(measurement);
       }
-      InstanceMetrics instanceMetrics = new InstanceMetrics(instanceName);
-      instanceMetrics.addMetric(metric, values);
-      metricsData.put(instanceName, instanceMetrics);
     }
 
     return metricsData;
@@ -131,23 +125,12 @@ public class TrackerMetricsProvider implements MetricsProvider {
     WebTarget target = baseTarget
         .queryParam("metricname", metric)
         .queryParam("component", component)
-        .queryParam("starttime", start.getEpochSecond())
-        .queryParam("endtime", start.getEpochSecond() + duration.getSeconds());
+        .queryParam("starttime", start.getEpochSecond() - duration.getSeconds())
+        .queryParam("endtime", start.getEpochSecond());
 
     LOG.log(Level.FINE, "Tracker Query URI: {0}", target.getUri());
 
     Response r = target.request(MediaType.APPLICATION_JSON_TYPE).get();
     return r.readEntity(String.class);
   }
-
-  @VisibleForTesting
-  void setClock(Clock clock) {
-    this.clock = clock;
-  }
-
-  static class Clock {
-    long currentTime() {
-      return System.currentTimeMillis();
-    }
-  }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/TestUtils.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/TestUtils.java
deleted file mode 100644
index b75c803..0000000
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/TestUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName;
-import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
-
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_PROCESSING_RATE_SKEW;
-import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomName.SYMPTOM_WAIT_Q_DISPARITY;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
-
-public class TestUtils {
-  public static List<Symptom> createBpSymptomList(int... bpValues) {
-    return createListFromSymptom(createBPSymptom(bpValues));
-  }
-
-  public static Symptom createExeCountSymptom(int... exeCounts) {
-    return createSymptom(SYMPTOM_PROCESSING_RATE_SKEW, METRIC_EXE_COUNT, exeCounts);
-  }
-
-  public static Symptom createWaitQueueDisparitySymptom(int... bufferSizes) {
-    return createSymptom(SYMPTOM_WAIT_Q_DISPARITY, METRIC_BUFFER_SIZE, bufferSizes);
-  }
-
-  private static Symptom createBPSymptom(int... bpValues) {
-    return createSymptom(SYMPTOM_BACK_PRESSURE, METRIC_BACK_PRESSURE, bpValues);
-  }
-
-  private static void addInstanceMetric(ComponentMetrics metrics, int i, double val, String metric) {
-    InstanceMetrics instanceMetric = new InstanceMetrics("container_1_bolt_" + i, metric, val);
-    metrics.addInstanceMetric(instanceMetric);
-  }
-
-  private static Symptom createSymptom(SymptomName symptom, MetricName metric, int... values) {
-    ComponentMetrics compMetrics = new ComponentMetrics("bolt");
-    for (int i = 0; i < values.length; i++) {
-      addInstanceMetric(compMetrics, i, values[i], metric.text());
-    }
-    return new Symptom(symptom.text(), compMetrics);
-  }
-
-  private static List<Symptom> createListFromSymptom(Symptom symptom) {
-    List<Symptom> symptoms = new ArrayList<>();
-    symptoms.add(symptom);
-    return symptoms;
-  }
-}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelperTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelperTest.java
deleted file mode 100644
index 7df7fce..0000000
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/ComponentMetricsHelperTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr.common;
-
-import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import org.junit.Test;
-
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_GROWTH_RATE;
-import static org.junit.Assert.assertEquals;
-
-public class ComponentMetricsHelperTest {
-
-  @Test
-  public void detectsMultipleCompIncreasingBuffer() {
-    ComponentMetrics compMetrics;
-    InstanceMetrics instanceMetrics;
-    Map<Instant, Double> bufferSizes;
-
-    compMetrics = new ComponentMetrics("bolt");
-
-    instanceMetrics = new InstanceMetrics("i1");
-    bufferSizes = new HashMap<>();
-    bufferSizes.put(Instant.ofEpochSecond(1497892210), 0.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892270), 300.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892330), 600.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892390), 900.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892450), 1200.0);
-    instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
-    compMetrics.addInstanceMetric(instanceMetrics);
-
-    instanceMetrics = new InstanceMetrics("i2");
-    bufferSizes = new HashMap<>();
-    bufferSizes.put(Instant.ofEpochSecond(1497892270), 0.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892330), 180.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892390), 360.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892450), 540.0);
-    instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
-    compMetrics.addInstanceMetric(instanceMetrics);
-
-    ComponentMetricsHelper helper = new ComponentMetricsHelper(compMetrics);
-    helper.computeBufferSizeTrend();
-    assertEquals(5, helper.getMaxBufferChangeRate(), 0.1);
-
-    HashMap<String, InstanceMetrics> metrics = compMetrics.getMetrics();
-    assertEquals(1, metrics.get("i1").getMetrics().get(METRIC_WAIT_Q_GROWTH_RATE.text()).size());
-    assertEquals(5, metrics.get("i1").getMetricValueSum(METRIC_WAIT_Q_GROWTH_RATE.text()), 0.1);
-    assertEquals(3, metrics.get("i2").getMetricValueSum(METRIC_WAIT_Q_GROWTH_RATE.text()), 0.1);
-  }
-}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java
index 3d45ec9..2316416 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/PackingPlanProviderTest.java
@@ -35,7 +35,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class PackingPlanProviderTest {
-  String topologyName = "topologyName";
+  private String topologyName = "topologyName";
   private EventManager eventManager = new EventManager();
 
   @Test
@@ -67,7 +67,7 @@ public class PackingPlanProviderTest {
     PackingPlan packing = provider.get();
     Assert.assertEquals(1, packing.getContainers().size());
 
-    provider.onEvent(new TopologyUpdate());
+    provider.onEvent(new TopologyUpdate(null, null));
     provider.get();
     verify(adaptor, times(2)).getPackingPlan(topologyName);
   }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java
index 3b7a538..bff793b 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/common/TopologyProviderTest.java
@@ -58,7 +58,7 @@ public class TopologyProviderTest {
     Assert.assertEquals(2, provider.get().getBoltsCount());
 
     // once fetched it is cached
-    provider.onEvent(new TopologyUpdate());
+    provider.onEvent(new TopologyUpdate(null, null));
     provider.get();
     verify(adaptor, times(2)).getPhysicalPlan(topology);
   }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java
index b5dd2f2..ad6c266 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/BackPressureDetectorTest.java
@@ -14,51 +14,79 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BackPressureSensor;
 
 import static com.twitter.heron.healthmgr.detectors.BackPressureDetector.CONF_NOISE_FILTER;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_INSTANCE_BACK_PRESSURE;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class BackPressureDetectorTest {
+  private Instant now;
+
+  @Before
+  public void setup() {
+    now = Instant.now();
+  }
+
   @Test
   public void testConfigAndFilter() {
     HealthPolicyConfig config = mock(HealthPolicyConfig.class);
     when(config.getConfig(CONF_NOISE_FILTER, 20)).thenReturn(50);
 
-    ComponentMetrics compMetrics =
-        new ComponentMetrics("bolt", "i1", METRIC_BACK_PRESSURE.text(), 55);
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-    topologyMetrics.put("bolt", compMetrics);
-
-    BackPressureSensor sensor = mock(BackPressureSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 55);
+    Measurement measurement2
+        = new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 3);
+    Measurement measurement3
+        = new Measurement("bolt", "i3", METRIC_BACK_PRESSURE.text(), now, 0);
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+    metrics.add(measurement3);
+
+    BackPressureDetector detector = new BackPressureDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+    detector.initialize(context);
+    Collection<Symptom> symptoms = detector.detect(metrics);
+
+    Assert.assertEquals(2, symptoms.size());
+    SymptomsTable compSymptom = SymptomsTable.of(symptoms).type(SYMPTOM_COMP_BACK_PRESSURE.text());
+    Assert.assertEquals(1, compSymptom.size());
+    Assert.assertEquals(1, compSymptom.get().iterator().next().assignments().size());
 
-    BackPressureDetector detector = new BackPressureDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
+    SymptomsTable instanceSymptom
+        = SymptomsTable.of(symptoms).type(SYMPTOM_INSTANCE_BACK_PRESSURE.text());
+    Assert.assertEquals(1, instanceSymptom.size());
+    Assert.assertEquals(1, instanceSymptom.get().iterator().next().assignments().size());
 
-    Assert.assertEquals(1, symptoms.size());
-
-    compMetrics = new ComponentMetrics("bolt", "i1", METRIC_BACK_PRESSURE.text(), 45);
-    topologyMetrics.put("bolt", compMetrics);
-
-    sensor = mock(BackPressureSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
+    measurement1
+        = new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 45);
+    measurement2
+        = new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 3);
+    metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
 
-    detector = new BackPressureDetector(sensor, config);
-    symptoms = detector.detect();
+    detector = new BackPressureDetector(config);
+    detector.initialize(context);
+    symptoms = detector.detect(metrics);
 
     Assert.assertEquals(0, symptoms.size());
   }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java
index 8508e74..26b48bf 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/GrowingWaitQueueDetectorTest.java
@@ -15,21 +15,19 @@
 package com.twitter.heron.healthmgr.detectors;
 
 import java.time.Instant;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.Collection;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Test;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
 
 import static com.twitter.heron.healthmgr.detectors.GrowingWaitQueueDetector.CONF_LIMIT;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -40,52 +38,64 @@ public class GrowingWaitQueueDetectorTest {
     HealthPolicyConfig config = mock(HealthPolicyConfig.class);
     when(config.getConfig(CONF_LIMIT, 10.0)).thenReturn(5.0);
 
-    ComponentMetrics compMetrics;
-    InstanceMetrics instanceMetrics;
-    Map<Instant, Double> bufferSizes;
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-
-    instanceMetrics = new InstanceMetrics("i1");
-    bufferSizes = new HashMap<>();
-    bufferSizes.put(Instant.ofEpochSecond(1497892222), 0.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892270), 300.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892330), 700.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892390), 1000.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892450), 1300.0);
-    instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
-    compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(instanceMetrics);
-
-    topologyMetrics.put("bolt", compMetrics);
-
-    BufferSizeSensor sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-
-    GrowingWaitQueueDetector detector = new GrowingWaitQueueDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 0.0);
+    Measurement measurement2
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892270), 300.0);
+    Measurement measurement3
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892330), 700.0);
+    Measurement measurement4
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892390), 1000.0);
+    Measurement measurement5
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892450), 1300.0);
+
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+    metrics.add(measurement3);
+    metrics.add(measurement4);
+    metrics.add(measurement5);
+
+    GrowingWaitQueueDetector detector = new GrowingWaitQueueDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    detector.initialize(context);
+    Collection<Symptom> symptoms = detector.detect(metrics);
 
     assertEquals(1, symptoms.size());
-
-    instanceMetrics = new InstanceMetrics("i1");
-    bufferSizes = new HashMap<>();
-    bufferSizes.put(Instant.ofEpochSecond(1497892222), 0.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892270), 200.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892330), 400.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892390), 600.0);
-    bufferSizes.put(Instant.ofEpochSecond(1497892450), 800.0);
-    instanceMetrics.addMetric(METRIC_BUFFER_SIZE.text(), bufferSizes);
-
-    compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(instanceMetrics);
-
-    topologyMetrics.put("bolt", compMetrics);
-
-    sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-
-    detector = new GrowingWaitQueueDetector(sensor, config);
-    symptoms = detector.detect();
+    assertEquals(1, symptoms.iterator().next().assignments().size());
+
+    measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 0.0);
+    measurement2
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892270), 200.0);
+    measurement3
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892330), 400.0);
+    measurement4
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892390), 600.0);
+    measurement5
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892450), 800.0);
+
+    metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+    metrics.add(measurement3);
+    metrics.add(measurement4);
+    metrics.add(measurement5);
+
+    detector = new GrowingWaitQueueDetector(config);
+    detector.initialize(context);
+    symptoms = detector.detect(metrics);
 
     assertEquals(0, symptoms.size());
   }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java
index a081c9e..1e1bb6a 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/LargeWaitQueueDetectorTest.java
@@ -14,20 +14,20 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Test;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
 
 import static com.twitter.heron.healthmgr.detectors.LargeWaitQueueDetector.CONF_SIZE_LIMIT;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -38,28 +38,40 @@ public class LargeWaitQueueDetectorTest {
     HealthPolicyConfig config = mock(HealthPolicyConfig.class);
     when(config.getConfig(CONF_SIZE_LIMIT, 1000)).thenReturn(20);
 
-    ComponentMetrics compMetrics
-        = new ComponentMetrics("bolt", "i1", METRIC_BUFFER_SIZE.text(), 21);
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 21);
+    Measurement measurement2
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892322), 21);
 
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-    topologyMetrics.put("bolt", compMetrics);
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
 
-    BufferSizeSensor sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-
-    LargeWaitQueueDetector detector = new LargeWaitQueueDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
+    LargeWaitQueueDetector detector = new LargeWaitQueueDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    detector.initialize(context);
+    Collection<Symptom> symptoms = detector.detect(metrics);
 
     assertEquals(1, symptoms.size());
+    assertEquals(1, symptoms.iterator().next().assignments().size());
+
 
-    compMetrics = new ComponentMetrics("bolt", "i1", METRIC_BUFFER_SIZE.text(), 19);
-    topologyMetrics.put("bolt", compMetrics);
+    measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 11);
+    measurement2
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892322), 10);
 
-    sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
+    metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
 
-    detector = new LargeWaitQueueDetector(sensor, config);
-    symptoms = detector.detect();
+    detector = new LargeWaitQueueDetector(config);
+    symptoms = detector.detect(metrics);
 
     assertEquals(0, symptoms.size());
   }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java
index edf8115..f97dcd2 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/ProcessingRateSkewDetectorTest.java
@@ -14,59 +14,111 @@
 
 package com.twitter.heron.healthmgr.detectors;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Test;
 
 import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.ExecuteCountSensor;
 
 import static com.twitter.heron.healthmgr.detectors.ProcessingRateSkewDetector.CONF_SKEW_RATIO;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ProcessingRateSkewDetectorTest {
+
   @Test
-  public void testConfigAndFilter() {
+  public void testGetMaxMin() {
     HealthPolicyConfig config = mock(HealthPolicyConfig.class);
     when(config.getConfig(CONF_SKEW_RATIO, 1.5)).thenReturn(2.5);
 
-    ComponentMetrics compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
-    compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 200));
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    Measurement measurement2
+        = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892122), 3000);
+    Measurement measurement3
+        = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 200.0);
+    Measurement measurement4
+        = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 400.0);
 
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-    topologyMetrics.put("bolt", compMetrics);
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+    metrics.add(measurement3);
+    metrics.add(measurement4);
 
-    ExecuteCountSensor sensor = mock(ExecuteCountSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-    when(sensor.getMetricName()).thenReturn(METRIC_EXE_COUNT.text());
+    MeasurementsTable metricsTable = MeasurementsTable.of(metrics);
 
-    ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
 
-    assertEquals(1, symptoms.size());
+    ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(config);
 
-    compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
-    compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 500));
-    topologyMetrics.put("bolt", compMetrics);
+    assertEquals(2000, (int) detector.getMaxOfAverage(metricsTable));
+    assertEquals(300, (int) detector.getMinOfAverage(metricsTable));
 
-    sensor = mock(ExecuteCountSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
+  }
 
-    detector = new ProcessingRateSkewDetector(sensor, config);
-    symptoms = detector.detect();
+  @Test
+  public void testConfigAndFilter() {
+    HealthPolicyConfig config = mock(HealthPolicyConfig.class);
+    when(config.getConfig(CONF_SKEW_RATIO, 1.5)).thenReturn(2.5);
+
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    Measurement measurement2
+        = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 200.0);
+
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+
+    ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    detector.initialize(context);
+
+    Collection<Symptom> symptoms = detector.detect(metrics);
+
+    assertEquals(3, symptoms.size());
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).size());
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i1").size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i2").size());
+
+    measurement1
+        = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    measurement2
+        = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 500.0);
+
+    metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+
+    detector = new ProcessingRateSkewDetector(config);
+    detector.initialize(context);
+    symptoms = detector.detect(metrics);
 
     assertEquals(0, symptoms.size());
   }
@@ -76,43 +128,60 @@ public class ProcessingRateSkewDetectorTest {
     HealthPolicyConfig config = mock(HealthPolicyConfig.class);
     when(config.getConfig(CONF_SKEW_RATIO, 1.5)).thenReturn(2.5);
 
-    ComponentMetrics compMetrics1 = new ComponentMetrics("bolt-1");
-    compMetrics1.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
-    compMetrics1.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 200));
-
-    ComponentMetrics compMetrics2 = new ComponentMetrics("bolt-2");
-    compMetrics2.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
-    compMetrics2.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 200));
-
-    ComponentMetrics compMetrics3 = new ComponentMetrics("bolt-3");
-    compMetrics3.addInstanceMetric(new InstanceMetrics("i1", METRIC_EXE_COUNT.text(), 1000));
-    compMetrics3.addInstanceMetric(new InstanceMetrics("i2", METRIC_EXE_COUNT.text(), 500));
-
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-    topologyMetrics.put("bolt-1", compMetrics1);
-    topologyMetrics.put("bolt-2", compMetrics2);
-    topologyMetrics.put("bolt-3", compMetrics3);
-
-    ExecuteCountSensor sensor = mock(ExecuteCountSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-    when(sensor.getMetricName()).thenReturn(METRIC_EXE_COUNT.text());
-
-    ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
-
-    assertEquals(2, symptoms.size());
-    for (Symptom symptom : symptoms) {
-      if (symptom.getComponent().getName().equals("bolt-1")) {
-        compMetrics1 = null;
-      } else if (symptom.getComponent().getName().equals("bolt-2")) {
-        compMetrics2 = null;
-      } else if (symptom.getComponent().getName().equals("bolt-3")) {
-        compMetrics3 = null;
-      }
-    }
-
-    assertNull(compMetrics1);
-    assertNull(compMetrics2);
-    assertNotNull(compMetrics3);
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    Measurement measurement2
+        = new Measurement("bolt", "i2", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 200.0);
+
+
+    Measurement measurement3
+        = new Measurement("bolt2", "i3", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    Measurement measurement4
+        = new Measurement("bolt2", "i4", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 200.0);
+
+
+    Measurement measurement5
+        = new Measurement("bolt3", "i5", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 1000);
+    Measurement measurement6
+        = new Measurement("bolt3", "i6", METRIC_EXE_COUNT.text(), Instant.ofEpochSecond
+        (1497892222), 500.0);
+
+
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+    metrics.add(measurement3);
+    metrics.add(measurement4);
+    metrics.add(measurement5);
+    metrics.add(measurement6);
+
+    ProcessingRateSkewDetector detector = new ProcessingRateSkewDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    detector.initialize(context);
+
+    Collection<Symptom> symptoms = detector.detect(metrics);
+
+    assertEquals(6, symptoms.size());
+
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+    assertEquals(2, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).size());
+    assertEquals(2, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).size());
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i1").size());
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i3").size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i2").size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_PROCESSING_RATE_SKEW).assignment("i4").size());
+
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetectorTest.java
deleted file mode 100644
index a22e303..0000000
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueDisparityDetectorTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-// Copyright 2016 Twitter. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package com.twitter.heron.healthmgr.detectors;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-
-import org.junit.Test;
-
-import com.twitter.heron.healthmgr.HealthPolicyConfig;
-import com.twitter.heron.healthmgr.sensors.BufferSizeSensor;
-
-import static com.twitter.heron.healthmgr.detectors.WaitQueueDisparityDetector.CONF_DISPARITY_RATIO;
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BUFFER_SIZE;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class WaitQueueDisparityDetectorTest {
-  @Test
-  public void testConfigAndFilter() {
-    HealthPolicyConfig config = mock(HealthPolicyConfig.class);
-    when(config.getConfig(CONF_DISPARITY_RATIO, 20.0)).thenReturn(15.0);
-
-    ComponentMetrics compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BUFFER_SIZE.text(), 1501));
-    compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BUFFER_SIZE.text(), 100));
-
-    Map<String, ComponentMetrics> topologyMetrics = new HashMap<>();
-    topologyMetrics.put("bolt", compMetrics);
-
-    BufferSizeSensor sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-    when(sensor.getMetricName()).thenReturn(METRIC_BUFFER_SIZE.text());
-
-    WaitQueueDisparityDetector detector = new WaitQueueDisparityDetector(sensor, config);
-    List<Symptom> symptoms = detector.detect();
-
-    assertEquals(1, symptoms.size());
-
-    compMetrics = new ComponentMetrics("bolt");
-    compMetrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BUFFER_SIZE.text(), 1500));
-    compMetrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BUFFER_SIZE.text(), 110));
-    topologyMetrics.put("bolt", compMetrics);
-
-    sensor = mock(BufferSizeSensor.class);
-    when(sensor.get()).thenReturn(topologyMetrics);
-
-    detector = new WaitQueueDisparityDetector(sensor, config);
-    symptoms = detector.detect();
-
-    assertEquals(0, symptoms.size());
-  }
-}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetectorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetectorTest.java
new file mode 100644
index 0000000..e73d1c6
--- /dev/null
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/detectors/WaitQueueSkewDetectorTest.java
@@ -0,0 +1,88 @@
+// Copyright 2016 Twitter. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.twitter.heron.healthmgr.detectors;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.core.SymptomsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
+
+import org.junit.Test;
+
+import com.twitter.heron.healthmgr.HealthPolicyConfig;
+
+import static com.twitter.heron.healthmgr.detectors.WaitQueueSkewDetector.CONF_SKEW_RATIO;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class WaitQueueSkewDetectorTest {
+  @Test
+  public void testConfigAndFilter() {
+    HealthPolicyConfig config = mock(HealthPolicyConfig.class);
+    when(config.getConfig(CONF_SKEW_RATIO, 20.0)).thenReturn(15.0);
+
+    Measurement measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 1501);
+    Measurement measurement2
+        = new Measurement("bolt", "i2", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 100.0);
+
+    Collection<Measurement> metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+
+    WaitQueueSkewDetector detector = new WaitQueueSkewDetector(config);
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    detector.initialize(context);
+    Collection<Symptom> symptoms = detector.detect(metrics);
+
+    assertEquals(3, symptoms.size());
+    SymptomsTable symptomsTable = SymptomsTable.of(symptoms);
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_WAIT_Q_SIZE_SKEW).size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_WAIT_Q_SIZE_SKEW).size());
+    assertEquals(1, symptomsTable.type("POSITIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_WAIT_Q_SIZE_SKEW).assignment("i1").size());
+    assertEquals(1, symptomsTable.type("NEGATIVE "+ BaseDetector.SymptomType
+        .SYMPTOM_WAIT_Q_SIZE_SKEW).assignment("i2").size());
+
+
+     measurement1
+        = new Measurement("bolt", "i1", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 1500);
+     measurement2
+        = new Measurement("bolt", "i2", METRIC_WAIT_Q_SIZE.text(), Instant.ofEpochSecond
+        (1497892222), 110.0);
+
+    metrics = new ArrayList<>();
+    metrics.add(measurement1);
+    metrics.add(measurement2);
+
+    detector = new WaitQueueSkewDetector(config);
+    detector.initialize(context);
+    symptoms = detector.detect(metrics);
+
+    assertEquals(0, symptoms.size());
+  }
+}
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java
index 8a1b481..3602548 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/DataSkewDiagnoserTest.java
@@ -14,62 +14,122 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.api.IDiagnoser;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
+import org.junit.Before;
 import org.junit.Test;
 
-import com.twitter.heron.healthmgr.TestUtils;
+import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
 
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.DIAGNOSIS_DATA_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_DATA_SKEW;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class DataSkewDiagnoserTest {
+  private final String comp = "comp";
+  private Instant now = Instant.now();
+  private Collection<Measurement> measurements = new ArrayList<>();
+  private ExecutionContext context;
+  private IDiagnoser diagnoser;
+
+  @Before
+  public void initTestData() {
+    now = Instant.now();
+    measurements = new ArrayList<>();
+
+    context = mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+
+    diagnoser = new DataSkewDiagnoser();
+    diagnoser.initialize(context);
+  }
+
   @Test
   public void failsIfNoDataSkewSymptom() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123);
-    Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
-    assertNull(result);
+    Symptom symptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), Instant.now(), null);
+    Collection<Symptom> symptoms = Collections.singletonList(symptom);
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
   }
 
   @Test
   public void diagnosis1DataSkewInstance() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createExeCountSymptom(5000, 2000, 2000));
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(10000, 500, 500));
-
-    Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
-    assertNotNull(result);
-    assertEquals(DIAGNOSIS_DATA_SKEW.text(), result.getName());
-    assertEquals(1, result.getSymptoms().size());
-    Symptom symptom = result.getSymptoms().values().iterator().next();
-
-    assertEquals(123, symptom.getComponent()
-        .getMetricValueSum("container_1_bolt_0", METRIC_BACK_PRESSURE.text()).intValue());
+    addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+    addMeasurements(METRIC_EXE_COUNT, 5000, 2000, 2000);
+    addMeasurements(METRIC_WAIT_Q_SIZE, 10000, 500, 500);
+    when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom skewSymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, skewSymptom, qDisparitySymptom);
+
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(1, result.size());
+    Diagnosis diagnoses = result.iterator().next();
+    assertEquals(DIAGNOSIS_DATA_SKEW.text(), diagnoses.type());
+    assertEquals(1, diagnoses.assignments().size());
+    assertEquals("i1", diagnoses.assignments().iterator().next());
+    // TODO
+//    assertEquals(1, diagnoses.symptoms().size());
   }
 
   @Test
   public void diagnosisNoDataSkewLowBufferSize() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createExeCountSymptom(5000, 2000, 2000));
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(1, 500, 500));
+    addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+    addMeasurements(METRIC_EXE_COUNT, 5000, 2000, 2000);
+    addMeasurements(METRIC_WAIT_Q_SIZE, 1, 500, 500);
+    when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom skewSymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
 
-    Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
-    assertNull(result);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, skewSymptom, qDisparitySymptom);
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
   }
 
   @Test
   public void diagnosisNoDataSkewLowRate() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createExeCountSymptom(100, 2000, 2000));
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(10000, 500, 500));
+    addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+    addMeasurements(METRIC_EXE_COUNT, 100, 2000, 2000);
+    addMeasurements(METRIC_WAIT_Q_SIZE, 10000, 500, 500);
+    when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom skewSymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, skewSymptom, qDisparitySymptom);
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
+  }
 
-    Diagnosis result = new DataSkewDiagnoser().diagnose(symptoms);
-    assertNull(result);
+  private void addMeasurements(MetricName metricExeCount, int i1, int i2, int i3) {
+    measurements.add(new Measurement(comp, "i1", metricExeCount.text(), now, i1));
+    measurements.add(new Measurement(comp, "i2", metricExeCount.text(), now, i2));
+    measurements.add(new Measurement(comp, "i3", metricExeCount.text(), now, i3));
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java
index 50aab9d..cb6a3cb 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/SlowInstanceDiagnoserTest.java
@@ -14,47 +14,99 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.api.IDiagnoser;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
+import org.junit.Before;
 import org.junit.Test;
 
-import com.twitter.heron.healthmgr.TestUtils;
+import com.twitter.heron.healthmgr.sensors.BaseSensor;
 
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_SLOW_INSTANCE;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class SlowInstanceDiagnoserTest {
+  private final String comp = "comp";
+  private IDiagnoser diagnoser;
+  private Instant now = Instant.now();
+  private Collection<Measurement> measurements = new ArrayList<>();
+  private ExecutionContext context;
+
+  @Before
+  public void initTestData() {
+    now = Instant.now();
+    measurements = new ArrayList<>();
+
+    context = mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+
+    diagnoser = new SlowInstanceDiagnoser();
+    diagnoser.initialize(context);
+  }
+
   @Test
-  public void failsIfNoBufferSizeDiaparity() {
-    SlowInstanceDiagnoser diagnoser = new SlowInstanceDiagnoser();
-    Diagnosis result = diagnoser.diagnose(TestUtils.createBpSymptomList(123));
-    assertNull(result);
+  public void failsIfNoBufferSizeDisparity() {
+    Symptom symptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), Instant.now(), null);
+    Collection<Symptom> symptoms = Collections.singletonList(symptom);
+
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
   }
 
   @Test
   public void diagnosis1of3SlowInstances() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(1000, 20, 20));
-
-    Diagnosis result = new SlowInstanceDiagnoser().diagnose(symptoms);
-    assertEquals(1, result.getSymptoms().size());
-    ComponentMetrics data = result.getSymptoms().values().iterator().next().getComponent();
-    assertEquals(123,
-        data.getMetricValueSum("container_1_bolt_0",METRIC_BACK_PRESSURE.text())
-            .intValue());
+    addMeasurements(METRIC_BACK_PRESSURE, 123, 0, 0);
+    addMeasurements(METRIC_WAIT_Q_SIZE, 1000, 20, 20);
+    when(context.measurements()).thenReturn(MeasurementsTable.of(measurements));
+
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom);
+
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+
+    assertEquals(1, result.size());
+    Diagnosis diagnoses = result.iterator().next();
+    assertEquals(DIAGNOSIS_SLOW_INSTANCE.text(), diagnoses.type());
+    assertEquals(1, diagnoses.assignments().size());
+    assertEquals("i1", diagnoses.assignments().iterator().next());
+    // TODO
+//    assertEquals(1, diagnoses.symptoms().size());
   }
 
   @Test
   public void failIfInstanceWithBpHasSmallBuffer() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(100, 500, 500));
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+    Symptom exeDisparitySymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom, exeDisparitySymptom);
+
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
+  }
 
-    Diagnosis result = new SlowInstanceDiagnoser().diagnose(symptoms);
-    assertNull(result);
+  private void addMeasurements(BaseSensor.MetricName metricExeCount, int i1, int i2, int i3) {
+    measurements.add(new Measurement(comp, "i1", metricExeCount.text(), now, i1));
+    measurements.add(new Measurement(comp, "i2", metricExeCount.text(), now, i2));
+    measurements.add(new Measurement(comp, "i3", metricExeCount.text(), now, i3));
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java
index 0355308..e180ae6 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/diagnosers/UnderProvisioningDiagnoserTest.java
@@ -14,51 +14,83 @@
 
 package com.twitter.heron.healthmgr.diagnosers;
 
-import java.util.List;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.api.IDiagnoser;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Symptom;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
+import org.junit.Before;
 import org.junit.Test;
 
-import com.twitter.heron.healthmgr.TestUtils;
-
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_COMP_BACK_PRESSURE;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_PROCESSING_RATE_SKEW;
+import static com.twitter.heron.healthmgr.detectors.BaseDetector.SymptomType.SYMPTOM_WAIT_Q_SIZE_SKEW;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class UnderProvisioningDiagnoserTest {
+  private final String comp = "comp";
+  private IDiagnoser diagnoser;
+  private Instant now = Instant.now();
+  private ExecutionContext context;
+
+  @Before
+  public void initTestData() {
+    now = Instant.now();
+
+    context = mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+
+    diagnoser = new UnderProvisioningDiagnoser();
+    diagnoser.initialize(context);
+  }
+
   @Test
   public void diagnosisWhen1Of1InstanceInBP() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0);
-    //symptoms.add(TestUtils.createLargeWaitQSymptom(5000));
-    Diagnosis result = new UnderProvisioningDiagnoser().diagnose(symptoms);
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom symptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Collection<Symptom> symptoms = Collections.singletonList(symptom);
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
     validateDiagnosis(result);
   }
 
   @Test
   public void diagnosisFailsNotSimilarQueueSizes() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createWaitQueueDisparitySymptom(100, 500, 500));
-    Diagnosis result = new UnderProvisioningDiagnoser().diagnose(symptoms);
-    assertNull(result);
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_WAIT_Q_SIZE_SKEW.text(), now, assign);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom);
+
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
   }
 
   @Test
   public void diagnosisFailsNotSimilarProcessingRates() {
-    List<Symptom> symptoms = TestUtils.createBpSymptomList(123, 0, 0);
-    symptoms.add(TestUtils.createExeCountSymptom(100, 500, 500));
+    // TODO BP instance should be same as the one with high processing rate
+    Collection<String> assign = Collections.singleton(comp);
+    Symptom bpSymptom = new Symptom(SYMPTOM_COMP_BACK_PRESSURE.text(), now, assign);
+    Symptom qDisparitySymptom = new Symptom(SYMPTOM_PROCESSING_RATE_SKEW.text(), now, assign);
+    Collection<Symptom> symptoms = Arrays.asList(bpSymptom, qDisparitySymptom);
 
-    Diagnosis result = new UnderProvisioningDiagnoser().diagnose(symptoms);
-    assertNull(result);
+    Collection<Diagnosis> result = diagnoser.diagnose(symptoms);
+    assertEquals(0, result.size());
   }
 
-  private void validateDiagnosis(Diagnosis result) {
-    assertEquals(1, result.getSymptoms().size());
-    ComponentMetrics data = result.getSymptoms().values().iterator().next().getComponent();
-    assertEquals(123,
-        data.getMetricValueSum("container_1_bolt_0", METRIC_BACK_PRESSURE.text())
-            .intValue());
+  private void validateDiagnosis(Collection<Diagnosis> result) {
+    assertEquals(1, result.size());
+    Diagnosis diagnoses = result.iterator().next();
+    assertEquals(DIAGNOSIS_UNDER_PROVISIONING.text(), diagnoses.type());
+    assertEquals(1, diagnoses.assignments().size());
+    assertEquals(comp, diagnoses.assignments().iterator().next());
+    // TODO
+//    Assert.assertEquals(1, result.getSymptoms().size());
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java
index 91c634f..6bceaf1 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/resolvers/ScaleUpResolverTest.java
@@ -14,21 +14,26 @@
 
 package com.twitter.heron.healthmgr.resolvers;
 
+import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.microsoft.dhalion.detector.Symptom;
-import com.microsoft.dhalion.diagnoser.Diagnosis;
+import com.microsoft.dhalion.core.Action;
+import com.microsoft.dhalion.core.Diagnosis;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
 import com.microsoft.dhalion.events.EventManager;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
-import com.microsoft.dhalion.resolver.Action;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import com.twitter.heron.api.generated.TopologyAPI;
+import com.twitter.heron.common.utils.topology.TopologyTests;
 import com.twitter.heron.healthmgr.common.PackingPlanProvider;
 import com.twitter.heron.healthmgr.common.TopologyProvider;
 import com.twitter.heron.packing.roundrobin.RoundRobinPacking;
@@ -38,9 +43,8 @@ import com.twitter.heron.spi.common.Config;
 import com.twitter.heron.spi.common.Key;
 import com.twitter.heron.spi.packing.IRepacking;
 import com.twitter.heron.spi.packing.PackingPlan;
-import com.twitter.heron.common.utils.topology.TopologyTests;
 
-import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisName.SYMPTOM_UNDER_PROVISIONING;
+import static com.twitter.heron.healthmgr.diagnosers.BaseDiagnoser.DiagnosisType.DIAGNOSIS_UNDER_PROVISIONING;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.any;
@@ -67,20 +71,26 @@ public class ScaleUpResolverTest {
     ISchedulerClient scheduler = mock(ISchedulerClient.class);
     when(scheduler.updateTopology(any(UpdateTopologyRequest.class))).thenReturn(true);
 
-    ComponentMetrics metrics
-        = new ComponentMetrics("bolt", "i1", METRIC_BACK_PRESSURE.text(), 123);
-    Symptom symptom = new Symptom(SYMPTOM_UNDER_PROVISIONING.text(), metrics);
-    List<Diagnosis> diagnosis = new ArrayList<>();
-    diagnosis.add(new Diagnosis("test", symptom));
+    Instant now = Instant.now();
+    Collections.singletonList(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 123));
+
+    List<String> assignments = Collections.singletonList("bolt");
+    Diagnosis diagnoses =
+        new Diagnosis(DIAGNOSIS_UNDER_PROVISIONING.text(), now, assignments, null);
+    List<Diagnosis> diagnosis = Collections.singletonList(diagnoses);
+
+    ExecutionContext context = mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
 
     ScaleUpResolver resolver
         = new ScaleUpResolver(null, packingPlanProvider, scheduler, eventManager, null);
+    resolver.initialize(context);
     ScaleUpResolver spyResolver = spy(resolver);
 
-    doReturn(2).when(spyResolver).computeScaleUpFactor(metrics);
+    doReturn(2).when(spyResolver).computeScaleUpFactor("bolt");
     doReturn(currentPlan).when(spyResolver).buildNewPackingPlan(any(HashMap.class), eq(currentPlan));
 
-    List<Action> result = spyResolver.resolve(diagnosis);
+    Collection<Action> result = spyResolver.resolve(diagnosis);
     verify(scheduler, times(1)).updateTopology(any(UpdateTopologyRequest.class));
     assertEquals(1, result.size());
   }
@@ -141,28 +151,35 @@ public class ScaleUpResolverTest {
 
   @Test
   public void testScaleUpFactorComputation() {
-    ScaleUpResolver resolver = new ScaleUpResolver(null, null, null, eventManager, null);
+    Instant now = Instant.now();
+    Collection<Measurement> result = new ArrayList<>();
 
-    ComponentMetrics metrics = new ComponentMetrics("bolt");
-    metrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BACK_PRESSURE.text(), 500));
-    metrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BACK_PRESSURE.text(), 0));
+    ExecutionContext context = Mockito.mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+    when(context.previousCheckpoint()).thenReturn(now);
 
-    int result = resolver.computeScaleUpFactor(metrics);
-    assertEquals(4, result);
-
-    metrics = new ComponentMetrics("bolt");
-    metrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BACK_PRESSURE.text(), 750));
-    metrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BACK_PRESSURE.text(), 0));
-
-    result = resolver.computeScaleUpFactor(metrics);
-    assertEquals(8, result);
-
-    metrics = new ComponentMetrics("bolt");
-    metrics.addInstanceMetric(new InstanceMetrics("i1", METRIC_BACK_PRESSURE.text(), 400));
-    metrics.addInstanceMetric(new InstanceMetrics("i2", METRIC_BACK_PRESSURE.text(), 100));
-    metrics.addInstanceMetric(new InstanceMetrics("i3", METRIC_BACK_PRESSURE.text(), 0));
-
-    result = resolver.computeScaleUpFactor(metrics);
-    assertEquals(6, result);
+    ScaleUpResolver resolver = new ScaleUpResolver(null, null, null, eventManager, null);
+    resolver.initialize(context);
+
+    result.add(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 500));
+    result.add(new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 0));
+    when(context.measurements()).thenReturn(MeasurementsTable.of(result));
+    int factor = resolver.computeScaleUpFactor("bolt");
+    assertEquals(4, factor);
+
+    result.clear();
+    result.add(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 750));
+    result.add(new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 0));
+    when(context.measurements()).thenReturn(MeasurementsTable.of(result));
+    factor = resolver.computeScaleUpFactor("bolt");
+    assertEquals(8, factor);
+
+    result.clear();
+    result.add(new Measurement("bolt", "i1", METRIC_BACK_PRESSURE.text(), now, 400));
+    result.add(new Measurement("bolt", "i2", METRIC_BACK_PRESSURE.text(), now, 100));
+    result.add(new Measurement("bolt", "i3", METRIC_BACK_PRESSURE.text(), now, 0));
+    when(context.measurements()).thenReturn(MeasurementsTable.of(result));
+    factor = resolver.computeScaleUpFactor("bolt");
+    assertEquals(6, factor);
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java
index 75256f0..9091175 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BackPressureSensorTest.java
@@ -14,19 +14,27 @@
 
 package com.twitter.heron.healthmgr.sensors;
 
-import java.util.Map;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
+import com.microsoft.dhalion.policy.PoliciesExecutor.ExecutionContext;
 
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import com.twitter.heron.healthmgr.common.PackingPlanProvider;
 import com.twitter.heron.healthmgr.common.TopologyProvider;
-import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
 
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.DEFAULT_METRIC_DURATION;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_BACK_PRESSURE;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -49,27 +57,44 @@ public class BackPressureSensorTest {
     MetricsProvider metricsProvider = mock(MetricsProvider.class);
 
     for (String boltId : boltIds) {
-      String metric = MetricName.METRIC_BACK_PRESSURE + boltId;
+      String metric = METRIC_BACK_PRESSURE + boltId;
       // the back pressure sensor will return average bp per second, so multiply by duration
-      BufferSizeSensorTest.registerStMgrInstanceMetricResponse(metricsProvider,
+      registerStMgrInstanceMetricResponse(metricsProvider,
           metric,
           boltId.length() * DEFAULT_METRIC_DURATION.getSeconds());
     }
 
+
     BackPressureSensor backPressureSensor =
         new BackPressureSensor(packingPlanProvider, topologyProvider, null, metricsProvider);
 
-    Map<String, ComponentMetrics> componentMetrics = backPressureSensor.get();
-    assertEquals(2, componentMetrics.size());
+    ExecutionContext context = mock(ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    backPressureSensor.initialize(context);
+
+    Collection<Measurement> componentMetrics = backPressureSensor.fetch();
+    assertEquals(3, componentMetrics.size());
+    MeasurementsTable table = MeasurementsTable.of(componentMetrics);
+    assertEquals(1, table.component("bolt-1").size());
+    assertEquals(boltIds[0].length(), table.component("bolt-1").instance(boltIds[0])
+        .type(METRIC_BACK_PRESSURE.text()).sum(), 0.01);
+
+    assertEquals(2, table.component("bolt-2").size());
+    assertEquals(boltIds[1].length(), table.component("bolt-2").instance(boltIds[1])
+        .type(METRIC_BACK_PRESSURE.text()).sum(), 0.01);
+    assertEquals(boltIds[2].length(), table.component("bolt-2").instance(boltIds[2])
+        .type(METRIC_BACK_PRESSURE.text()).sum(), 0.01);
+  }
 
-    assertEquals(1, componentMetrics.get("bolt-1").getMetrics().size());
-    assertEquals(boltIds[0].length(), componentMetrics.get("bolt-1").getMetrics(boltIds[0])
-        .getMetricValueSum(MetricName.METRIC_BACK_PRESSURE.text()).intValue());
+  static void registerStMgrInstanceMetricResponse(MetricsProvider metricsProvider,
+                                                  String metric,
+                                                  long value) {
+    Instant instant = Instant.ofEpochSecond(10);
+    Measurement measurement = new Measurement("__stmgr__", "stmgr-1", metric, instant, value);
+    Collection<Measurement> result = Collections.singletonList(measurement);
 
-    assertEquals(2, componentMetrics.get("bolt-2").getMetrics().size());
-    assertEquals(boltIds[1].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[1])
-        .getMetricValueSum(MetricName.METRIC_BACK_PRESSURE.text()).intValue());
-    assertEquals(boltIds[2].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[2])
-        .getMetricValueSum(MetricName.METRIC_BACK_PRESSURE.text()).intValue());
+    when(metricsProvider.getMeasurements(
+        any(Instant.class), eq(DEFAULT_METRIC_DURATION), eq(metric), eq("__stmgr__")))
+        .thenReturn(result);
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java
index 22f62fe..b395d9c 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/BufferSizeSensorTest.java
@@ -14,12 +14,13 @@
 
 package com.twitter.heron.healthmgr.sensors;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.time.Instant;
+import java.util.Collection;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Test;
 
@@ -27,7 +28,7 @@ import com.twitter.heron.healthmgr.common.PackingPlanProvider;
 import com.twitter.heron.healthmgr.common.TopologyProvider;
 import com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName;
 
-import static com.twitter.heron.healthmgr.sensors.BaseSensor.DEFAULT_METRIC_DURATION;
+import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_WAIT_Q_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -51,39 +52,31 @@ public class BufferSizeSensorTest {
     MetricsProvider metricsProvider = mock(MetricsProvider.class);
 
     for (String boltId : boltIds) {
-      String metric = MetricName.METRIC_BUFFER_SIZE
-          + boltId + MetricName.METRIC_BUFFER_SIZE_SUFFIX;
-      registerStMgrInstanceMetricResponse(metricsProvider, metric, boltId.length());
+      String metric = METRIC_WAIT_Q_SIZE
+          + boltId + MetricName.METRIC_WAIT_Q_SIZE_SUFFIX;
+      BackPressureSensorTest
+          .registerStMgrInstanceMetricResponse(metricsProvider, metric, boltId.length());
     }
 
     BufferSizeSensor bufferSizeSensor =
         new BufferSizeSensor(null, packingPlanProvider, topologyProvider, metricsProvider);
 
-    Map<String, ComponentMetrics> componentMetrics = bufferSizeSensor.get();
-    assertEquals(2, componentMetrics.size());
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(Instant.now());
+    bufferSizeSensor.initialize(context);
 
-    assertEquals(1, componentMetrics.get("bolt-1").getMetrics().size());
-    assertEquals(boltIds[0].length(), componentMetrics.get("bolt-1").getMetrics(boltIds[0])
-        .getMetricValueSum(MetricName.METRIC_BUFFER_SIZE.text()).intValue());
+    Collection<Measurement> componentMetrics = bufferSizeSensor.fetch();
+    assertEquals(3, componentMetrics.size());
 
-    assertEquals(2, componentMetrics.get("bolt-2").getMetrics().size());
-    assertEquals(boltIds[1].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[1])
-        .getMetricValueSum(MetricName.METRIC_BUFFER_SIZE.text()).intValue());
-    assertEquals(boltIds[2].length(), componentMetrics.get("bolt-2").getMetrics(boltIds[2])
-        .getMetricValueSum(MetricName.METRIC_BUFFER_SIZE.text()).intValue());
-  }
+    MeasurementsTable table = MeasurementsTable.of(componentMetrics);
+    assertEquals(1, table.component("bolt-1").size());
+    assertEquals(boltIds[0].length(), table.component("bolt-1").instance(boltIds[0])
+        .type(METRIC_WAIT_Q_SIZE.text()).sum(), 0.01);
 
-  static void registerStMgrInstanceMetricResponse(MetricsProvider metricsProvider,
-                                                  String metric,
-                                                  long value) {
-    Map<String, ComponentMetrics> result = new HashMap<>();
-    ComponentMetrics metrics = new ComponentMetrics("__stmgr__");
-    InstanceMetrics instanceMetrics = new InstanceMetrics("stmgr-1");
-    instanceMetrics.addMetric(metric, value);
-    metrics.addInstanceMetric(instanceMetrics);
-    result.put("__stmgr__", metrics);
-
-    when(metricsProvider.getComponentMetrics(metric, DEFAULT_METRIC_DURATION, "__stmgr__"))
-        .thenReturn(result);
+    assertEquals(2, table.component("bolt-2").size());
+    assertEquals(boltIds[1].length(), table.component("bolt-2").instance(boltIds[1])
+        .type(METRIC_WAIT_Q_SIZE.text()).sum(), 0.01);
+    assertEquals(boltIds[2].length(), table.component("bolt-2").instance(boltIds[2])
+        .type(METRIC_WAIT_Q_SIZE.text()).sum(), 0.01);
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java
index 2f5a6cc..88fcc03 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/ExecuteCountSensorTest.java
@@ -14,12 +14,16 @@
 
 package com.twitter.heron.healthmgr.sensors;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
 import com.microsoft.dhalion.api.MetricsProvider;
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+import com.microsoft.dhalion.policy.PoliciesExecutor;
 
 import org.junit.Test;
 
@@ -28,54 +32,48 @@ import com.twitter.heron.healthmgr.common.TopologyProvider;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.DEFAULT_METRIC_DURATION;
 import static com.twitter.heron.healthmgr.sensors.BaseSensor.MetricName.METRIC_EXE_COUNT;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ExecuteCountSensorTest {
   @Test
   public void providesBoltExecutionCountMetrics() {
+    Instant now = Instant.now();
+    String metric = METRIC_EXE_COUNT.text();
     TopologyProvider topologyProvider = mock(TopologyProvider.class);
     when(topologyProvider.getBoltNames()).thenReturn(new String[]{"bolt-1", "bolt-2"});
 
     MetricsProvider metricsProvider = mock(MetricsProvider.class);
 
-    Map<String, ComponentMetrics> result = new HashMap<>();
+    Collection<Measurement> result = new ArrayList<>();
+    result.add(new Measurement("bolt-1", "container_1_bolt-1_1", metric, now, 123));
+    result.add(new Measurement("bolt-1", "container_1_bolt-1_2", metric, now, 345));
+    result.add(new Measurement("bolt-2", "container_1_bolt-2_3", metric, now, 321));
+    result.add(new Measurement("bolt-2", "container_1_bolt-2_4", metric, now, 543));
 
-    ComponentMetrics metrics = new ComponentMetrics("bolt-1");
-    metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-1_1", 123));
-    metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-1_2", 345));
-    result.put("bolt-1", metrics);
-
-    metrics = new ComponentMetrics("bolt-2");
-    metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-2_3", 321));
-    metrics.addInstanceMetric(createTestInstanceMetric("container_1_bolt-2_4", 543));
-    result.put("bolt-2", metrics);
-
-    when(metricsProvider
-        .getComponentMetrics(METRIC_EXE_COUNT.text(), DEFAULT_METRIC_DURATION, "bolt-1", "bolt-2"))
+    Collection<String> comps = Arrays.asList("bolt-1", "bolt-2");
+    when(metricsProvider.getMeasurements(
+        any(Instant.class), eq(DEFAULT_METRIC_DURATION), eq(Collections.singletonList(metric)), eq(comps)))
         .thenReturn(result);
 
     ExecuteCountSensor executeCountSensor
         = new ExecuteCountSensor(topologyProvider, null, metricsProvider);
-    Map<String, ComponentMetrics> componentMetrics = executeCountSensor.get();
-    assertEquals(2, componentMetrics.size());
-    assertEquals(123, componentMetrics.get("bolt-1")
-        .getMetrics("container_1_bolt-1_1")
-        .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
-    assertEquals(345, componentMetrics.get("bolt-1")
-        .getMetrics("container_1_bolt-1_2")
-        .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
-    assertEquals(321, componentMetrics.get("bolt-2")
-        .getMetrics("container_1_bolt-2_3")
-        .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
-    assertEquals(543, componentMetrics.get("bolt-2")
-        .getMetrics("container_1_bolt-2_4")
-        .getMetricValueSum(METRIC_EXE_COUNT.text()).intValue());
-  }
+    PoliciesExecutor.ExecutionContext context = mock(PoliciesExecutor.ExecutionContext.class);
+    when(context.checkpoint()).thenReturn(now);
+    executeCountSensor.initialize(context);
 
-  private InstanceMetrics createTestInstanceMetric(String name, int value) {
-    InstanceMetrics instanceMetrics = new InstanceMetrics(name);
-    instanceMetrics.addMetric(METRIC_EXE_COUNT.text(), value);
-    return instanceMetrics;
+    Collection<Measurement> componentMetrics = executeCountSensor.fetch();
+    assertEquals(4, componentMetrics.size());
+    MeasurementsTable table = MeasurementsTable.of(componentMetrics);
+    assertEquals(123, table.component("bolt-1").instance("container_1_bolt-1_1")
+        .type(metric).sum(), 0.01);
+    assertEquals(345, table.component("bolt-1").instance("container_1_bolt-1_2")
+        .type(metric).sum(), 0.01);
+    assertEquals(321, table.component("bolt-2").instance("container_1_bolt-2_3")
+        .type(metric).sum(), 0.01);
+    assertEquals(543, table.component("bolt-2").instance("container_1_bolt-2_4")
+        .type(metric).sum(), 0.01);
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
index ea087c3..60f7ef6 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/MetricsCacheMetricsProviderTest.java
@@ -17,11 +17,16 @@ package com.twitter.heron.healthmgr.sensors;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
+
+import org.junit.Test;
+import org.mockito.Mockito;
 
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
 import com.twitter.heron.proto.system.Common.Status;
 import com.twitter.heron.proto.system.Common.StatusCode;
 import com.twitter.heron.proto.tmaster.TopologyMaster;
@@ -32,18 +37,14 @@ import com.twitter.heron.proto.tmaster.TopologyMaster.MetricResponse.TaskMetric;
 import com.twitter.heron.proto.tmaster.TopologyMaster.MetricsCacheLocation;
 import com.twitter.heron.spi.statemgr.SchedulerStateManagerAdaptor;
 
-import org.junit.Test;
-import org.mockito.Mockito;
-
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 public class MetricsCacheMetricsProviderTest {
   @Test
-  public void provides1Comp2InstanceMetricsFromeMetricsCache() {
+  public void provides1Comp2InstanceMetricsFromMetricsCache() {
     MetricsCacheMetricsProvider spyMetricsProvider = createMetricsProviderSpy();
 
     String metric = "count";
@@ -83,18 +84,20 @@ public class MetricsCacheMetricsProviderTest {
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics =
-        spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
-
-    assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(2, metrics.get(comp).getMetrics().size());
-
-    HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
-    assertEquals(104,
-        componentMetrics.get("container_1_bolt_1").getMetricValueSum(metric).intValue());
-    assertEquals(17,
-        componentMetrics.get("container_1_bolt_2").getMetricValueSum(metric).intValue());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
+
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(4, table.component(comp).size());
+    assertEquals(2, table.uniqueInstances().size());
+    assertEquals(1, table.uniqueTypes().size());
+    assertEquals(1, table.instance("container_1_bolt_1").size());
+    assertEquals(104, table.instance("container_1_bolt_1").sum(), 0.01);
+    assertEquals(3, table.instance("container_1_bolt_2").size());
+    assertEquals(17, table.instance("container_1_bolt_2").sum(), 0.01);
   }
 
   @Test
@@ -146,19 +149,22 @@ public class MetricsCacheMetricsProviderTest {
     doReturn(response2).when(spyMetricsProvider)
         .getMetricsFromMetricsCache(metric, comp2, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp1, comp2);
-
-    assertEquals(2, metrics.size());
-    assertNotNull(metrics.get(comp1));
-    assertEquals(1, metrics.get(comp1).getMetrics().size());
-    assertEquals(104,
-        metrics.get(comp1).getMetricValueSum("container_1_bolt-1_2", metric).intValue());
-
-    assertNotNull(metrics.get(comp2));
-    assertEquals(1, metrics.get(comp2).getMetrics().size());
-    assertEquals(17,
-        metrics.get(comp2).getMetricValueSum("container_1_bolt-2_1", metric).intValue());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            Collections.singletonList(metric),
+            Arrays.asList(comp1, comp2));
+
+    assertEquals(4, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(2, table.uniqueComponents().size());
+    assertEquals(1, table.component(comp1).size());
+    assertEquals(104, table.instance("container_1_bolt-1_2").sum(), 0.01);
+
+    assertEquals(3, table.component(comp2).size());
+    assertEquals(1, table.uniqueTypes().size());
+    assertEquals(3, table.type(metric).instance("container_1_bolt-2_1").size());
+    assertEquals(17, table.instance("container_1_bolt-2_1").sum(), 0.01);
   }
 
   @Test
@@ -182,15 +188,16 @@ public class MetricsCacheMetricsProviderTest {
 
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
 
     assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(1, metrics.get(comp).getMetrics().size());
-
-    HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
-    assertEquals(601, componentMetrics.get("stmgr-1").getMetricValueSum(metric).intValue());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(1, table.component(comp).size());
+    assertEquals(601, table.instance("stmgr-1").type(metric).sum(), 0.01);
   }
 
   @Test
@@ -205,12 +212,13 @@ public class MetricsCacheMetricsProviderTest {
 
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
 
-    assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(0, metrics.get(comp).getMetrics().size());
+    assertEquals(0, metrics.size());
   }
 
   private MetricsCacheMetricsProvider createMetricsProviderSpy() {
@@ -228,9 +236,7 @@ public class MetricsCacheMetricsProviderTest {
     MetricsCacheMetricsProvider metricsProvider
         = new MetricsCacheMetricsProvider(stateMgr, "testTopo");
 
-    MetricsCacheMetricsProvider spyMetricsProvider = spy(metricsProvider);
-    spyMetricsProvider.setClock(new TestClock(70000));
-    return spyMetricsProvider;
+    return spy(metricsProvider);
   }
 
   @Test
@@ -274,44 +280,24 @@ public class MetricsCacheMetricsProviderTest {
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromMetricsCache(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics =
-        spyMetricsProvider
-            .getComponentMetrics(metric, Instant.ofEpochSecond(10), Duration.ofSeconds(60), comp);
-
-    assertEquals(1, metrics.size());
-    ComponentMetrics componentMetrics = metrics.get(comp);
-    assertNotNull(componentMetrics);
-    assertEquals(2, componentMetrics.getMetrics().size());
-
-    InstanceMetrics instanceMetrics = componentMetrics.getMetrics("container_1_bolt_1");
-    assertNotNull(instanceMetrics);
-    assertEquals(1, instanceMetrics.getMetrics().size());
-
-    Map<Instant, Double> metricValues = instanceMetrics.getMetrics().get(metric);
-    assertEquals(1, metricValues.size());
-    assertEquals(104, metricValues.get(Instant.ofEpochSecond(1497481288)).intValue());
-
-    instanceMetrics = componentMetrics.getMetrics("container_1_bolt_2");
-    assertNotNull(instanceMetrics);
-    assertEquals(1, instanceMetrics.getMetrics().size());
-
-    metricValues = instanceMetrics.getMetrics().get(metric);
-    assertEquals(3, metricValues.size());
-    assertEquals(12, metricValues.get(Instant.ofEpochSecond(1497481228L)).intValue());
-    assertEquals(2, metricValues.get(Instant.ofEpochSecond(1497481348L)).intValue());
-    assertEquals(3, metricValues.get(Instant.ofEpochSecond(1497481168L)).intValue());
-  }
-
-  private class TestClock extends MetricsCacheMetricsProvider.Clock {
-    long timeStamp;
-
-    TestClock(long timeStamp) {
-      this.timeStamp = timeStamp;
-    }
-
-    @Override
-    long currentTime() {
-      return timeStamp;
-    }
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
+
+    assertEquals(4, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(4, table.component(comp).size());
+
+    MeasurementsTable result = table.instance("container_1_bolt_1");
+    assertEquals(1, result.size());
+    assertEquals(104, result.instant(Instant.ofEpochSecond(1497481288)).sum(), 0.01);
+
+    result = table.instance("container_1_bolt_2");
+    assertEquals(3, result.size());
+    assertEquals(12, result.instant(Instant.ofEpochSecond(1497481228L)).sum(), 0.01);
+    assertEquals(2, result.instant(Instant.ofEpochSecond(1497481348L)).sum(), 0.01);
+    assertEquals(3, result.instant(Instant.ofEpochSecond(1497481168L)).sum(), 0.01);
   }
 }
diff --git a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java
index 01c71d2..2e7625f 100644
--- a/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java
+++ b/heron/healthmgr/tests/java/com/twitter/heron/healthmgr/sensors/TrackerMetricsProviderTest.java
@@ -17,16 +17,16 @@ package com.twitter.heron.healthmgr.sensors;
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 
-import com.microsoft.dhalion.metrics.ComponentMetrics;
-import com.microsoft.dhalion.metrics.InstanceMetrics;
+import com.microsoft.dhalion.core.Measurement;
+import com.microsoft.dhalion.core.MeasurementsTable;
 
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 
@@ -48,18 +48,21 @@ public class TrackerMetricsProviderTest {
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics =
-        spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
-
-    assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(2, metrics.get(comp).getMetrics().size());
-
-    HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
-    assertEquals(104,
-        componentMetrics.get("container_1_bolt_1").getMetricValueSum(metric).intValue());
-    assertEquals(17,
-        componentMetrics.get("container_1_bolt_2").getMetricValueSum(metric).intValue());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
+
+    assertEquals(4, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(4, table.component(comp).size());
+    assertEquals(2, table.uniqueInstances().size());
+    assertEquals(1, table.uniqueTypes().size());
+    assertEquals(1, table.instance("container_1_bolt_1").size());
+    assertEquals(104, table.instance("container_1_bolt_1").sum(), 0.01);
+    assertEquals(3, table.instance("container_1_bolt_2").size());
+    assertEquals(17, table.instance("container_1_bolt_2").sum(), 0.01);
   }
 
   @Test
@@ -88,19 +91,22 @@ public class TrackerMetricsProviderTest {
     doReturn(response2).when(spyMetricsProvider)
         .getMetricsFromTracker(metric, comp2, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp1, comp2);
-
-    assertEquals(2, metrics.size());
-    assertNotNull(metrics.get(comp1));
-    assertEquals(1, metrics.get(comp1).getMetrics().size());
-    assertEquals(104,
-        metrics.get(comp1).getMetricValueSum("container_1_bolt-1_2", metric).intValue());
-
-    assertNotNull(metrics.get(comp2));
-    assertEquals(1, metrics.get(comp2).getMetrics().size());
-    assertEquals(17,
-        metrics.get(comp2).getMetricValueSum("container_1_bolt-2_1", metric).intValue());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            Collections.singletonList(metric),
+            Arrays.asList(comp1, comp2));
+
+    assertEquals(4, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(2, table.uniqueComponents().size());
+    assertEquals(1, table.component(comp1).size());
+    assertEquals(104, table.instance("container_1_bolt-1_2").sum(), 0.01);
+
+    assertEquals(3, table.component(comp2).size());
+    assertEquals(1, table.uniqueTypes().size());
+    assertEquals(3, table.type(metric).instance("container_1_bolt-2_1").size());
+    assertEquals(17, table.instance("container_1_bolt-2_1").sum(), 0.01);
   }
 
   @Test
@@ -118,15 +124,17 @@ public class TrackerMetricsProviderTest {
 
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
 
-    assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(1, metrics.get(comp).getMetrics().size());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
 
-    HashMap<String, InstanceMetrics> componentMetrics = metrics.get(comp).getMetrics();
-    assertEquals(601, componentMetrics.get("stmgr-1").getMetricValueSum(metric).intValue());
+    assertEquals(1, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(1, table.component(comp).size());
+    assertEquals(601, table.instance("stmgr-1").type(metric).sum(), 0.01);
   }
 
   @Test
@@ -141,12 +149,14 @@ public class TrackerMetricsProviderTest {
 
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
-    Map<String, ComponentMetrics> metrics
-        = spyMetricsProvider.getComponentMetrics(metric, Duration.ofSeconds(60), comp);
 
-    assertEquals(1, metrics.size());
-    assertNotNull(metrics.get(comp));
-    assertEquals(0, metrics.get(comp).getMetrics().size());
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
+
+    assertEquals(0, metrics.size());
   }
 
   private TrackerMetricsProvider createMetricsProviderSpy() {
@@ -154,7 +164,6 @@ public class TrackerMetricsProviderTest {
         = new TrackerMetricsProvider("127.0.0.1", "topology", "dev", "env");
 
     TrackerMetricsProvider spyMetricsProvider = spy(metricsProvider);
-    spyMetricsProvider.setClock(new TestClock(70000));
     return spyMetricsProvider;
   }
 
@@ -175,44 +184,24 @@ public class TrackerMetricsProviderTest {
     doReturn(response).when(spyMetricsProvider)
         .getMetricsFromTracker(metric, comp, Instant.ofEpochSecond(10), Duration.ofSeconds(60));
 
-    Map<String, ComponentMetrics> metrics =
-        spyMetricsProvider
-            .getComponentMetrics(metric, Instant.ofEpochSecond(10), Duration.ofSeconds(60), comp);
-
-    assertEquals(1, metrics.size());
-    ComponentMetrics componentMetrics = metrics.get(comp);
-    assertNotNull(componentMetrics);
-    assertEquals(2, componentMetrics.getMetrics().size());
-
-    InstanceMetrics instanceMetrics = componentMetrics.getMetrics("container_1_bolt_1");
-    assertNotNull(instanceMetrics);
-    assertEquals(1, instanceMetrics.getMetrics().size());
-
-    Map<Instant, Double> metricValues = instanceMetrics.getMetrics().get(metric);
-    assertEquals(1, metricValues.size());
-    assertEquals(104, metricValues.get(Instant.ofEpochSecond(1497481288)).intValue());
-
-    instanceMetrics = componentMetrics.getMetrics("container_1_bolt_2");
-    assertNotNull(instanceMetrics);
-    assertEquals(1, instanceMetrics.getMetrics().size());
-
-    metricValues = instanceMetrics.getMetrics().get(metric);
-    assertEquals(3, metricValues.size());
-    assertEquals(12, metricValues.get(Instant.ofEpochSecond(1497481228L)).intValue());
-    assertEquals(2, metricValues.get(Instant.ofEpochSecond(1497481348L)).intValue());
-    assertEquals(3, metricValues.get(Instant.ofEpochSecond(1497481168L)).intValue());
-  }
-
-  private class TestClock extends TrackerMetricsProvider.Clock {
-    long timeStamp;
-
-    TestClock(long timeStamp) {
-      this.timeStamp = timeStamp;
-    }
-
-    @Override
-    long currentTime() {
-      return timeStamp;
-    }
+    Collection<Measurement> metrics =
+        spyMetricsProvider.getMeasurements(Instant.ofEpochSecond(10),
+            Duration.ofSeconds(60),
+            metric,
+            comp);
+
+    assertEquals(4, metrics.size());
+    MeasurementsTable table = MeasurementsTable.of(metrics);
+    assertEquals(4, table.component(comp).size());
+
+    MeasurementsTable result = table.instance("container_1_bolt_1");
+    assertEquals(1, result.size());
+    assertEquals(104, result.instant(Instant.ofEpochSecond(1497481288)).sum(), 0.01);
+
+    result = table.instance("container_1_bolt_2");
+    assertEquals(3, result.size());
+    assertEquals(12, result.instant(Instant.ofEpochSecond(1497481228L)).sum(), 0.01);
+    assertEquals(2, result.instant(Instant.ofEpochSecond(1497481348L)).sum(), 0.01);
+    assertEquals(3, result.instant(Instant.ofEpochSecond(1497481168L)).sum(), 0.01);
   }
 }

-- 
To stop receiving notification emails like this one, please contact
ashvin@apache.org.