You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/02/13 07:00:54 UTC

[incubator-pinot] branch introduce-factory-for-metrics updated (bfebad0 -> 35b514b)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a change to branch introduce-factory-for-metrics
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard bfebad0  Decouple from yammer library
     new 35b514b  Decouple from yammer library

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (bfebad0)
            \
             N -- N -- N   refs/heads/introduce-factory-for-metrics (35b514b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pinot-common/pom.xml | 4 ----
 1 file changed, 4 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Decouple from yammer library

Posted by jl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch introduce-factory-for-metrics
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 35b514b72e132f3515f75bfdc91617c525516d48
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Fri Feb 12 22:57:06 2021 -0800

    Decouple from yammer library
---
 .../broker/broker/helix/HelixBrokerStarter.java    |  16 +-
 .../LiteralOnlyBrokerRequestTest.java              |   8 +-
 .../pinot/common/metrics/AbstractMetrics.java      |  64 ++++---
 .../pinot/common/metrics/AggregatedCounter.java    | 131 --------------
 .../pinot/common/metrics/AggregatedHistogram.java  | 200 ---------------------
 .../pinot/common/metrics/AggregatedLongGauge.java  | 108 -----------
 .../pinot/common/metrics/AggregatedMeter.java      | 195 --------------------
 .../common/metrics/AggregatedMetricsRegistry.java  |  94 ----------
 .../apache/pinot/common/metrics/BrokerMetrics.java |   8 +-
 .../pinot/common/metrics/ControllerMetrics.java    |   6 +-
 ...eporterMetricsRegistryRegistrationListener.java |   8 +-
 .../apache/pinot/common/metrics/MetricsHelper.java | 145 +++------------
 .../MetricsRegistryRegistrationListener.java       |   4 +-
 .../apache/pinot/common/metrics/ServerMetrics.java |   8 +-
 .../pinot/common/metrics/ValidationMetrics.java    |  40 +++--
 .../PinotCounter.java}                             |  12 +-
 .../PinotGauge.java}                               |  20 ++-
 .../PinotHistogram.java}                           |  11 +-
 .../PinotJmxReporter.java}                         |  11 +-
 .../PinotMeter.java}                               |  12 +-
 .../pinot/common/metrics/base/PinotMetered.java    |  91 ++++++++++
 .../PinotMetric.java}                              |  20 ++-
 .../PinotMetricName.java}                          |  11 +-
 .../common/metrics/base/PinotMetricProcessor.java  |  72 ++++++++
 .../metrics/base/PinotMetricUtilsFactory.java      |  93 ++++++++++
 .../common/metrics/base/PinotMetricsRegistry.java  |  51 ++++++
 .../PinotTimer.java}                               |  15 +-
 .../YammerCounter.java}                            |  31 +++-
 .../pinot/common/metrics/yammer/YammerGauge.java   |  60 +++++++
 .../YammerJmxReporter.java}                        |  22 ++-
 .../YammerMeter.java}                              |  31 +++-
 .../pinot/common/metrics/yammer/YammerMetered.java |  72 ++++++++
 .../YammerMetric.java}                             |  28 ++-
 .../YammerMetricName.java}                         |  26 ++-
 .../metrics/yammer/YammerMetricProcessor.java      |  73 ++++++++
 .../metrics/yammer/YammerMetricsRegistry.java      |  99 ++++++++++
 .../pinot/common/metrics/yammer/YammerTimer.java   |  90 ++++++++++
 .../pinot/common/metrics/MetricsHelperTest.java    |  23 +--
 .../metrics/PinotMetricUtilsFactoryTest.java       |  53 ++++++
 .../spark/connector/PinotServerDataFetcher.scala   |   4 +-
 .../apache/pinot/controller/ControllerStarter.java |  51 ++++--
 .../controller/LeadControllerManagerTest.java      |   4 +-
 .../pinot/controller/api/TableSizeReaderTest.java  |   5 +-
 .../controller/helix/SegmentStatusCheckerTest.java |  25 +--
 .../periodictask/ControllerPeriodicTaskTest.java   |   5 +-
 .../helix/core/realtime/SegmentCompletionTest.java |   6 +-
 .../helix/core/retention/RetentionManagerTest.java |   6 +-
 .../core/retention/SegmentLineageCleanupTest.java  |   4 +-
 .../validation/StorageQuotaCheckerTest.java        |   4 +-
 .../realtime/HLRealtimeSegmentDataManager.java     |   6 +-
 .../realtime/LLRealtimeSegmentDataManager.java     |   6 +-
 .../core/io/writer/impl/DirectMemoryManager.java   |   4 +-
 .../core/io/writer/impl/MmapMemoryManager.java     |   4 +-
 .../data/manager/BaseTableDataManagerTest.java     |   7 +-
 .../offline/DimensionTableDataManagerTest.java     |   4 +-
 .../realtime/LLRealtimeSegmentDataManagerTest.java |   6 +-
 .../scheduler/MultiLevelPriorityQueueTest.java     |  14 +-
 .../query/scheduler/PrioritySchedulerTest.java     |   5 +-
 .../scheduler/fcfs/FCFSSchedulerGroupTest.java     |   4 +-
 .../SegmentGenerationWithNullValueVectorTest.java  |   5 +-
 .../pinot/query/executor/QueryExecutorTest.java    |   5 +-
 .../tests/SegmentCompletionIntegrationTest.java    |   9 +-
 .../org/apache/pinot/minion/MinionStarter.java     |  12 +-
 .../apache/pinot/minion/metrics/MinionMetrics.java |   6 +-
 .../pinot/server/starter/ServerInstance.java       |  11 +-
 65 files changed, 1146 insertions(+), 1138 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index 2a3c97f..a2bb692 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -19,14 +19,12 @@
 package org.apache.pinot.broker.broker.helix;
 
 import com.google.common.collect.ImmutableList;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixConstants.ChangeType;
@@ -54,6 +52,8 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.MetricsHelper;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.CommonConstants.Broker;
 import org.apache.pinot.common.utils.CommonConstants.Helix;
