You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/09/21 18:30:00 UTC

[storm] branch master updated: STORM-3694 allow reporting V2 metrics with dimensions and short names (#3329)

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 39a6fba  STORM-3694 allow reporting V2 metrics with dimensions and short names (#3329)
39a6fba is described below

commit 39a6fba632a70f1d4684489b5b4dc6cb98a34662
Author: agresch <ag...@gmail.com>
AuthorDate: Mon Sep 21 13:28:49 2020 -0500

    STORM-3694 allow reporting V2 metrics with dimensions and short names (#3329)
---
 docs/metrics_v2.md                                 |   6 +-
 .../jvm/org/apache/storm/daemon/worker/Worker.java |   2 +-
 .../apache/storm/metrics2/DimensionalReporter.java |  97 ++++++++++++++
 ...rmReporter.java => MetricRegistryProvider.java} |  15 +--
 .../apache/storm/metrics2/StormMetricRegistry.java | 146 ++++++++++++++++-----
 .../storm/metrics2/TaskMetricDimensions.java       |  71 ++++++++++
 .../org/apache/storm/metrics2/TaskMetricRepo.java  |  94 +++++++++++++
 .../metrics2/reporters/ConsoleStormReporter.java   |  47 ++++++-
 .../metrics2/reporters/ScheduledStormReporter.java |   4 +
 .../storm/metrics2/reporters/StormReporter.java    |   9 ++
 10 files changed, 441 insertions(+), 50 deletions(-)

diff --git a/docs/metrics_v2.md b/docs/metrics_v2.md
index 77aa167..c46459d 100644
--- a/docs/metrics_v2.md
+++ b/docs/metrics_v2.md
@@ -138,6 +138,10 @@ public interface StormMetricsFilter extends MetricFilter {
 }
 ```
 
+V2 metrics can be reported with a long name (such as storm.topology.mytopologyname-17-1595349167.hostname.__system.-1.6700-memory.pools.Code-Cache.max) or with a short
+name and dimensions (such as memory.pools.Code-Cache.max with dimensions task Id of -1 and component Id of __system) if reporters support this.  Each reporter defaults
+to using the long metric name, but can report the short name by configuring report.dimensions.enabled to true for the reporter.
+
 ## Backwards Compatibility Notes
 
 1. V2 metrics can also be reported to the Metrics Consumers registered with `topology.metrics.consumer.register` by enabling the `topology.enable.v2.metrics.tick` configuration.
@@ -169,4 +173,4 @@ However, the reporters configured with `topology.metrics.reporters` (or `storm.m
         duration.unit: "SECONDS"
     ```
    Default values will be used if they are not set or set to `null`.
-   
\ No newline at end of file
+   
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index eaa19c2..aef4997 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -180,7 +180,7 @@ public class Worker implements Shutdownable, DaemonCommon {
         IStateStorage stateStorage = ClusterUtils.mkStateStorage(conf, topologyConf, csContext);
         IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(stateStorage, null, csContext);
 
-        metricRegistry.start(topologyConf);
+        metricRegistry.start(topologyConf, port);
 
         Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
         Map<String, String> initCreds = new HashMap<>();
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/DimensionalReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/DimensionalReporter.java
new file mode 100644
index 0000000..0e6c487
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/DimensionalReporter.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class that allows using a ScheduledReporter to report V2 task metrics with support for dimensions.
+ * <p></p>
+ * This reporter will be started and scheduled with the MetricRegistry.  Once it is called on to report,
+ * it will query the StormMetricRegistry for various sets of task metrics with unique dimensions.  The
+ * underlying ScheduledReporter will perform the actual reporting with the help of a DimensionHandler to
+ * deal with the dimensions.
+ */
+public class DimensionalReporter extends ScheduledReporter {
+    private ScheduledReporter underlyingReporter;
+    private MetricRegistryProvider metricRegistryProvider;
+    private MetricFilter filter;
+    private DimensionHandler dimensionHandler;
+
+    /**
+     * Constructor.
+     *
+     * @param metricRegistryProvider MetricRegistryProvider tracking task-specific metrics.
+     * @param unstartedReporter      ScheduledReporter to perform the actual reporting.  It should NOT be started.
+     * @param dimensionHandler       class to handle setting dimensions before reporting a set of metrics.
+     * @param name                   the reporter's name.
+     * @param filter                 the filter for which metrics to report.
+     * @param rateUnit               rate unit for the reporter.
+     * @param durationUnit           duration unit for the reporter.
+     * @param executor               the executor to use while scheduling reporting of metrics.
+     * @param shutdownExecutorOnStop if true, then executor will be stopped in same time with this reporter.
+     */
+    public DimensionalReporter(MetricRegistryProvider metricRegistryProvider,
+                               ScheduledReporter unstartedReporter,
+                               DimensionHandler dimensionHandler,
+                               String name,
+                               MetricFilter filter,
+                               TimeUnit rateUnit,
+                               TimeUnit durationUnit,
+                               ScheduledExecutorService executor,
+                               boolean shutdownExecutorOnStop) {
+        super(metricRegistryProvider.getRegistry(), name, filter, rateUnit, durationUnit, executor, shutdownExecutorOnStop);
+        underlyingReporter = unstartedReporter;
+        this.metricRegistryProvider = metricRegistryProvider;
+        this.filter = filter;
+        this.dimensionHandler = dimensionHandler;
+    }
+
+    @Override
+    public void report(SortedMap<String, Gauge> gauges, SortedMap<String, Counter> counters,
+                       SortedMap<String, Histogram> histograms, SortedMap<String, Meter> meters,
+                       SortedMap<String, Timer> timers) {
+        report();
+    }
+
+    @Override
+    public void report() {
+        for (Map.Entry<TaskMetricDimensions, TaskMetricRepo> entry : metricRegistryProvider.getTaskMetrics().entrySet()) {
+            TaskMetricRepo repo = entry.getValue();
+            if (dimensionHandler != null) {
+                TaskMetricDimensions dimensions = entry.getKey();
+                dimensionHandler.setDimensions(dimensions.getDimensions());
+            }
+            repo.report(underlyingReporter, filter);
+        }
+    }
+
+    public interface DimensionHandler {
+        /**
+         * Sets dimensions to be used for reporting on the next batch of metrics.
+         *
+         * @param dimensions    dimensions valid for use in the next scheduled report.
+         */
+        void setDimensions(Map<String, String> dimensions);
+    }
+}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/MetricRegistryProvider.java
similarity index 69%
copy from storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
copy to storm-client/src/jvm/org/apache/storm/metrics2/MetricRegistryProvider.java
index 8957608..889b236 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/MetricRegistryProvider.java
@@ -10,19 +10,14 @@
  * and limitations under the License.
  */
 
-package org.apache.storm.metrics2.reporters;
+package org.apache.storm.metrics2;
 
 import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Reporter;
 import java.util.Map;
 
-public interface StormReporter extends Reporter {
-    String REPORT_PERIOD = "report.period";
-    String REPORT_PERIOD_UNITS = "report.period.units";
+public interface MetricRegistryProvider {
 
-    void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf, Map<String, Object> reporterConf);
+    MetricRegistry getRegistry();
 
-    void start();
-
-    void stop();
-}
\ No newline at end of file
+    Map<TaskMetricDimensions, TaskMetricRepo> getTaskMetrics();
+}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 43cfcc7..5bd6b92 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -13,6 +13,7 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import com.codahale.metrics.ExponentiallyDecayingReservoir;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.Meter;
@@ -29,7 +30,6 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.Config;
-import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.metrics2.reporters.StormReporter;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.task.WorkerTopologyContext;
@@ -38,7 +38,7 @@ import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StormMetricRegistry {
+public class StormMetricRegistry implements MetricRegistryProvider {
     private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class);
     private static final String WORKER_METRIC_PREFIX = "storm.worker.";
     private static final String TOPOLOGY_METRIC_PREFIX = "storm.topology.";
@@ -50,26 +50,30 @@ public class StormMetricRegistry {
     private final ConcurrentMap<Integer, Map<String, Counter>> taskIdCounters = new ConcurrentHashMap<>();
     private final ConcurrentMap<Integer, Map<String, Timer>> taskIdTimers = new ConcurrentHashMap<>();
     private final ConcurrentMap<Integer, Map<String, Histogram>> taskIdHistograms = new ConcurrentHashMap<>();
+    private final ConcurrentMap<TaskMetricDimensions, TaskMetricRepo> taskMetrics = new ConcurrentHashMap<>();
     private String hostName = null;
+    private int port = -1;
+    private String topologyId = null;
 
     public <T> SimpleGauge<T> gauge(
         T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port) {
+        Gauge gauge = new SimpleGauge<>(initialValue);
         MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
-        Gauge gauge = registry.gauge(metricNames.getLongName(), () -> new SimpleGauge<>(initialValue));
+        gauge = registerGauge(metricNames, gauge, taskId, componentId, null);
         saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
         return (SimpleGauge<T>) gauge;
     }
 
     public <T> Gauge<T> gauge(String name, Gauge<T> gauge, TopologyContext context) {
         MetricNames metricNames = topologyMetricName(name, context);
-        gauge = registry.register(metricNames.getLongName(), gauge);
+        gauge = registerGauge(metricNames, gauge, context.getThisTaskId(), context.getThisComponentId(), null);
         saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, gauge, taskIdGauges);
         return gauge;
     }
 
     public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId, Integer taskId, Integer port) {
         MetricNames metricNames = workerMetricName(name, topologyId, componentId, taskId, port);
-        gauge = registry.register(metricNames.getLongName(), gauge);
+        gauge = registerGauge(metricNames, gauge, taskId, componentId, null);
         saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
         return gauge;
     }
@@ -77,69 +81,91 @@ public class StormMetricRegistry {
     public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId,
                               String streamId, Integer taskId, Integer port) {
         MetricNames metricNames = workerMetricName(name, topologyId, componentId, streamId, taskId, port);
-        gauge = registry.register(metricNames.getLongName(), gauge);
+        gauge = registerGauge(metricNames, gauge, taskId, componentId, streamId);
         saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
         return gauge;
     }
 
     public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
         MetricNames metricNames = workerMetricName(name, context.getStormId(), componentId, streamId, taskId, context.getThisWorkerPort());
-        Meter meter = registry.meter(metricNames.getLongName());
+        Meter meter = registerMeter(metricNames, new Meter(), taskId, componentId, streamId);
         saveMetricTaskIdMapping(taskId, metricNames, meter, taskIdMeters);
         return meter;
     }
 
     public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId) {
         MetricNames metricNames = workerMetricName(name, context.getStormId(), componentId, taskId, context.getThisWorkerPort());
-        Meter meter = registry.meter(metricNames.getLongName());
+        Meter meter = registerMeter(metricNames, new Meter(), taskId, componentId, null);
         saveMetricTaskIdMapping(taskId, metricNames, meter, taskIdMeters);
         return meter;
     }
 
     public Meter meter(String name, TopologyContext context) {
         MetricNames metricNames = topologyMetricName(name, context);
-        Meter meter = registry.meter(metricNames.getLongName());
+        Meter meter = registerMeter(metricNames, new Meter(), context.getThisTaskId(), context.getThisComponentId(), null);
         saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, meter, taskIdMeters);
         return meter;
     }
 
     public Counter counter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
         MetricNames metricNames = workerMetricName(name, context.getStormId(), componentId, streamId, taskId, context.getThisWorkerPort());
-        Counter counter = registry.counter(metricNames.getLongName());
+        Counter counter = registerCounter(metricNames, new Counter(), taskId, componentId, streamId);
         saveMetricTaskIdMapping(taskId, metricNames, counter, taskIdCounters);
         return counter;
     }
 
     public Counter counter(String name, String topologyId, String componentId, Integer taskId, Integer workerPort, String streamId) {
         MetricNames metricNames = workerMetricName(name, topologyId, componentId, streamId, taskId, workerPort);
-        Counter counter = registry.counter(metricNames.getLongName());
+        Counter counter = registerCounter(metricNames, new Counter(), taskId, componentId, streamId);
         saveMetricTaskIdMapping(taskId, metricNames, counter, taskIdCounters);
         return counter;
     }
 
     public Counter counter(String name, TopologyContext context) {
         MetricNames metricNames = topologyMetricName(name, context);
-        Counter counter = registry.counter(metricNames.getLongName());
+        Counter counter = registerCounter(metricNames, new Counter(), context.getThisTaskId(), context.getThisComponentId(), null);
         saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, counter, taskIdCounters);
         return counter;
     }
 
+    public Timer timer(String name, TopologyContext context) {
+        MetricNames metricNames = topologyMetricName(name, context);
+        Timer timer = registerTimer(metricNames, new Timer(), context.getThisTaskId(), context.getThisComponentId(), null);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, timer, taskIdTimers);
+        return timer;
+    }
+
+    public Histogram histogram(String name, TopologyContext context) {
+        MetricNames metricNames = topologyMetricName(name, context);
+        Histogram histogram = registerHistogram(metricNames, new Histogram(new ExponentiallyDecayingReservoir()),
+                context.getThisTaskId(), context.getThisComponentId(), null);
+        saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, histogram, taskIdHistograms);
+        return histogram;
+    }
+
     public void metricSet(String prefix, MetricSet set, TopologyContext context) {
         // Instead of registering the metrics as a set, register them individually.
         // This allows fetching the individual metrics by type (getTaskGauges())
         // to work as expected.
         for (Map.Entry<String, Metric> entry : set.getMetrics().entrySet()) {
             MetricNames metricNames = topologyMetricName(prefix + "." + entry.getKey(), context);
-            Metric metric = registry.register(metricNames.getLongName(), entry.getValue());
+            Metric metric = entry.getValue();
             if (metric instanceof Gauge) {
+                registerGauge(metricNames, (Gauge) metric, context.getThisTaskId(), context.getThisComponentId(), null);
                 saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, (Gauge) metric, taskIdGauges);
             } else if (metric instanceof Meter) {
+                registerMeter(metricNames, (Meter) metric, context.getThisTaskId(), context.getThisComponentId(), null);
                 saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, (Meter) metric, taskIdMeters);
             } else if (metric instanceof Counter) {
+                registerCounter(metricNames, (Counter) metric, context.getThisTaskId(),
+                        context.getThisComponentId(), null);
                 saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, (Counter) metric, taskIdCounters);
             } else if (metric instanceof Timer) {
+                registerTimer(metricNames, (Timer) metric, context.getThisTaskId(), context.getThisComponentId(), null);
                 saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, (Timer) metric, taskIdTimers);
             } else if (metric instanceof Histogram) {
+                registerHistogram(metricNames, (Histogram) metric, context.getThisTaskId(),
+                        context.getThisComponentId(), null);
                 saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, (Histogram) metric, taskIdHistograms);
             } else {
                 LOG.error("Unable to save taskId mapping for metric {} named {}", metric, metricNames.getLongName());
@@ -147,28 +173,53 @@ public class StormMetricRegistry {
         }
     }
 
-    public Timer timer(String name, TopologyContext context) {
-        MetricNames metricNames = topologyMetricName(name, context);
-        Timer timer = registry.timer(metricNames.getLongName());
-        saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, timer, taskIdTimers);
-        return timer;
+    private static <T extends Metric> void saveMetricTaskIdMapping(Integer taskId, MetricNames names, T metric, Map<Integer,
+            Map<String, T>> taskIdMetrics) {
+        Map<String, T> metrics = taskIdMetrics.computeIfAbsent(taskId, (tid) -> new HashMap<>());
+        metrics.put(names.getShortName(), metric);
     }
 
-    public Histogram histogram(String name, TopologyContext context) {
-        MetricNames metricNames = topologyMetricName(name, context);
-        Histogram histogram = registry.histogram(metricNames.getLongName());
-        saveMetricTaskIdMapping(context.getThisTaskId(), metricNames, histogram, taskIdHistograms);
-        return histogram;
+    private <T> Gauge<T> registerGauge(MetricNames metricNames, Gauge<T> gauge, int taskId,
+                                       String componentId, String streamId) {
+        TaskMetricDimensions taskMetricDimensions = new TaskMetricDimensions(taskId, componentId, streamId, this);
+        TaskMetricRepo repo = taskMetrics.computeIfAbsent(taskMetricDimensions, (k) -> new TaskMetricRepo());
+        repo.addGauge(metricNames.getShortName(), gauge);
+        gauge = registry.register(metricNames.getLongName(), gauge);
+        return gauge;
     }
 
-    private static <T extends Metric> void saveMetricTaskIdMapping(Integer taskId, MetricNames names, T metric, Map<Integer,
-            Map<String, T>> taskIdMetrics) {
-        Map<String, T> metrics = taskIdMetrics.computeIfAbsent(taskId, (tid) -> new HashMap<>());
-        if (metrics.get(names.getV2TickName()) != null) {
-            LOG.warn("Adding duplicate short metric for {} with long name {}, only the last metric "
-                    + "will be reported during the V2 metrics tick.", names.getV2TickName(), names.longName);
-        }
-        metrics.put(names.getV2TickName(), metric);
+    private Meter registerMeter(MetricNames metricNames, Meter meter, int taskId, String componentId, String streamId) {
+        TaskMetricDimensions taskMetricDimensions = new TaskMetricDimensions(taskId, componentId, streamId, this);
+        TaskMetricRepo repo = taskMetrics.computeIfAbsent(taskMetricDimensions, (k) -> new TaskMetricRepo());
+        repo.addMeter(metricNames.getShortName(), meter);
+        meter = registry.register(metricNames.getLongName(), meter);
+        return meter;
+    }
+
+    private Counter registerCounter(MetricNames metricNames, Counter counter, int taskId,
+                                    String componentId, String streamId) {
+        TaskMetricDimensions taskMetricDimensions = new TaskMetricDimensions(taskId, componentId, streamId, this);
+        TaskMetricRepo repo = taskMetrics.computeIfAbsent(taskMetricDimensions, (k) -> new TaskMetricRepo());
+        repo.addCounter(metricNames.getShortName(), counter);
+        counter = registry.register(metricNames.getLongName(), counter);
+        return counter;
+    }
+
+    private Timer registerTimer(MetricNames metricNames, Timer timer, int taskId, String componentId, String streamId) {
+        TaskMetricDimensions taskMetricDimensions = new TaskMetricDimensions(taskId, componentId, streamId, this);
+        TaskMetricRepo repo = taskMetrics.computeIfAbsent(taskMetricDimensions, (k) -> new TaskMetricRepo());
+        repo.addTimer(metricNames.getShortName(), timer);
+        timer = registry.register(metricNames.getLongName(), timer);
+        return timer;
+    }
+
+    private Histogram registerHistogram(MetricNames metricNames, Histogram histogram, int taskId,
+                                        String componentId, String streamId) {
+        TaskMetricDimensions taskMetricDimensions = new TaskMetricDimensions(taskId, componentId, streamId, this);
+        TaskMetricRepo repo = taskMetrics.computeIfAbsent(taskMetricDimensions, (k) -> new TaskMetricRepo());
+        repo.addHistogram(metricNames.getShortName(), histogram);
+        histogram = registry.register(metricNames.getLongName(), histogram);
+        return histogram;
     }
 
     private <T extends Metric> Map<String, T> getMetricNameMap(int taskId, Map<Integer, Map<String, T>> taskIdMetrics) {
@@ -198,7 +249,7 @@ public class StormMetricRegistry {
         return getMetricNameMap(taskId, taskIdTimers);
     }
 
-    public void start(Map<String, Object> topoConf) {
+    public void start(Map<String, Object> topoConf, int port) {
         try {
             hostName = dotToUnderScore(Utils.localHostname());
         } catch (UnknownHostException e) {
@@ -206,6 +257,9 @@ public class StormMetricRegistry {
                      + " as 'localhost'.");
         }
 
+        this.topologyId = (String) topoConf.get(Config.STORM_ID);
+        this.port = port;
+
         LOG.info("Starting metrics reporters...");
         List<Map<String, Object>> reporterList = (List<Map<String, Object>>) topoConf.get(Config.TOPOLOGY_METRICS_REPORTERS);
         
@@ -221,7 +275,7 @@ public class StormMetricRegistry {
         LOG.info("Attempting to instantiate reporter class: {}", clazz);
         StormReporter reporter = ReflectionUtils.newInstance(clazz);
         if (reporter != null) {
-            reporter.prepare(registry, topoConf, reporterConfig);
+            reporter.prepare(this, topoConf, reporterConfig);
             reporter.start();
             reporters.add(reporter);
         }
@@ -234,6 +288,26 @@ public class StormMetricRegistry {
         }
     }
 
+    public MetricRegistry getRegistry() {
+        return registry;
+    }
+
+    public Map<TaskMetricDimensions, TaskMetricRepo> getTaskMetrics() {
+        return taskMetrics;
+    }
+
+    String getHostName() {
+        return hostName;
+    }
+
+    String getTopologyId() {
+        return topologyId;
+    }
+
+    Integer getPort() {
+        return port;
+    }
+
     private MetricNames workerMetricName(String name, String stormId, String componentId, String streamId,
                                          Integer taskId, Integer workerPort) {
         StringBuilder sb = new StringBuilder(WORKER_METRIC_PREFIX);
@@ -313,10 +387,10 @@ public class StormMetricRegistry {
         }
 
         /**
-         * Returns the short metric name to be used for reporting during the V2 metrics tick.
-         * @return The V2 metrics tick name.
+         * Returns the short metric name (without dimensions).
+         * @return The short metric name.
          */
-        String getV2TickName() {
+        String getShortName() {
             return shortName;
         }
     }
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricDimensions.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricDimensions.java
new file mode 100644
index 0000000..20c4389
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricDimensions.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Class to store task-related V2 metrics dimensions.
+ */
+public class TaskMetricDimensions {
+    private int taskId;
+    private String componentId;
+    private String streamId;
+    private Map<String, String> dimensions = new HashMap<>();
+
+    public TaskMetricDimensions(int taskId, String componentId, String streamId, StormMetricRegistry metricRegistry) {
+        this.taskId = taskId;
+        dimensions.put("taskid", Integer.toString(this.taskId));
+
+        this.componentId = componentId;
+        if (this.componentId == null) {
+            this.componentId = "";
+        } else {
+            dimensions.put("componentId", this.componentId);
+        }
+
+        this.streamId = streamId;
+        if (this.streamId == null) {
+            this.streamId = "";
+        } else {
+            dimensions.put("streamId", this.streamId);
+        }
+
+        dimensions.put("hostname", metricRegistry.getHostName());
+        dimensions.put("topologyId", metricRegistry.getTopologyId());
+        dimensions.put("port", metricRegistry.getPort().toString());
+    }
+
+    public Map<String, String> getDimensions() {
+        return dimensions;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TaskMetricDimensions otherD = (TaskMetricDimensions) o;
+        return (taskId == otherD.taskId && componentId.equals(otherD.componentId)
+                && streamId.equals(otherD.streamId));
+    }
+
+    @Override
+    public int hashCode() {
+        return taskId + componentId.hashCode() + streamId.hashCode();
+    }
+}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java
new file mode 100644
index 0000000..e88b64f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetricRepo.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricFilter;
+import com.codahale.metrics.ScheduledReporter;
+import com.codahale.metrics.Timer;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Metric repository to allow reporting of task-specific metrics.
+ */
+public class TaskMetricRepo {
+    private SortedMap<String, Gauge> gauges = new TreeMap<>();
+    private SortedMap<String, Counter> counters = new TreeMap<>();
+    private SortedMap<String, Histogram> histograms = new TreeMap<>();
+    private SortedMap<String, Meter> meters = new TreeMap<>();
+    private SortedMap<String, Timer> timers = new TreeMap<>();
+
+    public void addCounter(String name, Counter counter) {
+        counters.put(name, counter);
+    }
+
+    public void addGauge(String name, Gauge gauge) {
+        gauges.put(name, gauge);
+    }
+
+    public void addMeter(String name, Meter meter) {
+        meters.put(name, meter);
+    }
+
+    public void addHistogram(String name, Histogram histogram) {
+        histograms.put(name, histogram);
+    }
+
+    public void addTimer(String name, Timer timer) {
+        timers.put(name, timer);
+    }
+
+    public void report(ScheduledReporter reporter, MetricFilter filter) {
+        if (filter != null) {
+            SortedMap<String, Gauge> filteredGauges = new TreeMap<>();
+            SortedMap<String, Counter> filteredCounters = new TreeMap<>();
+            SortedMap<String, Histogram> filteredHistograms = new TreeMap<>();
+            SortedMap<String, Meter> filteredMeters = new TreeMap<>();
+            SortedMap<String, Timer> filteredTimers = new TreeMap<>();
+
+            for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
+                if (filter.matches(entry.getKey(), entry.getValue())) {
+                    filteredGauges.put(entry.getKey(), entry.getValue());
+                }
+            }
+            for (Map.Entry<String, Counter> entry : counters.entrySet()) {
+                if (filter.matches(entry.getKey(), entry.getValue())) {
+                    filteredCounters.put(entry.getKey(), entry.getValue());
+                }
+            }
+            for (Map.Entry<String, Histogram> entry : histograms.entrySet()) {
+                if (filter.matches(entry.getKey(), entry.getValue())) {
+                    filteredHistograms.put(entry.getKey(), entry.getValue());
+                }
+            }
+            for (Map.Entry<String, Meter> entry : meters.entrySet()) {
+                if (filter.matches(entry.getKey(), entry.getValue())) {
+                    filteredMeters.put(entry.getKey(), entry.getValue());
+                }
+            }
+            for (Map.Entry<String, Timer> entry : timers.entrySet()) {
+                if (filter.matches(entry.getKey(), entry.getValue())) {
+                    filteredTimers.put(entry.getKey(), entry.getValue());
+                }
+            }
+            reporter.report(filteredGauges, filteredCounters, filteredHistograms, filteredMeters, filteredTimers);
+        } else {
+            reporter.report(gauges, counters, histograms, meters, timers);
+        }
+    }
+}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
index ffd278b..bfa84fb 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java
@@ -14,19 +14,35 @@ package org.apache.storm.metrics2.reporters;
 
 import com.codahale.metrics.ConsoleReporter;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.ScheduledReporter;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.storm.Config;
 import org.apache.storm.daemon.metrics.ClientMetricsUtils;
+import org.apache.storm.metrics2.DimensionalReporter;
+import org.apache.storm.metrics2.MetricRegistryProvider;
+import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.metrics2.filters.StormMetricsFilter;
+import org.apache.storm.utils.ObjectReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConsoleStormReporter extends ScheduledStormReporter {
+public class ConsoleStormReporter extends ScheduledStormReporter implements DimensionalReporter.DimensionHandler {
     private static final Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class);
 
     @Override
     public void prepare(MetricRegistry registry, Map<String, Object> topoConf, Map<String, Object> reporterConf) {
+        init(registry, null, reporterConf);
+    }
+
+    @Override
+    public void prepare(MetricRegistryProvider metricRegistryProvider, Map<String, Object> topoConf,
+                        Map<String, Object> reporterConf) {
+        init(metricRegistryProvider.getRegistry(), metricRegistryProvider, reporterConf);
+    }
+
+    private void init(MetricRegistry registry, MetricRegistryProvider metricRegistryProvider, Map<String, Object> reporterConf) {
         LOG.debug("Preparing ConsoleReporter");
         ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(registry);
 
@@ -57,7 +73,34 @@ public class ConsoleStormReporter extends ScheduledStormReporter {
         //defaults to seconds
         reportingPeriodUnit = getReportPeriodUnit(reporterConf);
 
-        reporter = builder.build();
+        ScheduledReporter consoleReporter = builder.build();
+
+        boolean reportDimensions = isReportDimensionsEnabled(reporterConf);
+        if (reportDimensions) {
+            if (metricRegistryProvider == null) {
+                throw new RuntimeException("MetricRegistryProvider is required to enable reporting dimensions");
+            }
+            if (rateUnit == null) {
+                rateUnit = TimeUnit.SECONDS;
+            }
+            if (durationUnit == null) {
+                durationUnit = TimeUnit.MILLISECONDS;
+            }
+            DimensionalReporter dimensionalReporter = new DimensionalReporter(metricRegistryProvider, consoleReporter, this,
+                    "ConsoleDimensionalReporter",
+                    filter, rateUnit, durationUnit, null, true);
+            reporter = dimensionalReporter;
+        } else {
+            reporter = consoleReporter;
+        }
     }
 
+    // We're unable to extend ConsoleReporter to handle dimensions, so we'll report dimensions here
+    @Override
+    public void setDimensions(Map<String, String> dimensions) {
+        System.out.println("Using dimensions: ");
+        for (Map.Entry<String, String> entry : dimensions.entrySet()) {
+            System.out.println(entry.getKey() + " : " + entry.getValue());
+        }
+    }
 }
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
index 0fe5c7d..3a7c990 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java
@@ -37,6 +37,10 @@ public abstract class ScheduledStormReporter implements StormReporter {
         return ObjectReader.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
     }
 
+    public static boolean isReportDimensionsEnabled(Map<String, Object> reporterConf) {
+        return ObjectReader.getBoolean(reporterConf.get(REPORT_DIMENSIONS_ENABLED), false);
+    }
+
     public static StormMetricsFilter getMetricsFilter(Map<String, Object> reporterConf) {
         StormMetricsFilter filter = null;
         Map<String, Object> filterConf = (Map<String, Object>) reporterConf.get("filter");
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
index 8957608..4e81667 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java
@@ -15,13 +15,22 @@ package org.apache.storm.metrics2.reporters;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Reporter;
 import java.util.Map;
+import org.apache.storm.metrics2.MetricRegistryProvider;
+
 
 public interface StormReporter extends Reporter {
     String REPORT_PERIOD = "report.period";
     String REPORT_PERIOD_UNITS = "report.period.units";
+    String REPORT_DIMENSIONS_ENABLED = "report.dimensions.enabled";
 
+    @Deprecated
     void prepare(MetricRegistry metricsRegistry, Map<String, Object> topoConf, Map<String, Object> reporterConf);
 
+    default void prepare(MetricRegistryProvider metricRegistryProvider, Map<String, Object> topoConf,
+                         Map<String, Object> reporterConf) {
+        prepare(metricRegistryProvider.getRegistry(), topoConf, reporterConf);
+    }
+
     void start();
 
     void stop();