@@ -71,6 +71,8 @@ import org.apache.pinot.spi.services.ServiceStartable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory.LIBRARY_NAME_KEY;
+
 
 @SuppressWarnings("unused")
 public class HelixBrokerStarter implements ServiceStartable {
@@ -92,7 +94,7 @@ public class HelixBrokerStarter implements ServiceStartable {
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private HelixDataAccessor _helixDataAccessor;
 
-  private MetricsRegistry _metricsRegistry;
+  private PinotMetricsRegistry _metricsRegistry;
   private BrokerMetrics _brokerMetrics;
   private RoutingManager _routingManager;
   private AccessControlFactory _accessControlFactory;
@@ -226,8 +228,10 @@ public class HelixBrokerStarter implements ServiceStartable {
 
     LOGGER.info("Setting up broker request handler");
     // Set up metric registry and broker metrics
-    _metricsRegistry = new MetricsRegistry();
-    MetricsHelper.initializeMetrics(_brokerConf.subset(Broker.METRICS_CONFIG_PREFIX));
+    PinotConfiguration metricsConfiguration = _brokerConf.subset(Broker.METRICS_CONFIG_PREFIX);
+    PinotMetricUtilsFactory.init(metricsConfiguration.getProperty(LIBRARY_NAME_KEY));
+    _metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
+    MetricsHelper.initializeMetrics(metricsConfiguration);
     MetricsHelper.registerMetricsRegistry(_metricsRegistry);
     _brokerMetrics = new BrokerMetrics(
         _brokerConf.getProperty(Broker.CONFIG_OF_METRICS_NAME_PREFIX, Broker.DEFAULT_METRICS_NAME_PREFIX),
@@ -394,7 +398,7 @@ public class HelixBrokerStarter implements ServiceStartable {
     return _spectatorHelixManager;
   }
 
-  public MetricsRegistry getMetricsRegistry() {
+  public PinotMetricsRegistry getMetricsRegistry() {
     return _metricsRegistry;
   }
 
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index 65896d2..df4c5ce 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -20,11 +20,11 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.Collections;
 import java.util.Random;
 import org.apache.pinot.broker.api.RequestStatistics;
 import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.env.PinotConfiguration;
@@ -92,7 +92,8 @@ public class LiteralOnlyBrokerRequestTest {
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
         new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, null, null,
-            new BrokerMetrics("", new MetricsRegistry(), true, Collections.emptySet()), null);
+            new BrokerMetrics("", PinotMetricUtilsFactory.getPinotMetricsRegistry(), true, Collections.emptySet()),
+            null);
     long randNum = RANDOM.nextLong();
     byte[] randBytes = new byte[12];
     RANDOM.nextBytes(randBytes);
@@ -119,7 +120,8 @@ public class LiteralOnlyBrokerRequestTest {
       throws Exception {
     SingleConnectionBrokerRequestHandler requestHandler =
         new SingleConnectionBrokerRequestHandler(new PinotConfiguration(), null, null, null, null,
-            new BrokerMetrics("", new MetricsRegistry(), true, Collections.emptySet()), null);
+            new BrokerMetrics("", PinotMetricUtilsFactory.getPinotMetricsRegistry(), true, Collections.emptySet()),
+            null);
     long currentTsMin = System.currentTimeMillis();
     JsonNode request = new ObjectMapper().readTree(
         "{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC', 'yyyy-MM-dd z') as firstDayOf2020\"}");
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index fb3ab8c..f75e160 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -19,8 +19,6 @@
 package org.apache.pinot.common.metrics;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -32,6 +30,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.metrics.base.PinotMeter;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +49,7 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
 
   protected final String _metricPrefix;
 
-  protected final MetricsRegistry _metricsRegistry;
+  protected final PinotMetricsRegistry _metricsRegistry;
 
   private final Class _clazz;
 
@@ -58,11 +60,11 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
   // Table level metrics are still emitted for allowed tables even if emitting table level metrics is disabled
   private final Set<String> _allowedTables;
 
-  public AbstractMetrics(String metricPrefix, MetricsRegistry metricsRegistry, Class clazz) {
+  public AbstractMetrics(String metricPrefix, PinotMetricsRegistry metricsRegistry, Class clazz) {
     this(metricPrefix, metricsRegistry, clazz, true, Collections.emptySet());
   }
 
-  public AbstractMetrics(String metricPrefix, MetricsRegistry metricsRegistry, Class clazz,
+  public AbstractMetrics(String metricPrefix, PinotMetricsRegistry metricsRegistry, Class clazz,
       boolean isTableLevelMetricsEnabled, Collection<String> allowedTables) {
     _metricPrefix = metricPrefix;
     _metricsRegistry = metricsRegistry;
@@ -76,12 +78,11 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
    * name to the allowed entries to make sure that all metrics are checked against the allowed tables.
    */
   private static Set<String> addNameVariations(Collection<String> allowedTables) {
-    return allowedTables.stream()
-        .flatMap(tableName -> TableNameBuilder.getTableNameVariations(tableName).stream())
+    return allowedTables.stream().flatMap(tableName -> TableNameBuilder.getTableNameVariations(tableName).stream())
         .collect(Collectors.toCollection(HashSet::new));
   }
 
-  public MetricsRegistry getMetricsRegistry() {
+  public PinotMetricsRegistry getMetricsRegistry() {
     return _metricsRegistry;
   }
 
@@ -149,9 +150,7 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
    * @param timeUnit The log time duration time unit
    */
   private void addValueToTimer(String fullTimerName, final long duration, final TimeUnit timeUnit) {
-    final MetricName metricName = new MetricName(_clazz, fullTimerName);
-    com.yammer.metrics.core.Timer timer =
-        MetricsHelper.newTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
+    final PinotMetricName metricName = PinotMetricUtilsFactory.generatePinotMetricName(_clazz, fullTimerName);
     MetricsHelper.newTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
         .update(duration, timeUnit);
   }
@@ -173,8 +172,7 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
    * @param unitCount The number of units to add to the meter
    * @param reusedMeter The meter to reuse
    */
-  public com.yammer.metrics.core.Meter addMeteredGlobalValue(final M meter, final long unitCount,
-      com.yammer.metrics.core.Meter reusedMeter) {
+  public PinotMeter addMeteredGlobalValue(final M meter, final long unitCount, PinotMeter reusedMeter) {
     if (reusedMeter != null) {
       reusedMeter.mark(unitCount);
       return reusedMeter;
@@ -182,9 +180,9 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
       final String fullMeterName;
       String meterName = meter.getMeterName();
       fullMeterName = _metricPrefix + meterName;
-      final MetricName metricName = new MetricName(_clazz, fullMeterName);
+      final PinotMetricName metricName = PinotMetricUtilsFactory.generatePinotMetricName(_clazz, fullMeterName);
 
-      final com.yammer.metrics.core.Meter newMeter =
+      final PinotMeter newMeter =
           MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
       newMeter.mark(unitCount);
       return newMeter;
@@ -209,8 +207,8 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
    * @param unitCount The number of units to add to the meter
    * @param reusedMeter The meter to reuse
    */
-  public com.yammer.metrics.core.Meter addMeteredTableValue(final String tableName, final M meter, final long unitCount,
-      com.yammer.metrics.core.Meter reusedMeter) {
+  public PinotMeter addMeteredTableValue(final String tableName, final M meter, final long unitCount,
+      PinotMeter reusedMeter) {
     if (reusedMeter != null) {
       reusedMeter.mark(unitCount);
       return reusedMeter;
@@ -218,20 +216,20 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
       final String fullMeterName;
       String meterName = meter.getMeterName();
       fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName;
-      final MetricName metricName = new MetricName(_clazz, fullMeterName);
+      final PinotMetricName metricName = PinotMetricUtilsFactory.generatePinotMetricName(_clazz, fullMeterName);
 
-      final com.yammer.metrics.core.Meter newMeter =
+      final PinotMeter newMeter =
           MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
       newMeter.mark(unitCount);
       return newMeter;
     }
   }
 
-  public com.yammer.metrics.core.Meter getMeteredTableValue(final String tableName, final M meter) {
+  public PinotMeter getMeteredTableValue(final String tableName, final M meter) {
     final String fullMeterName;
     String meterName = meter.getMeterName();
     fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName;
-    final MetricName metricName = new MetricName(_clazz, fullMeterName);
+    final PinotMetricName metricName = PinotMetricUtilsFactory.generatePinotMetricName(_clazz, fullMeterName);
 
     return MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
   }
@@ -462,19 +460,17 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
    * @param valueCallback The callback function used to retrieve the value of the gauge
    */
   public void addCallbackGauge(final String metricName, final Callable<Long> valueCallback) {
-    MetricsHelper.newGauge(_metricsRegistry, new MetricName(_clazz, _metricPrefix + metricName),
-        new com.yammer.metrics.core.Gauge<Long>() {
-          @Override
-          public Long value() {
-            try {
-              return valueCallback.call();
-            } catch (Exception e) {
-              LOGGER.error("Caught exception", e);
-              Utils.rethrowException(e);
-              throw new AssertionError("Should not reach this");
-            }
-          }
-        });
+    MetricsHelper
+        .newGauge(_metricsRegistry, PinotMetricUtilsFactory.generatePinotMetricName(_clazz, _metricPrefix + metricName),
+            PinotMetricUtilsFactory.generatePinotGauge(avoid -> {
+              try {
+                return valueCallback.call();
+              } catch (Exception e) {
+                LOGGER.error("Caught exception", e);
+                Utils.rethrowException(e);
+                throw new AssertionError("Should not reach this");
+              }
+            }));
   }
 
   protected abstract QP[] getQueryPhases();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedCounter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedCounter.java
deleted file mode 100644
index 4e5537b..0000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedCounter.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.metrics;
-
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricProcessor;
-import java.util.Collection;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-
-/**
- * Aggregated Histogram which aggregates counters.
- *  This class supports multi-level aggregations of counters
- *
- *
- */
-public class AggregatedCounter implements Metric {
-  // Container of inner meters
-  private final CopyOnWriteArrayList<Metric> _counters = new CopyOnWriteArrayList<Metric>();
-
-  private static final long DEFAULT_REFRESH_MS = 60 * 1000L; // 1 minute
-
-  // Refresh Delay config
-  private final long _refreshMs;
-
-  // Last Refreshed timestamp
-  private long _lastRefreshedTime;
-
-  // Count of entries
-  private volatile long _count;
-
-  @Override
-  public <T> void processWith(MetricProcessor<T> processor, MetricName name, T context)
-      throws Exception {
-    for (Metric c : _counters) {
-      c.processWith(processor, name, context);
-    }
-  }
-
-  public AggregatedCounter() {
-    _refreshMs = DEFAULT_REFRESH_MS;
-  }
-
-  public AggregatedCounter(long refreshMs) {
-    _refreshMs = refreshMs;
-  }
-
-  /**
-   * Add Collection of metrics to be aggregated
-   * @param counters collection of metrics to be aggregated
-   * @return this instance
-   */
-  public AggregatedCounter addAll(Collection<? extends Metric> counters) {
-    _counters.addAll(counters);
-    return this;
-  }
-
-  /**
-   * Add a metric to be aggregated
-   * @param counter
-   * @return this instance
-   */
-  public AggregatedCounter add(Metric counter) {
-    _counters.add(counter);
-    return this;
-  }
-
-  /**
-   * Remove a metric which was already added
-   * @param counter metric to be removed
-   * @return true if the metric was present in the list
-   */
-  public boolean remove(Metric counter) {
-    return _counters.remove(counter);
-  }
-
-  /**
-   * Returns the counter's current value.
-   *
-   * @return the counter's current value
-   */
-  public long count() {
-    refreshIfElapsed();
-    return _count;
-  }
-
-  /**
-   * Check elapsed time since last refresh and only refresh if time difference is
-   * greater than threshold.
-   */
-  private void refreshIfElapsed() {
-    long currentTime = System.currentTimeMillis();
-    if (currentTime - _lastRefreshedTime > _refreshMs && !_counters.isEmpty()) {
-      refresh();
-      _lastRefreshedTime = currentTime;
-    }
-  }
-
-  /**
-   * Update counter from underlying counters.
-   */
-  public void refresh() {
-    long count = 0;
-    for (Metric m : _counters) {
-      if (m instanceof Counter) {
-        count += ((Counter) m).count();
-      } else if (m instanceof AggregatedCounter) {
-        count += ((AggregatedCounter) m).count();
-      }
-    }
-    _count = count;
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedHistogram.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedHistogram.java
deleted file mode 100644
index 08f38f9..0000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedHistogram.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.metrics;
-
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Metric;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricProcessor;
-import com.yammer.metrics.core.Sampling;
-import com.yammer.metrics.core.Summarizable;
-import com.yammer.metrics.stats.Snapshot;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-
-/**
- *
- * Aggregated Histogram which aggregates and provides Uniform Sampling Histograms.
- * We can have multi-level aggregations.
- *
- *
- */
-public class AggregatedHistogram<T extends Sampling> implements Sampling, Summarizable, Metric {
-
-  private final List<T> _histograms = new CopyOnWriteArrayList<T>();
-
-  private static final long DEFAULT_REFRESH_MS = 60 * 1000L; // 1 minute
-
-  // Refresh Delay config
-  private final long _refreshMs;
-
-  // Last Refreshed timestamp
-  private volatile long _lastRefreshedTime;
-
-  // Sampling stats
-  private volatile double _max;
-  private volatile double _min;
-  private volatile double _mean;
-  private volatile double _stdDev;
-  private volatile double _sum;
-
-  //Sanpshot
-  private volatile Snapshot _snapshot;
-
-  public AggregatedHistogram(long refreshMs) {
-    _refreshMs = refreshMs;
-  }
-
-  public AggregatedHistogram() {
-    _refreshMs = DEFAULT_REFRESH_MS;
-  }
-
-  /**
-   * Add Collection of metrics to be aggregated
-   * @return this instance
-   */
-  public AggregatedHistogram<T> addAll(Collection<T> histograms) {
-    _histograms.addAll(histograms);
-    return this;
-  }
-
-  /**
-   * Add a metric to be aggregated
-   * @return this instance
-   */
-  public AggregatedHistogram<T> add(T histogram) {
-    _histograms.add(histogram);
-    return this;
-  }
-
-  /**
-   * Remove a metric which was already added
-   * @return true if the metric was present in the list
-   */
-  public boolean remove(T histogram) {
-    return _histograms.remove(histogram);
-  }
-
-  /**
-   * Check elapsed time since last refresh and only refresh if time difference is
-   * greater than threshold.
-   */
-  private void refreshIfElapsed() {
-    long currentTime = System.currentTimeMillis();
-    if (currentTime - _lastRefreshedTime > _refreshMs && !_histograms.isEmpty()) {
-      refresh();
-      _lastRefreshedTime = currentTime;
-    }
-  }
-
-  /**
-   * update all stats using underlying histograms
-   */
-  public void refresh() {
-    List<Double> values = new ArrayList<Double>();
-    _min = Double.MAX_VALUE;
-    _max = Double.MIN_VALUE;
-    _sum = 0;
-    double meanSum = 0.0;
-    for (T hist : _histograms) {
-      if (hist instanceof Histogram) {
-        Histogram h = (Histogram) hist;
-        _min = Math.min(_min, h.min());
-        _max = Math.max(_max, h.max());
-        _sum += h.sum();
-        meanSum += h.mean();
-      } else {
-        AggregatedHistogram<Sampling> h = (AggregatedHistogram<Sampling>) hist;
-        _min = Math.min(_min, h.min());
-        _max = Math.max(_max, h.max());
-        _sum += h.sum();
-        meanSum += h.mean();
-      }
-      double[] val = hist.getSnapshot().getValues();
-      for (double d : val) {
-        values.add(d);
-      }
-    }
-
-    if (!_histograms.isEmpty()) {
-      _mean = meanSum / _histograms.size();
-    }
-
-    if (!values.isEmpty()) {
-
-      double[] vals = new double[values.size()];
-
-      int i = 0;
-      for (Double d : values) {
-        vals[i++] = d;
-      }
-
-      _snapshot = new Snapshot(vals);
-    }
-  }
-
-  @Override
-  public double max() {
-    refreshIfElapsed();
-    return _max;
-  }
-
-  @Override
-  public double min() {
-    refreshIfElapsed();
-    return _min;
-  }
-
-  @Override
-  public double mean() {
-    refreshIfElapsed();
-    return _mean;
-  }
-
-  @Override
-  public double stdDev() {
-    refreshIfElapsed();
-    return _stdDev;
-  }
-
-  @Override
-  public double sum() {
-    refreshIfElapsed();
-    return _sum;
-  }
-
-  @Override
-  public Snapshot getSnapshot() {
-    refreshIfElapsed();
-    return _snapshot;
-  }
-
-  @Override
-  public <T2> void processWith(MetricProcessor<T2> processor, MetricName name, T2 context)
-      throws Exception {
-    for (T h : _histograms) {
-      if (h instanceof Metric) {
-        ((Metric) h).processWith(processor, name, context);
-      }
-    }
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedLongGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedLongGauge.java
deleted file mode 100644
index b4205ba..0000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedLongGauge.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.metrics;
-
-import com.yammer.metrics.core.Gauge;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-
-/**
- * An aggregated gauge that provides an average among the underlying gauges. You can have
- * multi-level aggregations of gauges (long types)
- *
- *
- * @param <T>
- */
-public class AggregatedLongGauge<T extends Number, V extends Gauge<T>> extends Gauge<Long> {
-  // Container of inner meters
-  private final List<Gauge<T>> _gauges = new CopyOnWriteArrayList<Gauge<T>>();
-
-  private static final long DEFAULT_REFRESH_MS = 60 * 1000L; // 1 minute
-
-  // Refresh Delay config
-  private final long _refreshMs;
-
-  // Last Refreshed timestamp
-  private volatile long _lastRefreshedTime;
-
-  // The mean instantaneous value
-  private volatile long _value;
-
-  public AggregatedLongGauge(long refreshMs) {
-    _refreshMs = refreshMs;
-  }
-
-  public AggregatedLongGauge() {
-    _refreshMs = DEFAULT_REFRESH_MS;
-  }
-
-  /**
-   * Add Collection of metrics to be aggregated
-   * @return this instance
-   */
-  public AggregatedLongGauge<T, V> addAll(Collection<Gauge<T>> gauges) {
-    _gauges.addAll(gauges);
-    return this;
-  }
-
-  /**
-   * Add a metric to be aggregated
-   * @return this instance
-   */
-  public AggregatedLongGauge<T, V> add(Gauge<T> gauge) {
-    _gauges.add(gauge);
-    return this;
-  }
-
-  /**
-   * Remove a metric which was already added
-   * @return true if the metric was present in the list
-   */
-  public boolean remove(Gauge<T> gauge) {
-    return _gauges.remove(gauge);
-  }
-
-  /**
-   * Check elapsed time since last refresh and only refresh if time difference is
-   * greater than threshold.
-   */
-  private void refreshIfElapsed() {
-    long currentTime = System.currentTimeMillis();
-    if (currentTime - _lastRefreshedTime > _refreshMs && !_gauges.isEmpty()) {
-      refresh();
-      _lastRefreshedTime = currentTime;
-    }
-  }
-
-  public void refresh() {
-    long sum = 0;
-    for (Gauge<T> gauge : _gauges) {
-      sum += gauge.value().longValue();
-    }
-    _value = sum / _gauges.size();
-  }
-
-  @Override
-  public Long value() {
-    refreshIfElapsed();
-    return _value;
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedMeter.java
deleted file mode 100644
index 221bb96..0000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedMeter.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.metrics;
-
-import com.yammer.metrics.core.Metered;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricProcessor;
-import com.yammer.metrics.core.Stoppable;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-
-
-/**
- * Aggregated Meter Stats to consolidate metrics from across consolidated meters. We can have multi-level
- * aggregation with this class. One example is
- *  You have a JVM wide aggregated metric for ClientStats and a peer specific clientStats
- *  ( client objects connection to the same server) and individual metrics.
- *
- *  We have refreshMs that will throttle the aggregation frequency to 1 minute (by default). The refresh happens
- *  in the same thread which called the metric method.
- *
- */
-public class AggregatedMeter<T extends Metered & Stoppable> implements Metered, Stoppable {
-
-  public static final long SECONDS_IN_ONE_MIN = 60;
-  public static final long SECONDS_IN_FIVE_MIN = SECONDS_IN_ONE_MIN * 5;
-  public static final long SECONDS_IN_FIFTEEN_MIN = SECONDS_IN_ONE_MIN * 15;
-
-  // Container of inner meters
-  private final List<T> _meters = new CopyOnWriteArrayList<T>();
-
-  private static final long DEFAULT_REFRESH_MS = 60 * 1000; // 1 minute
-
-  // Refresh Delay config
-  private final long _refreshMs;
-
-  // Last Refreshed timestamp
-  private volatile long _lastRefreshedTime;
-
-  // Count of entries
-  private volatile long _count;
-
-  // Rates
-  private volatile double _oneMinRate;
-  private volatile double _fiveMinRate;
-  private volatile double _fifteenMinRate;
-  private volatile double _meanRate;
-
-  public AggregatedMeter() {
-    _refreshMs = DEFAULT_REFRESH_MS;
-  }
-
-  public AggregatedMeter(long refreshMs) {
-    _refreshMs = refreshMs;
-  }
-
-  /**
-   * Add Collection of metrics to be aggregated
-   * @return this instance
-   */
-  public AggregatedMeter<T> addAll(Collection<T> meters) {
-    _meters.addAll(meters);
-    return this;
-  }
-
-  /**
-   * Add a metric to be aggregated
-   * @return this instance
-   */
-  public AggregatedMeter<T> add(T meter) {
-    _meters.add(meter);
-    return this;
-  }
-
-  /**
-   * Remove a metric which was already added
-   * @return true if the metric was present in the list
-   */
-  public boolean remove(T meter) {
-    return _meters.remove(meter);
-  }
-
-  @Override
-  public void stop() {
-    for (T m : _meters) {
-      m.stop();
-    }
-  }
-
-  @Override
-  public <T2> void processWith(MetricProcessor<T2> processor, MetricName name, T2 context)
-      throws Exception {
-    for (T m : _meters) {
-      m.processWith(processor, name, context);
-    }
-  }
-
-  /**
-   * Check elapsed time since last refresh and only refresh if time difference is
-   * greater than threshold.
-   */
-  private void refreshIfElapsed() {
-    long currentTime = System.currentTimeMillis();
-    if (currentTime - _lastRefreshedTime > _refreshMs && !_meters.isEmpty()) {
-      refresh();
-      _lastRefreshedTime = currentTime;
-    }
-  }
-
-  public void refresh() {
-    // Refresh 1 min
-    long oneMinSum = 0;
-    long fiveMinSum = 0;
-    long fifteenMinSum = 0;
-    long meanSum = 0;
-    int count = _meters.size();
-    _count = 0;
-    for (T m : _meters) {
-      oneMinSum += m.oneMinuteRate() * SECONDS_IN_ONE_MIN;
-      fiveMinSum += m.fiveMinuteRate() * SECONDS_IN_FIVE_MIN;
-      fifteenMinSum += m.fifteenMinuteRate() * SECONDS_IN_FIFTEEN_MIN;
-      meanSum += m.meanRate() * m.count();
-      _count += m.count();
-    }
-
-    _oneMinRate = oneMinSum / (count * SECONDS_IN_ONE_MIN * 1.0);
-    _fiveMinRate = fiveMinSum / (count * SECONDS_IN_FIVE_MIN * 1.0);
-    _fifteenMinRate = fifteenMinSum / (count * SECONDS_IN_FIFTEEN_MIN * 1.0);
-    _meanRate = meanSum / _count;
-  }
-
-  @Override
-  public TimeUnit rateUnit() {
-    if (_meters.isEmpty()) {
-      return null;
-    }
-    return _meters.get(0).rateUnit();
-  }
-
-  @Override
-  public String eventType() {
-    if (_meters.isEmpty()) {
-      return null;
-    }
-    return _meters.get(0).eventType();
-  }
-
-  @Override
-  public long count() {
-    refreshIfElapsed();
-    return _count;
-  }
-
-  @Override
-  public double fifteenMinuteRate() {
-    refreshIfElapsed();
-    return _fifteenMinRate;
-  }
-
-  @Override
-  public double fiveMinuteRate() {
-    refreshIfElapsed();
-    return _fiveMinRate;
-  }
-
-  @Override
-  public double meanRate() {
-    refreshIfElapsed();
-    return _meanRate;
-  }
-
-  @Override
-  public double oneMinuteRate() {
-    refreshIfElapsed();
-    return _oneMinRate;
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedMetricsRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedMetricsRegistry.java
deleted file mode 100644
index bc154ef..0000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AggregatedMetricsRegistry.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.metrics;
-
-import com.yammer.metrics.core.Clock;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Metered;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Sampling;
-import com.yammer.metrics.core.Stoppable;
-
-
-/**
- * A metrics registry which extends {@link MetricsRegistry} and provides additional
- * APIs to register aggregated metrics to registry
- *
- *
- */
-public class AggregatedMetricsRegistry extends MetricsRegistry {
-
-  /**
-   * Creates a new {@link AggregatedMetricsRegistry}.
-   */
-  public AggregatedMetricsRegistry() {
-    super();
-  }
-
-  /**
-   * Creates a new {@link AggregatedMetricsRegistry} with the given {@link Clock} instance.
-   *
-   * @param clock    a {@link Clock} instance
-   */
-  public AggregatedMetricsRegistry(Clock clock) {
-    super(clock);
-  }
-
-  /**
-   * Creates a new {@link AggregatedCounter} and registers it under the given metric name.
-   *
-   * @param metricName the name of the metric
-   * @return a new {@link AggregatedCounter}
-   */
-  public AggregatedCounter newAggregatedCounter(MetricName metricName) {
-    return getOrAdd(metricName, new AggregatedCounter());
-  }
-
-  /**
-   * Creates a new {@link AggregatedHistogram} and registers it under the given metric name.
-   *
-   * @param metricName the name of the metric
-   * @return a new {@link AggregatedHistogram}
-   */
-  public <T extends Sampling> AggregatedHistogram<T> newAggregatedHistogram(MetricName metricName) {
-    return getOrAdd(metricName, new AggregatedHistogram<T>());
-  }
-
-  /**
-   * Creates a new {@link AggregatedMeter} and registers it under the given metric name.
-   *
-   * @param metricName the name of the metric
-   * @return a new {@link AggregatedMeter}
-   */
-  public <T extends Metered & Stoppable> AggregatedMeter<T> newAggregatedMeter(MetricName metricName) {
-    return getOrAdd(metricName, new AggregatedMeter<T>());
-  }
-
-  /**
-   * Creates a new {@link AggregatedLongGauge} and registers it under the given metric name.
-   *
-   * @param metricName the name of the metric
-   * @return a new {@link AggregatedLongGauge}
-   */
-  public <T extends Number, V extends Gauge<T>> AggregatedLongGauge<T, V> newAggregatedLongGauge(
-      MetricName metricName) {
-    return getOrAdd(metricName, new AggregatedLongGauge<T, V>());
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java
index c2c4ac6..3d9410d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMetrics.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pinot.common.metrics;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.Collection;
 import java.util.Collections;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
 
 import static org.apache.pinot.common.utils.CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS;
 import static org.apache.pinot.common.utils.CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX;
@@ -37,16 +37,16 @@ public class BrokerMetrics extends AbstractMetrics<BrokerQueryPhase, BrokerMeter
    *
    * @param metricsRegistry The metric registry used to register timers and meters.
    */
-  public BrokerMetrics(MetricsRegistry metricsRegistry) {
+  public BrokerMetrics(PinotMetricsRegistry metricsRegistry) {
     this(metricsRegistry, DEFAULT_ENABLE_TABLE_LEVEL_METRICS, Collections.emptySet());
   }
 
-  public BrokerMetrics(MetricsRegistry metricsRegistry, boolean isTableLevelMetricsEnabled,
+  public BrokerMetrics(PinotMetricsRegistry metricsRegistry, boolean isTableLevelMetricsEnabled,
       Collection<String> allowedTables) {
     this(DEFAULT_METRICS_NAME_PREFIX, metricsRegistry, isTableLevelMetricsEnabled, allowedTables);
   }
 
-  public BrokerMetrics(String prefix, MetricsRegistry metricsRegistry, boolean isTableLevelMetricsEnabled,
+  public BrokerMetrics(String prefix, PinotMetricsRegistry metricsRegistry, boolean isTableLevelMetricsEnabled,
       Collection<String> allowedTables) {
     super(prefix, metricsRegistry, BrokerMetrics.class, isTableLevelMetricsEnabled, allowedTables);
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMetrics.java
index 7190cfd..f803fad 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerMetrics.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.common.metrics;
 
-import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
 
 import static org.apache.pinot.common.utils.CommonConstants.Controller.DEFAULT_METRICS_PREFIX;
 
@@ -28,11 +28,11 @@ import static org.apache.pinot.common.utils.CommonConstants.Controller.DEFAULT_M
  */
 public class ControllerMetrics extends AbstractMetrics<AbstractMetrics.QueryPhase, ControllerMeter, ControllerGauge, ControllerTimer> {
 
-  public ControllerMetrics(MetricsRegistry metricsRegistry) {
+  public ControllerMetrics(PinotMetricsRegistry metricsRegistry) {
     this(DEFAULT_METRICS_PREFIX, metricsRegistry);
   }
 
-  public ControllerMetrics(String prefix, MetricsRegistry metricsRegistry) {
+  public ControllerMetrics(String prefix, PinotMetricsRegistry metricsRegistry) {
     super(prefix, metricsRegistry, ControllerMetrics.class);
   }
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/JmxReporterMetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/JmxReporterMetricsRegistryRegistrationListener.java
index 9e6cb13..2f22293 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/JmxReporterMetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/JmxReporterMetricsRegistryRegistrationListener.java
@@ -18,8 +18,8 @@
  */
 package org.apache.pinot.common.metrics;
 
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.reporting.JmxReporter;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,9 +32,9 @@ public class JmxReporterMetricsRegistryRegistrationListener implements MetricsRe
   private static final Logger LOGGER = LoggerFactory.getLogger(JmxReporterMetricsRegistryRegistrationListener.class);
 
   @Override
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry) {
+  public void onMetricsRegistryRegistered(PinotMetricsRegistry metricsRegistry) {
     LOGGER.info("Registering JmxReporterMetricsRegistryRegistrationListener");
-    new JmxReporter(metricsRegistry).start();
+    PinotMetricUtilsFactory.generatePinotJmxReporter(metricsRegistry).start();
     LOGGER.info("Number of metrics in metricsRegistry: {}", metricsRegistry.allMetrics().size());
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsHelper.java
index f6e2eb1..3c27039 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsHelper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsHelper.java
@@ -18,17 +18,6 @@
  */
 package org.apache.pinot.common.metrics;
 
-import com.yammer.metrics.Metrics;
-import com.yammer.metrics.core.Counter;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.Histogram;
-import com.yammer.metrics.core.Meter;
-import com.yammer.metrics.core.Metered;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
-import com.yammer.metrics.core.Sampling;
-import com.yammer.metrics.core.Stoppable;
-import com.yammer.metrics.core.Timer;
 import java.lang.reflect.Constructor;
 import java.util.Arrays;
 import java.util.List;
@@ -36,6 +25,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metrics.base.PinotCounter;
+import org.apache.pinot.common.metrics.base.PinotGauge;
+import org.apache.pinot.common.metrics.base.PinotHistogram;
+import org.apache.pinot.common.metrics.base.PinotMeter;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotTimer;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +40,7 @@ import org.slf4j.LoggerFactory;
 public class MetricsHelper {
   private static final Logger LOGGER = LoggerFactory.getLogger(MetricsHelper.class);
 
-  private static Map<MetricsRegistry, Boolean> metricsRegistryMap = new ConcurrentHashMap<>();
+  private static Map<PinotMetricsRegistry, Boolean> metricsRegistryMap = new ConcurrentHashMap<>();
 
   private static Map<MetricsRegistryRegistrationListener, Boolean> metricsRegistryRegistrationListenersMap =
       new ConcurrentHashMap<>();
@@ -91,9 +87,9 @@ public class MetricsHelper {
       metricsRegistryRegistrationListenersMap.put(listener, Boolean.TRUE);
 
       // Fire events to register all previously registered metrics registries
-      Set<MetricsRegistry> metricsRegistries = metricsRegistryMap.keySet();
+      Set<PinotMetricsRegistry> metricsRegistries = metricsRegistryMap.keySet();
       LOGGER.info("Number of metrics registry: {}", metricsRegistries.size());
-      for (MetricsRegistry metricsRegistry : metricsRegistries) {
+      for (PinotMetricsRegistry metricsRegistry : metricsRegistries) {
         listener.onMetricsRegistryRegistered(metricsRegistry);
       }
     }
@@ -104,7 +100,7 @@ public class MetricsHelper {
    *
    * @param registry The registry to register
    */
-  public static void registerMetricsRegistry(MetricsRegistry registry) {
+  public static void registerMetricsRegistry(PinotMetricsRegistry registry) {
     synchronized (MetricsHelper.class) {
       metricsRegistryMap.put(registry, Boolean.TRUE);
 
@@ -129,30 +125,8 @@ public class MetricsHelper {
    * @param unit TimeUnit for rate determination
    * @return Meter
    */
-  public static Meter newMeter(MetricsRegistry registry, MetricName name, String eventType, TimeUnit unit) {
-    if (registry != null) {
-      return registry.newMeter(name, eventType, unit);
-    } else {
-      return Metrics.newMeter(name, eventType, unit);
-    }
-  }
-
-  /**
-   *
-   * Return an existing aggregated meter if registry is not null and a aggregated meter already exist
-   * with the same metric name. Otherwise, creates a new aggregated meter and registers (if registry not null)
-   *
-   * @param registry MetricsRegistry
-   * @param name metric name
-   * @return AggregatedMeter
-   */
-  public static <T extends Metered & Stoppable> AggregatedMeter<T> newAggregatedMeter(
-      AggregatedMetricsRegistry registry, MetricName name) {
-    if (registry != null) {
-      return registry.newAggregatedMeter(name);
-    } else {
-      return new AggregatedMeter<T>(); //not registered
-    }
+  public static PinotMeter newMeter(PinotMetricsRegistry registry, PinotMetricName name, String eventType, TimeUnit unit) {
+    return registry.newMeter(name, eventType, unit);
   }
 
   /**
@@ -165,29 +139,8 @@ public class MetricsHelper {
    * @param name metric name
    * @return Counter
    */
-  public static Counter newCounter(MetricsRegistry registry, MetricName name) {
-    if (registry != null) {
-      return registry.newCounter(name);
-    } else {
-      return Metrics.newCounter(name);
-    }
-  }
-
-  /**
-   *
-   * Return an existing aggregated counter if registry is not null and a aggregated counter already exist
-   * with the same metric name. Otherwise, creates a new aggregated counter and registers (if registry not null)
-   *
-   * @param registry MetricsRegistry
-   * @param name metric name
-   * @return AggregatedCounter
-   */
-  public static AggregatedCounter newAggregatedCounter(AggregatedMetricsRegistry registry, MetricName name) {
-    if (registry != null) {
-      return registry.newAggregatedCounter(name);
-    } else {
-      return new AggregatedCounter();
-    }
+  public static PinotCounter newCounter(PinotMetricsRegistry registry, PinotMetricName name) {
+    return registry.newCounter(name);
   }
 
   /**
@@ -201,30 +154,8 @@ public class MetricsHelper {
    * @param biased (true if uniform distribution, otherwise exponential weighted)
    * @return histogram
    */
-  public static Histogram newHistogram(MetricsRegistry registry, MetricName name, boolean biased) {
-    if (registry != null) {
-      return registry.newHistogram(name, biased);
-    } else {
-      return Metrics.newHistogram(name, biased);
-    }
-  }
-
-  /**
-   *
-   * Return an existing aggregated histogram if registry is not null and a aggregated histogram already exist
-   * with the same metric name. Otherwise, creates a new aggregated histogram and registers (if registry not null)
-   *
-   * @param registry MetricsRegistry
-   * @param name metric name
-   * @return AggregatedHistogram
-   */
-  public static <T extends Sampling> AggregatedHistogram<T> newAggregatedHistogram(AggregatedMetricsRegistry registry,
-      MetricName name) {
-    if (registry != null) {
-      return registry.newAggregatedHistogram(name);
-    } else {
-      return new AggregatedHistogram<T>();
-    }
+  public static PinotHistogram newHistogram(PinotMetricsRegistry registry, PinotMetricName name, boolean biased) {
+    return registry.newHistogram(name, biased);
   }
 
   /**
@@ -238,41 +169,15 @@ public class MetricsHelper {
    * @param gauge Underlying gauge to be tracked
    * @return gauge
    */
-  public static <T> Gauge<T> newGauge(MetricsRegistry registry, MetricName name, Gauge<T> gauge) {
-    if (registry != null) {
-      return registry.newGauge(name, gauge);
-    } else {
-      return Metrics.newGauge(name, gauge);
-    }
+  public static <T> PinotGauge<T> newGauge(PinotMetricsRegistry registry, PinotMetricName name, PinotGauge<T> gauge) {
+    return registry.newGauge(name, gauge);
   }
 
   /**
    * Removes an existing metric
    */
-  public static void removeMetric(MetricsRegistry registry, MetricName name) {
-    if (registry != null) {
-      registry.removeMetric(name);
-    } else {
-      Metrics.defaultRegistry().removeMetric(name);
-    }
-  }
-
-  /**
-   *
-   * Return an existing aggregated long gauge if registry is not null and a aggregated long gauge already exist
-   * with the same metric name. Otherwise, creates a new aggregated long gauge and registers (if registry not null)
-   *
-   * @param registry MetricsRegistry
-   * @param name metric name
-   * @return AggregatedLongGauge
-   */
-  public static <T extends Number, V extends Gauge<T>> AggregatedLongGauge<T, V> newAggregatedLongGauge(
-      AggregatedMetricsRegistry registry, MetricName name) {
-    if (registry != null) {
-      return registry.newAggregatedLongGauge(name);
-    } else {
-      return new AggregatedLongGauge<T, V>();
-    }
+  public static void removeMetric(PinotMetricsRegistry registry, PinotMetricName name) {
+    registry.removeMetric(name);
   }
 
   /**
@@ -287,12 +192,8 @@ public class MetricsHelper {
    * @param rateUnit TimeUnit for rate determination
    * @return Timer
    */
-  public static Timer newTimer(MetricsRegistry registry, MetricName name, TimeUnit durationUnit, TimeUnit rateUnit) {
-    if (registry != null) {
-      return registry.newTimer(name, durationUnit, rateUnit);
-    } else {
-      return Metrics.newTimer(name, durationUnit, rateUnit);
-    }
+  public static PinotTimer newTimer(PinotMetricsRegistry registry, PinotMetricName name, TimeUnit durationUnit, TimeUnit rateUnit) {
+    return registry.newTimer(name, durationUnit, rateUnit);
   }
 
   /**
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
index 53d2193..152f35c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.common.metrics;
 
-import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
 
 
 /**
@@ -26,5 +26,5 @@ import com.yammer.metrics.core.MetricsRegistry;
  *
  */
 public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+  void onMetricsRegistryRegistered(PinotMetricsRegistry metricsRegistry);
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
index ca4e464..cf44b2c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pinot.common.metrics;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.Collection;
 import java.util.Collections;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
 
 import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ENABLE_TABLE_LEVEL_METRICS;
 import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_METRICS_PREFIX;
@@ -32,16 +32,16 @@ import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_METRI
  */
 public class ServerMetrics extends AbstractMetrics<ServerQueryPhase, ServerMeter, ServerGauge, ServerTimer> {
 
-  public ServerMetrics(MetricsRegistry metricsRegistry) {
+  public ServerMetrics(PinotMetricsRegistry metricsRegistry) {
     this(DEFAULT_METRICS_PREFIX, metricsRegistry, DEFAULT_ENABLE_TABLE_LEVEL_METRICS, Collections.emptySet());
   }
 
-  public ServerMetrics(MetricsRegistry metricsRegistry, boolean isTableLevelMetricsEnabled,
+  public ServerMetrics(PinotMetricsRegistry metricsRegistry, boolean isTableLevelMetricsEnabled,
       Collection<String> allowedTables) {
     this(DEFAULT_METRICS_PREFIX, metricsRegistry, isTableLevelMetricsEnabled, allowedTables);
   }
 
-  public ServerMetrics(String prefix, MetricsRegistry metricsRegistry, boolean isTableLevelMetricsEnabled,
+  public ServerMetrics(String prefix, PinotMetricsRegistry metricsRegistry, boolean isTableLevelMetricsEnabled,
       Collection<String> allowedTables) {
     super(prefix, metricsRegistry, ServerMetrics.class, isTableLevelMetricsEnabled, allowedTables);
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
index 9b87b74..9105814 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
@@ -19,28 +19,30 @@
 package org.apache.pinot.common.metrics;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.yammer.metrics.core.Gauge;
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metrics.base.PinotGauge;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricProcessor;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
 
 
 /**
  * Validation metrics utility class, which contains the glue code to publish metrics.
  */
 public class ValidationMetrics {
-  private final MetricsRegistry _metricsRegistry;
+  private final PinotMetricsRegistry _metricsRegistry;
   private final Map<String, Long> _gaugeValues = new HashMap<>();
-  private final Set<MetricName> _metricNames = new HashSet<>();
+  private final Set<PinotMetricName> _metricNames = new HashSet<>();
 
   /**
    * A simple gauge that returns whatever last value was stored in the _gaugeValues hash map.
    */
-  private class StoredValueGauge extends Gauge<Long> {
+  private class StoredValueGauge extends PinotGauge<Long> {
     private final String key;
 
     public StoredValueGauge(String key) {
@@ -51,13 +53,17 @@ public class ValidationMetrics {
     public Long value() {
       return _gaugeValues.get(key);
     }
+
+    @Override
+    public <T> void processWith(PinotMetricProcessor<T> processor, PinotMetricName name, T context) {
+    }
   }
 
   /**
    * A simple gauge that returns the difference in hours between the current system time and the value stored in the
    * _gaugeValues hash map.
    */
-  private class CurrentTimeMillisDeltaGaugeHours extends Gauge<Double> {
+  private class CurrentTimeMillisDeltaGaugeHours extends PinotGauge<Double> {
     private final String key;
 
     private final double MILLIS_PER_HOUR = TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
@@ -76,22 +82,26 @@ public class ValidationMetrics {
         return Double.MIN_VALUE;
       }
     }
+
+    @Override
+    public <T> void processWith(PinotMetricProcessor<T> processor, PinotMetricName name, T context) {
+    }
   }
 
   private interface GaugeFactory<T> {
-    Gauge<T> buildGauge(final String key);
+    PinotGauge<T> buildGauge(final String key);
   }
 
   private class StoredValueGaugeFactory implements GaugeFactory<Long> {
     @Override
-    public Gauge<Long> buildGauge(final String key) {
+    public PinotGauge<Long> buildGauge(final String key) {
       return new StoredValueGauge(key);
     }
   }
 
   private class CurrentTimeMillisDeltaGaugeHoursFactory implements GaugeFactory<Double> {
     @Override
-    public Gauge<Double> buildGauge(final String key) {
+    public PinotGauge<Double> buildGauge(final String key) {
       return new CurrentTimeMillisDeltaGaugeHours(key);
     }
   }
@@ -105,7 +115,7 @@ public class ValidationMetrics {
    *
    * @param metricsRegistry The metrics registry used to store all the gauges.
    */
-  public ValidationMetrics(MetricsRegistry metricsRegistry) {
+  public ValidationMetrics(PinotMetricsRegistry metricsRegistry) {
     _metricsRegistry = metricsRegistry;
   }
 
@@ -184,11 +194,11 @@ public class ValidationMetrics {
     return "pinot.controller." + resource + "." + gaugeName;
   }
 
-  private MetricName makeMetricName(final String gaugeName) {
-    return new MetricName(ValidationMetrics.class, gaugeName);
+  private PinotMetricName makeMetricName(final String gaugeName) {
+    return PinotMetricUtilsFactory.generatePinotMetricName(ValidationMetrics.class, gaugeName);
   }
 
-  private void makeGauge(final String gaugeName, final MetricName metricName, final GaugeFactory<?> gaugeFactory,
+  private void makeGauge(final String gaugeName, final PinotMetricName metricName, final GaugeFactory<?> gaugeFactory,
       final long value) {
     if (!_gaugeValues.containsKey(gaugeName)) {
       _gaugeValues.put(gaugeName, value);
@@ -203,7 +213,7 @@ public class ValidationMetrics {
    * Unregisters all validation metrics.
    */
   public void unregisterAllMetrics() {
-    for (MetricName metricName : _metricNames) {
+    for (PinotMetricName metricName : _metricNames) {
       MetricsHelper.removeMetric(_metricsRegistry, metricName);
     }
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotCounter.java
similarity index 69%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotCounter.java
index 53d2193..3d9db6e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotCounter.java
@@ -16,15 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.base;
 
-import com.yammer.metrics.core.MetricsRegistry;
+public interface PinotCounter extends PinotMetric {
 
-
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+  Object getCounter();
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotGauge.java
similarity index 70%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotGauge.java
index 53d2193..ff826e8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotGauge.java
@@ -16,15 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.base;
 
-import com.yammer.metrics.core.MetricsRegistry;
 
+public abstract class PinotGauge<T> implements PinotMetric {
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+  public Object getGauge() {
+    return null;
+  }
+
+  /**
+   * Returns the metric's current value.
+   *
+   * @return the metric's current value
+   */
+  public abstract T value();
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotHistogram.java
similarity index 70%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotHistogram.java
index 53d2193..bdfb7eb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotHistogram.java
@@ -16,15 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.base;
 
-import com.yammer.metrics.core.MetricsRegistry;
 
+public interface PinotHistogram extends PinotMetric {
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+  Object getHistogram();
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotJmxReporter.java
similarity index 70%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotJmxReporter.java
index 53d2193..a7b939e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotJmxReporter.java
@@ -16,15 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.base;
 
-import com.yammer.metrics.core.MetricsRegistry;
 
+public interface PinotJmxReporter {
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+  void start();
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMeter.java
similarity index 69%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMeter.java
index 53d2193..3471795 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMeter.java
@@ -16,15 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.base;
 
-import com.yammer.metrics.core.MetricsRegistry;
+public interface PinotMeter {
 
-
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+  void mark(final long unitCount);
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetered.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetered.java
new file mode 100644
index 0000000..2918669
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetered.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics.base;
+
+import java.util.concurrent.TimeUnit;
+
+
+public interface PinotMetered extends PinotMetric {
+
+  Object getMetered();
+
+  /**
+   * Returns the meter's rate unit.
+   *
+   * @return the meter's rate unit
+   */
+  TimeUnit rateUnit();
+
+  /**
+   * Returns the type of events the meter is measuring.
+   *
+   * @return the meter's event type
+   */
+  String eventType();
+
+  /**
+   * Returns the number of events which have been marked.
+   *
+   * @return the number of events which have been marked
+   */
+  long count();
+
+  /**
+   * Returns the fifteen-minute exponentially-weighted moving average rate at which events have
+   * occurred since the meter was created.
+   * <p/>
+   * This rate has the same exponential decay factor as the fifteen-minute load average in the
+   * {@code top} Unix command.
+   *
+   * @return the fifteen-minute exponentially-weighted moving average rate at which events have
+   *         occurred since the meter was created
+   */
+  double fifteenMinuteRate();
+
+  /**
+   * Returns the five-minute exponentially-weighted moving average rate at which events have
+   * occurred since the meter was created.
+   * <p/>
+   * This rate has the same exponential decay factor as the five-minute load average in the {@code
+   * top} Unix command.
+   *
+   * @return the five-minute exponentially-weighted moving average rate at which events have
+   *         occurred since the meter was created
+   */
+  double fiveMinuteRate();
+
+  /**
+   * Returns the mean rate at which events have occurred since the meter was created.
+   *
+   * @return the mean rate at which events have occurred since the meter was created
+   */
+  double meanRate();
+
+  /**
+   * Returns the one-minute exponentially-weighted moving average rate at which events have
+   * occurred since the meter was created.
+   * <p/>
+   * This rate has the same exponential decay factor as the one-minute load average in the {@code
+   * top} Unix command.
+   *
+   * @return the one-minute exponentially-weighted moving average rate at which events have
+   *         occurred since the meter was created
+   */
+  double oneMinuteRate();
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetric.java
similarity index 57%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetric.java
index 53d2193..a8bfc8d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetric.java
@@ -16,15 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.base;
 
-import com.yammer.metrics.core.MetricsRegistry;
 
+public interface PinotMetric {
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+  /**
+   * Allow the given {@link PinotMetricProcessor} to process {@code this} as a metric.
+   *
+   * @param processor    a {@link PinotMetricProcessor}
+   * @param name         the name of the current metric
+   * @param context      a given context which should be passed on to {@code processor}
+   * @param <T>          the type of the context object
+   * @throws Exception if something goes wrong
+   */
+  <T> void processWith(PinotMetricProcessor<T> processor, PinotMetricName name, T context) throws Exception;
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricName.java
similarity index 70%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricName.java
index 53d2193..8cf69d5 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricName.java
@@ -16,15 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.base;
 
-import com.yammer.metrics.core.MetricsRegistry;
 
+public interface PinotMetricName {
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+  Object getMetricName();
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricProcessor.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricProcessor.java
new file mode 100644
index 0000000..1b1f42d
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricProcessor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics.base;
+
+
+public interface PinotMetricProcessor<T> {
+  /**
+   * Process the given {@link PinotMetered} instance.
+   *
+   * @param name       the name of the meter
+   * @param meter      the meter
+   * @param context    the context of the meter
+   * @throws Exception if something goes wrong
+   */
+  void processMeter(PinotMetricName name, PinotMetered meter, T context) throws Exception;
+
+  /**
+   * Process the given counter.
+   *
+   * @param name       the name of the counter
+   * @param counter    the counter
+   * @param context    the context of the meter
+   * @throws Exception if something goes wrong
+   */
+  void processCounter(PinotMetricName name, PinotCounter counter, T context) throws Exception;
+
+  /**
+   * Process the given histogram.
+   *
+   * @param name       the name of the histogram
+   * @param histogram  the histogram
+   * @param context    the context of the meter
+   * @throws Exception if something goes wrong
+   */
+  void processHistogram(PinotMetricName name, PinotHistogram histogram, T context) throws Exception;
+
+  /**
+   * Process the given timer.
+   *
+   * @param name       the name of the timer
+   * @param timer      the timer
+   * @param context    the context of the meter
+   * @throws Exception if something goes wrong
+   */
+  void processTimer(PinotMetricName name, PinotTimer timer, T context) throws Exception;
+
+  /**
+   * Process the given gauge.
+   *
+   * @param name       the name of the gauge
+   * @param gauge      the gauge
+   * @param context    the context of the meter
+   * @throws Exception if something goes wrong
+   */
+  void processGauge(PinotMetricName name, PinotGauge<?> gauge, T context) throws Exception;
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricUtilsFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricUtilsFactory.java
new file mode 100644
index 0000000..24536d1
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricUtilsFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics.base;
+
+import java.util.function.Function;
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metrics.yammer.YammerGauge;
+import org.apache.pinot.common.metrics.yammer.YammerJmxReporter;
+import org.apache.pinot.common.metrics.yammer.YammerMetricName;
+import org.apache.pinot.common.metrics.yammer.YammerMetricsRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class PinotMetricUtilsFactory {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotMetricUtilsFactory.class);
+  public static final String LIBRARY_NAME_KEY = "libraryName";
+  public static final String YAMMER_KEY = "yammer";
+
+  public static String LIBRARY_TO_USE = YAMMER_KEY;
+
+  public static void init(String libraryName)
+      throws InvalidConfigException {
+    if (libraryName == null) {
+      return;
+    }
+    switch (libraryName) {
+      case YAMMER_KEY:
+        LIBRARY_TO_USE = YAMMER_KEY;
+        break;
+      // TODO: support more libraries.
+      default:
+        throw new InvalidConfigException("PinotMetricsRegistry for " + libraryName + " cannot be initialized.");
+    }
+    LOGGER.info("Setting metric library to: " + LIBRARY_TO_USE);
+  }
+
+  public static PinotMetricsRegistry getPinotMetricsRegistry() {
+    switch (LIBRARY_TO_USE) {
+      case YAMMER_KEY:
+        return new YammerMetricsRegistry();
+      //TODO: support more libraries.
+      default:
+        return new YammerMetricsRegistry();
+    }
+  }
+
+  public static PinotMetricName generatePinotMetricName(Class<?> klass, String name) {
+    switch (LIBRARY_TO_USE) {
+      case YAMMER_KEY:
+        return new YammerMetricName(klass, name);
+      //TODO: support more libraries.
+      default:
+        return new YammerMetricName(klass, name);
+    }
+  }
+
+  public static <T> PinotGauge<T> generatePinotGauge(Function<Void, T> condition) {
+    switch (LIBRARY_TO_USE) {
+      case YAMMER_KEY:
+        return new YammerGauge<T>(condition);
+      //TODO: support more libraries.
+      default:
+        return new YammerGauge<T>(condition);
+    }
+  }
+
+  public static PinotJmxReporter generatePinotJmxReporter(PinotMetricsRegistry metricsRegistry) {
+    switch (LIBRARY_TO_USE) {
+      case YAMMER_KEY:
+        return new YammerJmxReporter(metricsRegistry);
+      //TODO: support more libraries.
+      default:
+        return new YammerJmxReporter(metricsRegistry);
+    }
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricsRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricsRegistry.java
new file mode 100644
index 0000000..85eae36
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotMetricsRegistry.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics.base;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+public interface PinotMetricsRegistry {
+
+  void removeMetric(PinotMetricName name);
+
+  <T>PinotGauge<T> newGauge(PinotMetricName name, PinotGauge<T> gauge);
+
+  PinotMeter newMeter(PinotMetricName name, String eventType, TimeUnit unit);
+
+  PinotCounter newCounter(PinotMetricName name);
+
+  PinotTimer newTimer(PinotMetricName name, TimeUnit durationUnit, TimeUnit rateUnit);
+
+  PinotHistogram newHistogram(PinotMetricName name, boolean biased);
+
+  /**
+   * Returns an unmodifiable map of all metrics and their names.
+   *
+   * @return an unmodifiable map of all metrics and their names
+   */
+  Map<PinotMetricName, PinotMetric> allMetrics();
+
+  Object getMetricsRegistry();
+
+//  newAggregatedMeter(PinotMetricName name);
+
+  void shutdown();
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotTimer.java
similarity index 70%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotTimer.java
index 53d2193..5c9d2a6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/base/PinotTimer.java
@@ -16,15 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.base;
 
-import com.yammer.metrics.core.MetricsRegistry;
+import java.util.concurrent.TimeUnit;
 
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+public interface PinotTimer extends PinotMetered {
+
+  void update(long duration, TimeUnit unit);
+
+  Object getTimer();
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerCounter.java
similarity index 52%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerCounter.java
index 53d2193..2a9df59 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerCounter.java
@@ -16,15 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.yammer;
 
-import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Counter;
+import org.apache.pinot.common.metrics.base.PinotCounter;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricProcessor;
 
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+public class YammerCounter extends YammerMetric implements PinotCounter {
+  private Counter _counter;
+
+  public YammerCounter(Counter counter) {
+    super(counter);
+    _counter = counter;
+  }
+
+  @Override
+  public Object getCounter() {
+    return _counter;
+  }
+
+  @Override
+  public <T> void processWith(PinotMetricProcessor<T> processor, PinotMetricName name, T context)
+      throws Exception {
+    processor.processCounter(name, this, context);
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerGauge.java
new file mode 100644
index 0000000..811d666
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerGauge.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics.yammer;
+
+import com.yammer.metrics.core.Gauge;
+import java.util.function.Function;
+import org.apache.pinot.common.metrics.base.PinotGauge;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricProcessor;
+
+
+public class YammerGauge<T> extends PinotGauge<T> {
+
+  private final Gauge<T> _gauge;
+
+  public YammerGauge(Gauge<T> gauge) {
+    _gauge = gauge;
+  }
+
+  public YammerGauge(Function<Void, T> condition) {
+    this(new Gauge<T>() {
+      @Override
+      public T value() {
+        return condition.apply(null);
+      }
+    });
+  }
+
+  @Override
+  public Object getGauge() {
+    return _gauge;
+  }
+
+  @Override
+  public T value() {
+    return _gauge.value();
+  }
+
+  @Override
+  public <T> void processWith(PinotMetricProcessor<T> processor, PinotMetricName name, T context)
+      throws Exception {
+    processor.processGauge(name, this, context);
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerJmxReporter.java
similarity index 60%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerJmxReporter.java
index 53d2193..1ca56f0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerJmxReporter.java
@@ -16,15 +16,23 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.yammer;
 
 import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.reporting.JmxReporter;
+import org.apache.pinot.common.metrics.base.PinotJmxReporter;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
 
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+public class YammerJmxReporter implements PinotJmxReporter {
+  private final JmxReporter _jmxReporter;
+
+  public YammerJmxReporter(PinotMetricsRegistry metricsRegistry) {
+    _jmxReporter = new JmxReporter((MetricsRegistry) metricsRegistry.getMetricsRegistry());
+  }
+
+  @Override
+  public void start() {
+    _jmxReporter.start();
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMeter.java
similarity index 53%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMeter.java
index 53d2193..88a2371 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMeter.java
@@ -16,15 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.yammer;
 
-import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Meter;
+import org.apache.pinot.common.metrics.base.PinotMeter;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricProcessor;
 
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+public class YammerMeter extends YammerMetered implements PinotMeter {
+  private Meter _meter;
+
+  public YammerMeter(Meter meter) {
+    super(meter);
+    _meter = meter;
+  }
+
+  @Override
+  public void mark(long unitCount) {
+    _meter.mark(unitCount);
+  }
+
+  @Override
+  public <T> void processWith(PinotMetricProcessor<T> processor, PinotMetricName name, T context)
+      throws Exception {
+    processor.processMeter(name, this, context);
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetered.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetered.java
new file mode 100644
index 0000000..bed13e0
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetered.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics.yammer;
+
+import com.yammer.metrics.core.Metered;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metrics.base.PinotMetered;
+
+
+public abstract class YammerMetered implements PinotMetered {
+  private final Metered _metered;
+
+  public YammerMetered(Metered metered) {
+    _metered = metered;
+  }
+
+  @Override
+  public Object getMetered() {
+    return _metered;
+  }
+
+  @Override
+  public TimeUnit rateUnit() {
+    return _metered.rateUnit();
+  }
+
+  @Override
+  public String eventType() {
+    return _metered.eventType();
+  }
+
+  @Override
+  public long count() {
+    return _metered.count();
+  }
+
+  @Override
+  public double fifteenMinuteRate() {
+    return _metered.fifteenMinuteRate();
+  }
+
+  @Override
+  public double fiveMinuteRate() {
+    return _metered.fiveMinuteRate();
+  }
+
+  @Override
+  public double meanRate() {
+    return _metered.meanRate();
+  }
+
+  @Override
+  public double oneMinuteRate() {
+    return _metered.oneMinuteRate();
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetric.java
similarity index 57%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetric.java
index 53d2193..f02f600 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetric.java
@@ -16,15 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.yammer;
 
-import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Metric;
+import org.apache.pinot.common.metrics.base.PinotMetric;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricProcessor;
 
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+public class YammerMetric implements PinotMetric {
+  private Metric _metric;
+
+  public YammerMetric(Metric metric) {
+    _metric = metric;
+  }
+
+  public Metric getMetric() {
+    return _metric;
+  }
+
+  @Override
+  public <T> void processWith(PinotMetricProcessor<T> processor, PinotMetricName name, T context)
+      throws Exception {
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricName.java
similarity index 61%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricName.java
index 53d2193..a1d94e6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricName.java
@@ -16,15 +16,25 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.common.metrics.yammer;
 
-import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.MetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
 
 
-/**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
- *
- */
-public interface MetricsRegistryRegistrationListener {
-  public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry);
+public class YammerMetricName implements PinotMetricName {
+  private MetricName _metricName;
+
+  public YammerMetricName(Class<?> klass, String name) {
+    _metricName = new MetricName(klass, name);
+  }
+
+  public YammerMetricName(MetricName metricName) {
+    _metricName = metricName;
+  }
+
+  @Override
+  public MetricName getMetricName() {
+    return _metricName;
+  }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricProcessor.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricProcessor.java
new file mode 100644
index 0000000..89968b8
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricProcessor.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics.yammer;
+
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.Metered;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricProcessor;
+import com.yammer.metrics.core.Timer;
+import org.apache.pinot.common.metrics.base.PinotCounter;
+import org.apache.pinot.common.metrics.base.PinotGauge;
+import org.apache.pinot.common.metrics.base.PinotHistogram;
+import org.apache.pinot.common.metrics.base.PinotMetered;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricProcessor;
+import org.apache.pinot.common.metrics.base.PinotTimer;
+
+
+public class YammerMetricProcessor<T> implements PinotMetricProcessor<T> {
+  private MetricProcessor<T> _metricProcessor;
+
+  public YammerMetricProcessor(MetricProcessor<T> metricProcessor) {
+    _metricProcessor = metricProcessor;
+  }
+
+  @Override
+  public void processMeter(PinotMetricName name, PinotMetered meter, T context)
+      throws Exception {
+    _metricProcessor.processMeter((MetricName) name.getMetricName(), (Metered) meter.getMetered(), context);
+  }
+
+  @Override
+  public void processCounter(PinotMetricName name, PinotCounter counter, T context)
+      throws Exception {
+    _metricProcessor.processCounter((MetricName) name.getMetricName(), (Counter) counter.getCounter(), context);
+  }
+
+  @Override
+  public void processHistogram(PinotMetricName name, PinotHistogram histogram, T context)
+      throws Exception {
+    _metricProcessor.processHistogram((MetricName) name.getMetricName(), (Histogram) histogram.getHistogram(), context);
+  }
+
+  @Override
+  public void processTimer(PinotMetricName name, PinotTimer timer, T context)
+      throws Exception {
+    _metricProcessor.processTimer((MetricName) name.getMetricName(), (Timer) timer.getTimer(), context);
+  }
+
+  @Override
+  public void processGauge(PinotMetricName name, PinotGauge<?> gauge, T context)
+      throws Exception {
+    _metricProcessor.processGauge((MetricName) name.getMetricName(), (Gauge<?>) gauge.getGauge(), context);
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricsRegistry.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricsRegistry.java
new file mode 100644
index 0000000..ba4cc9e
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricsRegistry.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics.yammer;
+
+import com.yammer.metrics.core.Clock;
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metrics.base.PinotCounter;
+import org.apache.pinot.common.metrics.base.PinotGauge;
+import org.apache.pinot.common.metrics.base.PinotHistogram;
+import org.apache.pinot.common.metrics.base.PinotMeter;
+import org.apache.pinot.common.metrics.base.PinotMetric;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotTimer;
+
+
+public class YammerMetricsRegistry implements PinotMetricsRegistry {
+  MetricsRegistry _metricsRegistry;
+
+  public YammerMetricsRegistry() {
+    _metricsRegistry = new MetricsRegistry();
+  }
+
+  public YammerMetricsRegistry(Clock clock) {
+    _metricsRegistry = new MetricsRegistry(clock);
+  }
+
+  @Override
+  public void removeMetric(PinotMetricName name) {
+
+  }
+
+  @Override
+  public <T>PinotGauge<T> newGauge(PinotMetricName name, PinotGauge<T> gauge) {
+    return new YammerGauge<T>(_metricsRegistry.newGauge((MetricName) name.getMetricName(), (Gauge<T>) gauge.getGauge()));
+  }
+
+  @Override
+  public PinotMeter newMeter(PinotMetricName name, String eventType, TimeUnit unit) {
+    return new YammerMeter(_metricsRegistry.newMeter((MetricName) name.getMetricName(), eventType, unit));
+  }
+
+  @Override
+  public PinotCounter newCounter(PinotMetricName name) {
+    return new YammerCounter(_metricsRegistry.newCounter((MetricName) name.getMetricName()));
+  }
+
+  @Override
+  public PinotTimer newTimer(PinotMetricName name, TimeUnit durationUnit, TimeUnit rateUnit) {
+    return new YammerTimer(_metricsRegistry.newTimer((MetricName) name.getMetricName(), durationUnit, rateUnit));
+  }
+
+  @Override
+  public PinotHistogram newHistogram(PinotMetricName name, boolean biased) {
+    return null;
+  }
+
+  @Override
+  public Map<PinotMetricName, PinotMetric> allMetrics() {
+    Map<MetricName, Metric> yammerMetrics = _metricsRegistry.allMetrics();
+    Map<PinotMetricName, PinotMetric> allMetrics = new HashMap<>();
+    for (Map.Entry<MetricName, Metric> entry : yammerMetrics.entrySet()) {
+      allMetrics.put(new YammerMetricName(entry.getKey()), new YammerMetric(entry.getValue()));
+    }
+    return allMetrics;
+  }
+
+  @Override
+  public Object getMetricsRegistry() {
+    return _metricsRegistry;
+  }
+
+  @Override
+  public void shutdown() {
+    _metricsRegistry.shutdown();
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerTimer.java
new file mode 100644
index 0000000..28fc412
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerTimer.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics.yammer;
+
+import com.yammer.metrics.core.Timer;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.metrics.base.PinotMetricName;
+import org.apache.pinot.common.metrics.base.PinotMetricProcessor;
+import org.apache.pinot.common.metrics.base.PinotTimer;
+
+
+public class YammerTimer implements PinotTimer {
+  private Timer _timer;
+
+  public YammerTimer(Timer timer) {
+    _timer = timer;
+  }
+
+  @Override
+  public void update(long duration, TimeUnit unit) {
+    _timer.update(duration, unit);
+  }
+
+  @Override
+  public Object getTimer() {
+    return _timer;
+  }
+
+  @Override
+  public Object getMetered() {
+    return _timer;
+  }
+
+  @Override
+  public TimeUnit rateUnit() {
+    return _timer.rateUnit();
+  }
+
+  @Override
+  public String eventType() {
+    return _timer.eventType();
+  }
+
+  @Override
+  public long count() {
+    return _timer.count();
+  }
+
+  @Override
+  public double fifteenMinuteRate() {
+    return _timer.fifteenMinuteRate();
+  }
+
+  @Override
+  public double fiveMinuteRate() {
+    return _timer.fiveMinuteRate();
+  }
+
+  @Override
+  public double meanRate() {
+    return _timer.meanRate();
+  }
+
+  @Override
+  public double oneMinuteRate() {
+    return _timer.oneMinuteRate();
+  }
+
+  @Override
+  public <T> void processWith(PinotMetricProcessor<T> processor, PinotMetricName name, T context)
+      throws Exception {
+    processor.processTimer(name, this, context);
+  }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java
index 41b55b8..aa6c52e 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java
@@ -24,12 +24,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.testng.annotations.Test;
 
-import com.yammer.metrics.core.MetricName;
-import com.yammer.metrics.core.MetricsRegistry;
-
 
 /**
  * Tests for the MetricsHelper class.
@@ -41,36 +41,37 @@ public class MetricsHelperTest {
 
   public static class ListenerOne implements MetricsRegistryRegistrationListener {
     @Override
-    public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry) {
+    public void onMetricsRegistryRegistered(PinotMetricsRegistry metricsRegistry) {
       listenerOneOkay = true;
     }
   }
 
   public static class ListenerTwo implements MetricsRegistryRegistrationListener {
     @Override
-    public void onMetricsRegistryRegistered(MetricsRegistry metricsRegistry) {
+    public void onMetricsRegistryRegistered(PinotMetricsRegistry metricsRegistry) {
       listenerTwoOkay = true;
     }
   }
 
   @Test
-  public void testMetricsHelperRegistration() {
+  public void testMetricsHelperRegistration()
+      throws InvalidConfigException {
     listenerOneOkay = false;
     listenerTwoOkay = false;
 
     Map<String, Object> properties = new HashMap<>();
     properties.put("pinot.broker.metrics.metricsRegistryRegistrationListeners",
         ListenerOne.class.getName() + "," + ListenerTwo.class.getName());
-    
-    PinotConfiguration configuration = new PinotConfiguration(properties);
 
-    MetricsRegistry registry = new MetricsRegistry();
+    PinotConfiguration configuration = new PinotConfiguration(properties);
+    PinotMetricUtilsFactory.init(configuration.getProperty(PinotMetricUtilsFactory.LIBRARY_NAME_KEY));
+    PinotMetricsRegistry registry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
 
     // Initialize the MetricsHelper and create a new timer
     MetricsHelper.initializeMetrics(configuration.subset("pinot.broker.metrics"));
     MetricsHelper.registerMetricsRegistry(registry);
-    MetricsHelper.newTimer(registry, new MetricName(MetricsHelperTest.class, "dummy"), TimeUnit.MILLISECONDS,
-        TimeUnit.MILLISECONDS);
+    MetricsHelper.newTimer(registry, PinotMetricUtilsFactory.generatePinotMetricName(MetricsHelperTest.class, "dummy"),
+        TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS);
 
     // Check that the two listeners fired
     assertTrue(listenerOneOkay);
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsFactoryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsFactoryTest.java
new file mode 100644
index 0000000..6cc88fe
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsFactoryTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metrics;
+
+import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class PinotMetricUtilsFactoryTest {
+
+  @Test
+  public void testPinotMetricsRegistryFactory() {
+    PinotMetricsRegistry pinotMetricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
+    Assert.assertNotNull(pinotMetricsRegistry);
+    Assert.assertEquals(pinotMetricsRegistry.getClass().getSimpleName(), "YammerMetricsRegistry");
+
+    try {
+      PinotMetricUtilsFactory.init("badLibraryName");
+      Assert.fail("Fail to initialize PinotMetricsRegistry of yammer");
+    } catch (InvalidConfigException e) {
+      // Expected.
+    }
+
+    try {
+      PinotMetricUtilsFactory.init("yammer");
+    } catch (InvalidConfigException e) {
+      Assert.fail("Fail to initialize PinotMetricsRegistry of yammer");
+    }
+
+    pinotMetricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
+    Assert.assertNotNull(pinotMetricsRegistry);
+    Assert.assertEquals(pinotMetricsRegistry.getClass().getSimpleName(), "YammerMetricsRegistry");
+  }
+}
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
index 830a2f1..8fb6832 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
@@ -20,9 +20,9 @@ package org.apache.pinot.connector.spark.connector
 
 import java.util.{List => JList, Map => JMap}
 
-import com.yammer.metrics.core.MetricsRegistry
 import org.apache.helix.model.InstanceConfig
 import org.apache.pinot.common.metrics.BrokerMetrics
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory
 import org.apache.pinot.common.request.BrokerRequest
 import org.apache.pinot.common.utils.DataTable
 import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
@@ -45,7 +45,7 @@ private[pinot] class PinotServerDataFetcher(
   extends Logging {
   private val sqlCompiler = new CalciteSqlCompiler()
   private val brokerId = "apache_spark"
-  private val metricsRegistry = new MetricsRegistry()
+  private val metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry
   private val brokerMetrics = new BrokerMetrics(metricsRegistry)
   private val queryRouter = new QueryRouter(brokerId, brokerMetrics)
   // TODO add support for TLS-secured server
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 233f85e..22787e5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -21,7 +21,6 @@ package org.apache.pinot.controller;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -30,7 +29,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -50,10 +48,13 @@ import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Message;
 import org.apache.helix.task.TaskDriver;
 import org.apache.pinot.common.Utils;
+import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.function.FunctionRegistry;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.MetricsHelper;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ValidationMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
@@ -95,6 +96,8 @@ import org.glassfish.hk2.utilities.binding.AbstractBinder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory.LIBRARY_NAME_KEY;
+
 
 public class ControllerStarter implements ServiceStartable {
   private static final Logger LOGGER = LoggerFactory.getLogger(ControllerStarter.class);
@@ -103,15 +106,13 @@ public class ControllerStarter implements ServiceStartable {
   private static final Long DATA_DIRECTORY_MISSING_VALUE = 1000000L;
   private static final Long DATA_DIRECTORY_EXCEPTION_VALUE = 1100000L;
   private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
-  private static final String MAX_STATE_TRANSITIONS_PER_INSTANCE =  "MaxStateTransitionsPerInstance";
+  private static final String MAX_STATE_TRANSITIONS_PER_INSTANCE = "MaxStateTransitionsPerInstance";
 
   private final ControllerConf _config;
   private final List<ListenerConfig> _listenerConfigs;
   private final ControllerAdminApiApplication _adminApp;
   // TODO: rename this variable once it's full separated with Helix controller.
   private final PinotHelixResourceManager _helixResourceManager;
-  private final MetricsRegistry _metricsRegistry;
-  private final ControllerMetrics _controllerMetrics;
   private final ExecutorService _executorService;
 
   private final String _helixZkURL;
@@ -124,6 +125,8 @@ public class ControllerStarter implements ServiceStartable {
 
   private HelixManager _helixControllerManager;
   private HelixManager _helixParticipantManager;
+  private PinotMetricsRegistry _metricsRegistry;
+  private ControllerMetrics _controllerMetrics;
 
   // Can only be constructed after resource manager getting started
   private OfflineSegmentIntervalChecker _offlineSegmentIntervalChecker;
@@ -154,14 +157,12 @@ public class ControllerStarter implements ServiceStartable {
 
     String host = conf.getControllerHost();
     int port = _listenerConfigs.get(0).getPort();
-    
+
     _helixControllerInstanceId = host + "_" + port;
     _helixParticipantInstanceId = LeadControllerUtils.generateParticipantInstanceId(host, port);
     _isUpdateStateModel = _config.isUpdateSegmentStateModel();
     _enableBatchMessageMode = _config.getEnableBatchMessageMode();
 
-    _metricsRegistry = new MetricsRegistry();
-    _controllerMetrics = new ControllerMetrics(conf.getMetricsPrefix(), _metricsRegistry);
     _serviceStatusCallbackList = new ArrayList<>();
     if (_controllerMode == ControllerConf.ControllerMode.HELIX_ONLY) {
       _adminApp = null;
@@ -273,9 +274,7 @@ public class ControllerStarter implements ServiceStartable {
     Utils.logVersions();
 
     // Set up controller metrics
-    MetricsHelper.initializeMetrics(_config.subset(METRICS_REGISTRY_NAME));
-    MetricsHelper.registerMetricsRegistry(_metricsRegistry);
-    _controllerMetrics.initializeGlobalMeters();
+    initControllerMetrics();
 
     switch (_controllerMode) {
       case DUAL:
@@ -292,8 +291,8 @@ public class ControllerStarter implements ServiceStartable {
         LOGGER.error("Invalid mode: " + _controllerMode);
     }
 
-    ServiceStatus
-        .setServiceStatusCallback(_helixParticipantInstanceId, new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList));
+    ServiceStatus.setServiceStatusCallback(_helixParticipantInstanceId,
+        new ServiceStatus.MultipleCallbackServiceStatusCallback(_serviceStatusCallbackList));
   }
 
   private void setUpHelixController() {
@@ -319,8 +318,8 @@ public class ControllerStarter implements ServiceStartable {
   private void setUpPinotController() {
     // install default SSL context if necessary (even if not force-enabled everywhere)
     TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_config, ControllerConf.CONTROLLER_TLS_PREFIX);
-    if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath())
-        || StringUtils.isNotBlank(tlsDefaults.getTrustStorePath())) {
+    if (StringUtils.isNotBlank(tlsDefaults.getKeyStorePath()) || StringUtils
+        .isNotBlank(tlsDefaults.getTrustStorePath())) {
       LOGGER.info("Installing default SSL context for any client requests");
       TlsUtils.installDefaultSSLSocketFactory(tlsDefaults);
     }
@@ -477,9 +476,24 @@ public class ControllerStarter implements ServiceStartable {
     };
   }
 
+  private void initControllerMetrics() {
+    PinotConfiguration metricsConfiguration = _config.subset(METRICS_REGISTRY_NAME);
+    try {
+      PinotMetricUtilsFactory.init(metricsConfiguration.getProperty(LIBRARY_NAME_KEY));
+      _metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
+    } catch (InvalidConfigException e) {
+      throw new RuntimeException("Caught InvalidConfigException when initializing metricsRegistry", e);
+    }
+    _controllerMetrics = new ControllerMetrics(_config.getMetricsPrefix(), _metricsRegistry);
+
+    MetricsHelper.initializeMetrics(metricsConfiguration);
+    MetricsHelper.registerMetricsRegistry(_metricsRegistry);
+    _controllerMetrics.initializeGlobalMeters();
+  }
+
   private void initPinotFSFactory() {
     LOGGER.info("Initializing PinotFSFactory");
-    
+
     PinotFSFactory.init(_config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY));
   }
 
@@ -504,7 +518,8 @@ public class ControllerStarter implements ServiceStartable {
   }
 
   private void initPinotCrypterFactory() {
-    PinotConfiguration pinotCrypterConfig = _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
+    PinotConfiguration pinotCrypterConfig =
+        _config.subset(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_CRYPTER);
     LOGGER.info("Initializing PinotCrypterFactory");
     try {
       PinotCrypterFactory.init(pinotCrypterConfig);
@@ -649,7 +664,7 @@ public class ControllerStarter implements ServiceStartable {
     }
   }
 
-  public MetricsRegistry getMetricsRegistry() {
+  public PinotMetricsRegistry getMetricsRegistry() {
     return _metricsRegistry;
   }
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/LeadControllerManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/LeadControllerManagerTest.java
index 5b7266c..a736721 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/LeadControllerManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/LeadControllerManagerTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.controller;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import org.apache.helix.ConfigAccessor;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -26,6 +25,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.utils.helix.LeadControllerUtils;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -48,7 +48,7 @@ public class LeadControllerManagerTest {
 
   @BeforeMethod
   public void setup() {
-    _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
+    _controllerMetrics = new ControllerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
     _helixManager = mock(HelixManager.class);
     HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
     when(_helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
index 94edf97..302703f 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/TableSizeReaderTest.java
@@ -23,7 +23,6 @@ import com.google.common.collect.HashBiMap;
 import com.sun.net.httpserver.HttpExchange;
 import com.sun.net.httpserver.HttpHandler;
 import com.sun.net.httpserver.HttpServer;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
@@ -39,6 +38,7 @@ import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.restlet.resources.SegmentSizeInfo;
 import org.apache.pinot.common.restlet.resources.TableSizeInfo;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -64,7 +64,8 @@ public class TableSizeReaderTest {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableSizeReaderTest.class);
   private final Executor executor = Executors.newFixedThreadPool(1);
   private final HttpConnectionManager connectionManager = new MultiThreadedHttpConnectionManager();
-  private final ControllerMetrics _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
+  private final ControllerMetrics _controllerMetrics =
+      new ControllerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
   private PinotHelixResourceManager helix;
   private Map<String, FakeSizeServer> serverMap = new HashMap<>();
   private final String URI_PATH = "/table/";
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
index f61b064..2a6a579 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/SegmentStatusCheckerTest.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.controller.helix;
 
 import com.google.common.collect.Lists;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +30,8 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
@@ -49,7 +50,7 @@ public class SegmentStatusCheckerTest {
   private SegmentStatusChecker segmentStatusChecker;
   private PinotHelixResourceManager helixResourceManager;
   private LeadControllerManager leadControllerManager;
-  private MetricsRegistry metricsRegistry;
+  private PinotMetricsRegistry metricsRegistry;
   private ControllerMetrics controllerMetrics;
   private ControllerConf config;
 
@@ -91,7 +92,7 @@ public class SegmentStatusCheckerTest {
       leadControllerManager = mock(LeadControllerManager.class);
       when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
-    metricsRegistry = new MetricsRegistry();
+    metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker =
         new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
@@ -158,7 +159,7 @@ public class SegmentStatusCheckerTest {
       leadControllerManager = mock(LeadControllerManager.class);
       when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
-    metricsRegistry = new MetricsRegistry();
+    metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker =
         new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
@@ -239,7 +240,7 @@ public class SegmentStatusCheckerTest {
       leadControllerManager = mock(LeadControllerManager.class);
       when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
-    metricsRegistry = new MetricsRegistry();
+    metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker =
         new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
@@ -286,7 +287,7 @@ public class SegmentStatusCheckerTest {
       leadControllerManager = mock(LeadControllerManager.class);
       when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
-    metricsRegistry = new MetricsRegistry();
+    metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker =
         new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
@@ -318,7 +319,7 @@ public class SegmentStatusCheckerTest {
       leadControllerManager = mock(LeadControllerManager.class);
       when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
-    metricsRegistry = new MetricsRegistry();
+    metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker =
         new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
@@ -381,7 +382,7 @@ public class SegmentStatusCheckerTest {
       leadControllerManager = mock(LeadControllerManager.class);
       when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
-    metricsRegistry = new MetricsRegistry();
+    metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker =
         new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
@@ -427,7 +428,7 @@ public class SegmentStatusCheckerTest {
       leadControllerManager = mock(LeadControllerManager.class);
       when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
-    metricsRegistry = new MetricsRegistry();
+    metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker =
         new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
@@ -471,7 +472,7 @@ public class SegmentStatusCheckerTest {
       leadControllerManager = mock(LeadControllerManager.class);
       when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
-    metricsRegistry = new MetricsRegistry();
+    metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker =
         new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
@@ -510,7 +511,7 @@ public class SegmentStatusCheckerTest {
       leadControllerManager = mock(LeadControllerManager.class);
       when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
-    metricsRegistry = new MetricsRegistry();
+    metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker =
         new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
@@ -560,7 +561,7 @@ public class SegmentStatusCheckerTest {
       leadControllerManager = mock(LeadControllerManager.class);
       when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     }
-    metricsRegistry = new MetricsRegistry();
+    metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
     controllerMetrics = new ControllerMetrics(metricsRegistry);
     segmentStatusChecker =
         new SegmentStatusChecker(helixResourceManager, leadControllerManager, config, controllerMetrics);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
index dcd5469..676c9dc 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/periodictask/ControllerPeriodicTaskTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.controller.helix.core.periodictask;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -26,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -46,7 +46,8 @@ public class ControllerPeriodicTaskTest {
 
   private final PinotHelixResourceManager _resourceManager = mock(PinotHelixResourceManager.class);
   private final LeadControllerManager _leadControllerManager = mock(LeadControllerManager.class);
-  private final ControllerMetrics _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
+  private final ControllerMetrics _controllerMetrics =
+      new ControllerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
   private final AtomicBoolean _startTaskCalled = new AtomicBoolean();
   private final AtomicBoolean _stopTaskCalled = new AtomicBoolean();
   private final AtomicBoolean _processTablesCalled = new AtomicBoolean();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index b7e05f4..d0f1a3e 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -19,12 +19,12 @@
 package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.base.Preconditions;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.lang.reflect.Field;
 import java.util.Map;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -1366,7 +1366,7 @@ public class SegmentCompletionTest {
     public HelixManager _helixManager = mock(HelixManager.class);
 
     protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager) {
-      this(pinotHelixResourceManager, new ControllerMetrics(new MetricsRegistry()));
+      this(pinotHelixResourceManager, new ControllerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry()));
     }
 
     protected MockPinotLLCRealtimeSegmentManager(PinotHelixResourceManager pinotHelixResourceManager,
@@ -1410,7 +1410,7 @@ public class SegmentCompletionTest {
 
     protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
         boolean isLeader) {
-      this(helixManager, segmentManager, isLeader, new ControllerMetrics(new MetricsRegistry()));
+      this(helixManager, segmentManager, isLeader, new ControllerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry()));
     }
 
     protected MockSegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager,
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index f5cac8b..9c59929 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.controller.helix.core.retention;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -29,6 +28,7 @@ import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.ControllerConf;
@@ -95,7 +95,7 @@ public class RetentionManagerTest {
     when(pinotHelixResourceManager.getOfflineSegmentMetadata(OFFLINE_TABLE_NAME)).thenReturn(metadataList);
 
     ControllerConf conf = new ControllerConf();
-    ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
+    ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
     RetentionManager retentionManager =
@@ -221,7 +221,7 @@ public class RetentionManagerTest {
     when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
 
     ControllerConf conf = new ControllerConf();
-    ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
+    ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
     RetentionManager retentionManager =
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index caeb4e4..8126854 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.controller.helix.core.retention;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -29,6 +28,7 @@ import org.apache.pinot.common.lineage.LineageEntryState;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.ControllerTestUtils;
 import org.apache.pinot.controller.LeadControllerManager;
@@ -79,7 +79,7 @@ public class SegmentLineageCleanupTest {
     LeadControllerManager leadControllerManager = mock(LeadControllerManager.class);
     when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     ControllerConf conf = new ControllerConf();
-    ControllerMetrics controllerMetrics = new ControllerMetrics(new MetricsRegistry());
+    ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
     _retentionManager = new RetentionManager(ControllerTestUtils.getHelixResourceManager(), leadControllerManager, conf, controllerMetrics);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
index 837734a..730439b 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/StorageQuotaCheckerTest.java
@@ -18,11 +18,11 @@
  */
 package org.apache.pinot.controller.validation;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.Collections;
 import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.QuotaConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -54,7 +54,7 @@ public class StorageQuotaCheckerTest {
     _tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
     _tableSizeReader = mock(TableSizeReader.class);
-    _controllerMetrics = new ControllerMetrics(new MetricsRegistry());
+    _controllerMetrics = new ControllerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
     _storageQuotaChecker = new StorageQuotaChecker(_tableConfig, _tableSizeReader, _controllerMetrics, true);
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 015839a..078512f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.data.manager.realtime;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.yammer.metrics.core.Meter;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -33,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metrics.base.PinotMeter;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -99,8 +99,8 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   private final Logger _segmentLogger;
   private final SegmentVersion _segmentVersion;
 
-  private Meter _tableAndStreamRowsConsumed = null;
-  private Meter _tableRowsConsumed = null;
+  private PinotMeter _tableAndStreamRowsConsumed = null;
+  private PinotMeter _tableRowsConsumed = null;
 
   // An instance of this class exists only for the duration of the realtime segment that is currently being consumed.
   // Once the segment is committed, the segment is handled by OfflineSegmentDataManager
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 84d4592..83a3814 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -21,7 +21,6 @@ package org.apache.pinot.core.data.manager.realtime;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.yammer.metrics.core.Meter;
 import java.io.File;
 import java.io.IOException;
 import java.net.URISyntaxException;
@@ -41,6 +40,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metrics.base.PinotMeter;
 import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -428,8 +428,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
   }
 
   private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeSleepTimeMillis) {
-    Meter realtimeRowsConsumedMeter = null;
-    Meter realtimeRowsDroppedMeter = null;
+    PinotMeter realtimeRowsConsumedMeter = null;
+    PinotMeter realtimeRowsDroppedMeter = null;
 
     int indexedMessageCount = 0;
     int streamMessageCount = 0;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/DirectMemoryManager.java b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/DirectMemoryManager.java
index f45511a..7ec09fe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/DirectMemoryManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/DirectMemoryManager.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.io.writer.impl;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.io.readerwriter.RealtimeIndexOffHeapMemoryManager;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -37,7 +37,7 @@ public class DirectMemoryManager extends RealtimeIndexOffHeapMemoryManager {
 
   @VisibleForTesting
   public DirectMemoryManager(final String segmentName) {
-    this(segmentName, new ServerMetrics(new MetricsRegistry()));
+    this(segmentName, new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry()));
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/MmapMemoryManager.java b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/MmapMemoryManager.java
index 3c0ecfd..d9bd029 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/MmapMemoryManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/io/writer/impl/MmapMemoryManager.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.io.writer.impl;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FilenameFilter;
@@ -27,6 +26,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.LinkedList;
 import java.util.List;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.io.readerwriter.RealtimeIndexOffHeapMemoryManager;
 import org.apache.pinot.core.segment.memory.PinotDataBuffer;
@@ -101,7 +101,7 @@ public class MmapMemoryManager extends RealtimeIndexOffHeapMemoryManager {
 
   @VisibleForTesting
   public MmapMemoryManager(String dirPathName, String segmentName) {
-    this(dirPathName, segmentName, new ServerMetrics(new MetricsRegistry()));
+    this(dirPathName, segmentName, new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry()));
   }
 
   private String getFilePrefix() {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
index 7096d0e..2906a0a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.data.manager;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -32,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.data.manager.config.TableDataManagerConfig;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
@@ -108,9 +108,8 @@ public class BaseTableDataManagerTest {
       when(config.getTableName()).thenReturn(TABLE_NAME);
       when(config.getDataDir()).thenReturn(_tmpDir.getAbsolutePath());
     }
-    tableDataManager
-        .init(config, "dummyInstance", mock(ZkHelixPropertyStore.class), new ServerMetrics(new MetricsRegistry()), mock(
-            HelixManager.class));
+    tableDataManager.init(config, "dummyInstance", mock(ZkHelixPropertyStore.class),
+        new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry()), mock(HelixManager.class));
     tableDataManager.start();
     Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap");
     segsMapField.setAccessible(true);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
index 23dd01f..6a5f0dc 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.data.manager.offline;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
 import java.net.URL;
 import java.util.Arrays;
@@ -28,6 +27,7 @@ import org.apache.helix.AccessOption;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
@@ -106,7 +106,7 @@ public class DimensionTableDataManagerTest {
       when(config.getTableName()).thenReturn(TABLE_NAME);
       when(config.getDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
     }
-    tableDataManager.init(config, "dummyInstance", mockPropertyStore(), new ServerMetrics(new MetricsRegistry()),
+    tableDataManager.init(config, "dummyInstance", mockPropertyStore(), new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry()),
         mock(HelixManager.class));
     tableDataManager.start();
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 0017c43..e44ecc2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.core.data.manager.realtime;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -33,6 +32,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -130,7 +130,7 @@ public class LLRealtimeSegmentDataManagerTest {
 
   private RealtimeTableDataManager createTableDataManager() {
     final String instanceId = "server-1";
-    SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry()), _tableName);
+    SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry()), _tableName);
     RealtimeTableDataManager tableDataManager = mock(RealtimeTableDataManager.class);
     when(tableDataManager.getServerInstance()).thenReturn(instanceId);
     RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class);
@@ -159,7 +159,7 @@ public class LLRealtimeSegmentDataManagerTest {
     LLCSegmentName llcSegmentName = new LLCSegmentName(_segmentNameStr);
     _partitionIdToSemaphoreMap.putIfAbsent(_partitionId, new Semaphore(1));
     Schema schema = Schema.fromString(makeSchema());
-    ServerMetrics serverMetrics = new ServerMetrics(new MetricsRegistry());
+    ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
     FakeLLRealtimeSegmentDataManager segmentDataManager =
         new FakeLLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, tableDataManager, resourceDir, schema,
             llcSegmentName, _partitionIdToSemaphoreMap, serverMetrics);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
index a507f74..1724d8e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/MultiLevelPriorityQueueTest.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.query.scheduler.resources.PolicyBasedResourceManager;
 import org.apache.pinot.core.query.scheduler.resources.ResourceLimitPolicy;
@@ -40,12 +41,11 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Preconditions;
-import com.yammer.metrics.core.MetricsRegistry;
 
 
 public class MultiLevelPriorityQueueTest {
   SchedulerGroup group;
-  final ServerMetrics metrics = new ServerMetrics(new MetricsRegistry());
+  final ServerMetrics metrics = new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
 
   final SchedulerGroupMapper groupMapper = new TableBasedGroupMapper();
   final TestSchedulerGroupFactory groupFactory = new TestSchedulerGroupFactory();
@@ -81,9 +81,9 @@ public class MultiLevelPriorityQueueTest {
       throws OutOfCapacityException {
     Map<String, Object> properties = new HashMap<>();
     properties.put(MultiLevelPriorityQueue.MAX_PENDING_PER_GROUP_KEY, 2);
-    
+
     PinotConfiguration configuration =new PinotConfiguration(properties);
-    
+
     ResourceManager rm = new UnboundedResourceManager(configuration);
     MultiLevelPriorityQueue queue = createQueue(configuration, rm);
     queue.put(createQueryRequest(groupOne, metrics));
@@ -129,12 +129,12 @@ public class MultiLevelPriorityQueueTest {
     properties.put(ResourceManager.QUERY_RUNNER_CONFIG_KEY, 10);
     properties.put(ResourceLimitPolicy.TABLE_THREADS_SOFT_LIMIT, 20);
     properties.put(ResourceLimitPolicy.TABLE_THREADS_HARD_LIMIT, 80);
-    
+
     PinotConfiguration configuration = new PinotConfiguration(properties);
-    
+
     PolicyBasedResourceManager rm = new PolicyBasedResourceManager(configuration);
     MultiLevelPriorityQueue queue = createQueue(configuration, rm);
-    
+
     queue.put(createQueryRequest(groupOne, metrics));
     queue.put(createQueryRequest(groupOne, metrics));
     queue.put(createQueryRequest(groupTwo, metrics));
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index d378820..7889986 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.query.scheduler;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.Uninterruptibles;
-import com.yammer.metrics.core.MetricsRegistry;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,6 +39,7 @@ import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nullable;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.utils.DataTable;
@@ -63,7 +63,8 @@ import static org.testng.Assert.assertTrue;
 
 
 public class PrioritySchedulerTest {
-  private static final ServerMetrics metrics = new ServerMetrics(new MetricsRegistry());
+  private static final ServerMetrics metrics = new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
+
   private static boolean useBarrier = false;
   private static CyclicBarrier startupBarrier;
   private static CyclicBarrier validationBarrier;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSSchedulerGroupTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSSchedulerGroupTest.java
index 5374714..6e871d2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSSchedulerGroupTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSSchedulerGroupTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.core.query.scheduler.fcfs;
 
-import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.query.scheduler.SchedulerQueryContext;
 import org.testng.annotations.Test;
@@ -29,7 +29,7 @@ import static org.testng.Assert.assertNull;
 
 
 public class FCFSSchedulerGroupTest {
-  static final ServerMetrics metrics = new ServerMetrics(new MetricsRegistry());
+  static ServerMetrics metrics = new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
 
   @Test
   public void testCompare() {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
index f168a7d..ac9161b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNullValueVectorTest.java
@@ -40,6 +40,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.segment.ReadMode;
@@ -73,8 +74,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.yammer.metrics.core.MetricsRegistry;
-
 
 /**
  * Class for testing segment generation with byte[] data type.
@@ -140,7 +139,7 @@ public class SegmentGenerationWithNullValueVectorTest {
       throws ConfigurationException {
     _segmentNames.add(_segment.getSegmentName());
     // Mock the instance data manager
-    _serverMetrics = new ServerMetrics(new MetricsRegistry());
+    _serverMetrics = new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
     TableDataManagerConfig tableDataManagerConfig = mock(TableDataManagerConfig.class);
     when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
     when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index c28a2d8..a93070e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -32,6 +32,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.segment.ReadMode;
@@ -58,8 +59,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.yammer.metrics.core.MetricsRegistry;
-
 
 public class QueryExecutorTest {
   private static final String AVRO_DATA_PATH = "data/simpleData200001.avro";
@@ -102,7 +101,7 @@ public class QueryExecutorTest {
     }
 
     // Mock the instance data manager
-    _serverMetrics = new ServerMetrics(new MetricsRegistry());
+    _serverMetrics = new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry());
     TableDataManagerConfig tableDataManagerConfig = mock(TableDataManagerConfig.class);
     when(tableDataManagerConfig.getTableDataManagerType()).thenReturn("OFFLINE");
     when(tableDataManagerConfig.getTableName()).thenReturn(TABLE_NAME);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
index da96684..4377187 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.integration.tests;
 
 import com.google.common.base.Function;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.helix.HelixManager;
@@ -32,6 +31,7 @@ import org.apache.helix.participant.statemachine.StateModel;
 import org.apache.helix.participant.statemachine.StateModelFactory;
 import org.apache.helix.participant.statemachine.StateModelInfo;
 import org.apache.helix.participant.statemachine.Transition;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -45,7 +45,6 @@ import org.apache.pinot.server.realtime.ControllerLeaderLocator;
 import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
 import org.apache.pinot.spi.stream.LongMsgOffset;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -141,11 +140,11 @@ public class SegmentCompletionIntegrationTest extends BaseClusterIntegrationTest
     }, 60_000L, "Failed to reach CONSUMING state");
 
     // Now report to the controller that we had to stop consumption
-    ServerSegmentCompletionProtocolHandler protocolHandler =
-        new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()), realtimeTableName);
+    ServerSegmentCompletionProtocolHandler protocolHandler = new ServerSegmentCompletionProtocolHandler(
+        new ServerMetrics(PinotMetricUtilsFactory.getPinotMetricsRegistry()), realtimeTableName);
     SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
     params.withStreamPartitionMsgOffset(new LongMsgOffset(45688L).toString()).withSegmentName(_currentSegment)
-        .withReason("RandomReason") .withInstanceId(_serverInstance);
+        .withReason("RandomReason").withInstanceId(_serverInstance);
     SegmentCompletionProtocol.Response response = protocolHandler.segmentStoppedConsuming(params);
     Assert.assertEquals(response.getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
 
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index 015b8e2..2d8e45f 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.minion;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import java.io.File;
 import java.io.IOException;
 import javax.net.ssl.SSLContext;
@@ -32,6 +31,8 @@ import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metrics.MetricsHelper;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.utils.ClientSSLContextGenerator;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.common.utils.NetUtil;
@@ -53,6 +54,7 @@ import org.apache.pinot.spi.services.ServiceStartable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory.LIBRARY_NAME_KEY;
 import static org.apache.pinot.common.utils.CommonConstants.HTTPS_PROTOCOL;
 
 
@@ -151,8 +153,14 @@ public class MinionStarter implements ServiceStartable {
 
     // Initialize metrics
     LOGGER.info("Initializing metrics");
+    // TODO: put all the metrics related configs down to "pinot.server.metrics"
+    PinotConfiguration metricsConfiguration = _config;
+    PinotMetricUtilsFactory.init(metricsConfiguration.getProperty(LIBRARY_NAME_KEY));
+    PinotMetricsRegistry metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
+
+
     MetricsHelper.initializeMetrics(_config);
-    MetricsRegistry metricsRegistry = new MetricsRegistry();
+
     MetricsHelper.registerMetricsRegistry(metricsRegistry);
     MinionMetrics minionMetrics = new MinionMetrics(_config
         .getProperty(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX_KEY,
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/metrics/MinionMetrics.java b/pinot-minion/src/main/java/org/apache/pinot/minion/metrics/MinionMetrics.java
index 69f4fe5..dfa3e74 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/metrics/MinionMetrics.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/metrics/MinionMetrics.java
@@ -18,18 +18,18 @@
  */
 package org.apache.pinot.minion.metrics;
 
-import com.yammer.metrics.core.MetricsRegistry;
 import org.apache.pinot.common.metrics.AbstractMetrics;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
 import org.apache.pinot.common.utils.CommonConstants;
 
 
 public class MinionMetrics extends AbstractMetrics<MinionQueryPhase, MinionMeter, MinionGauge, MinionTimer> {
 
-  public MinionMetrics(MetricsRegistry metricsRegistry) {
+  public MinionMetrics(PinotMetricsRegistry metricsRegistry) {
     this(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX, metricsRegistry);
   }
 
-  public MinionMetrics(String prefix, MetricsRegistry metricsRegistry) {
+  public MinionMetrics(String prefix, PinotMetricsRegistry metricsRegistry) {
     super(prefix, metricsRegistry, MinionMetrics.class);
   }
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index af5e40b..a2ca962 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -19,13 +19,14 @@
 package org.apache.pinot.server.starter;
 
 import com.google.common.base.Preconditions;
-import com.yammer.metrics.core.MetricsRegistry;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.LongAccumulator;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.function.FunctionRegistry;
 import org.apache.pinot.common.metrics.MetricsHelper;
+import org.apache.pinot.common.metrics.base.PinotMetricsRegistry;
+import org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
@@ -43,6 +44,8 @@ import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.pinot.common.metrics.base.PinotMetricUtilsFactory.LIBRARY_NAME_KEY;
+
 
 /**
  * A standalone server which will listen on a port and serve queries based on the given configuration. Cluster
@@ -67,8 +70,10 @@ public class ServerInstance {
     LOGGER.info("Initializing server instance");
 
     LOGGER.info("Initializing server metrics");
-    MetricsHelper.initializeMetrics(serverConf.getMetricsConfig());
-    MetricsRegistry metricsRegistry = new MetricsRegistry();
+    PinotConfiguration metricsConfiguration = serverConf.getMetricsConfig();
+    PinotMetricUtilsFactory.init(metricsConfiguration.getProperty(LIBRARY_NAME_KEY));
+    PinotMetricsRegistry metricsRegistry = PinotMetricUtilsFactory.getPinotMetricsRegistry();
+    MetricsHelper.initializeMetrics(metricsConfiguration);
     MetricsHelper.registerMetricsRegistry(metricsRegistry);
     _serverMetrics =
         new ServerMetrics(serverConf.getMetricsPrefix(), metricsRegistry, serverConf.emitTableLevelMetrics(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org