You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/03/08 17:59:28 UTC

[incubator-pinot] branch master updated: Make Pinot metrics pluggable (#6640)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dbdd67  Make Pinot metrics pluggable (#6640)
1dbdd67 is described below

commit 1dbdd6754bd6e03d68926bbd9e9abaa3d37dd0b6
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Mon Mar 8 09:59:11 2021 -0800

    Make Pinot metrics pluggable (#6640)
    
    * Make Pinot metrics pluggable
    
    * Mark pinot-yammer as test scope
    
    Co-authored-by: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
---
 pinot-broker/pom.xml                               |   5 +
 .../broker/broker/helix/HelixBrokerStarter.java    |   3 -
 pinot-common/pom.xml                               |   9 +-
 .../pinot/common/metrics/AbstractMetrics.java      |  38 ++--
 ...eporterMetricsRegistryRegistrationListener.java |   2 +-
 .../apache/pinot/common/metrics/MetricsHelper.java | 253 ---------------------
 .../MetricsRegistryRegistrationListener.java       |   2 +-
 .../pinot/common/metrics/PinotMetricUtils.java     | 205 +++++++++++++----
 .../pinot/common/metrics/ValidationMetrics.java    |  10 +-
 .../pinot/common/metrics/MetricsHelperTest.java    | 103 ---------
 .../pinot/common/metrics/PinotMetricUtilsTest.java |  84 +++++--
 pinot-connectors/pinot-spark-connector/pom.xml     |   5 +
 pinot-controller/pom.xml                           |   5 +
 .../apache/pinot/controller/ControllerStarter.java |  13 +-
 .../minion/generator/TaskGeneratorRegistry.java    |  13 +-
 pinot-core/pom.xml                                 |   6 +
 pinot-distribution/pinot-assembly.xml              |   6 +
 pinot-integration-tests/pom.xml                    |   5 +
 pinot-minion/pom.xml                               |   5 +
 .../org/apache/pinot/minion/MinionStarter.java     |   4 -
 .../minion/event/EventObserverFactoryRegistry.java |  12 +-
 .../executor/TaskExecutorFactoryRegistry.java      |  15 +-
 .../pinot-yammer}/pom.xml                          |  18 +-
 .../plugin}/metrics/yammer/YammerCounter.java      |   2 +-
 .../pinot/plugin}/metrics/yammer/YammerGauge.java  |   2 +-
 .../plugin}/metrics/yammer/YammerJmxReporter.java  |   2 +-
 .../pinot/plugin}/metrics/yammer/YammerMeter.java  |   2 +-
 .../plugin}/metrics/yammer/YammerMetered.java      |   2 +-
 .../pinot/plugin}/metrics/yammer/YammerMetric.java |   2 +-
 .../plugin}/metrics/yammer/YammerMetricName.java   |   2 +-
 .../metrics/yammer/YammerMetricsFactory.java       |  67 ++++++
 .../metrics/yammer/YammerMetricsRegistry.java      |   2 +-
 .../yammer/YammerMetricsRegistryListener.java      |   2 +-
 .../pinot/plugin}/metrics/yammer/YammerTimer.java  |   2 +-
 .../pom.xml                                        |  44 ++--
 .../pinot-minion-builtin-tasks/pom.xml             |   8 +
 pinot-plugins/pom.xml                              |   1 +
 pinot-server/pom.xml                               |   5 +
 .../pinot/server/starter/ServerInstance.java       |   3 -
 .../spi/annotations/metrics/MetricsFactory.java    |  19 +-
 .../annotations/metrics/PinotMetricsFactory.java   |  62 +++++
 .../pinot/spi/utils/PinotReflectionUtils.java      |  40 ++++
 pom.xml                                            |  10 +-
 43 files changed, 545 insertions(+), 555 deletions(-)

diff --git a/pinot-broker/pom.xml b/pinot-broker/pom.xml
index c493a19..f411d65 100644
--- a/pinot-broker/pom.xml
+++ b/pinot-broker/pom.xml
@@ -154,6 +154,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-yammer</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
       <scope>test</scope>
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
index d79b940..8cd41c3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java
@@ -51,7 +51,6 @@ import org.apache.pinot.common.function.FunctionRegistry;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
-import org.apache.pinot.common.metrics.MetricsHelper;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -229,8 +228,6 @@ public class HelixBrokerStarter implements ServiceStartable {
     PinotConfiguration metricsConfiguration = _brokerConf.subset(Broker.METRICS_CONFIG_PREFIX);
     PinotMetricUtils.init(metricsConfiguration);
     _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    MetricsHelper.initializeMetrics(metricsConfiguration);
-    MetricsHelper.registerMetricsRegistry(_metricsRegistry);
     _brokerMetrics = new BrokerMetrics(
         _brokerConf.getProperty(Broker.CONFIG_OF_METRICS_NAME_PREFIX, Broker.DEFAULT_METRICS_NAME_PREFIX),
         _metricsRegistry,
diff --git a/pinot-common/pom.xml b/pinot-common/pom.xml
index 5e44895..a8b8a3d 100644
--- a/pinot-common/pom.xml
+++ b/pinot-common/pom.xml
@@ -84,6 +84,11 @@
       <artifactId>pinot-spi</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-yammer</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpmime</artifactId>
     </dependency>
@@ -119,10 +124,6 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>com.yammer.metrics</groupId>
-      <artifactId>metrics-core</artifactId>
-    </dependency>
-    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index f28a4db..f8f0967 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -149,9 +149,8 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
    * @param timeUnit The log time duration time unit
    */
   private void addValueToTimer(String fullTimerName, final long duration, final TimeUnit timeUnit) {
-    final PinotMetricName metricName = PinotMetricUtils.generatePinotMetricName(_clazz, fullTimerName);
-    MetricsHelper.newTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS)
-        .update(duration, timeUnit);
+    final PinotMetricName metricName = PinotMetricUtils.makePinotMetricName(_clazz, fullTimerName);
+    PinotMetricUtils.makePinotTimer(_metricsRegistry, metricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS);
   }
 
   /**
@@ -179,10 +178,10 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
       final String fullMeterName;
       String meterName = meter.getMeterName();
       fullMeterName = _metricPrefix + meterName;
-      final PinotMetricName metricName = PinotMetricUtils.generatePinotMetricName(_clazz, fullMeterName);
+      final PinotMetricName metricName = PinotMetricUtils.makePinotMetricName(_clazz, fullMeterName);
 
       final PinotMeter newMeter =
-          MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
+          PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
       newMeter.mark(unitCount);
       return newMeter;
     }
@@ -215,10 +214,10 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
       final String fullMeterName;
       String meterName = meter.getMeterName();
       fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName;
-      final PinotMetricName metricName = PinotMetricUtils.generatePinotMetricName(_clazz, fullMeterName);
+      final PinotMetricName metricName = PinotMetricUtils.makePinotMetricName(_clazz, fullMeterName);
 
       final PinotMeter newMeter =
-          MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
+          PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
       newMeter.mark(unitCount);
       return newMeter;
     }
@@ -228,9 +227,9 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
     final String fullMeterName;
     String meterName = meter.getMeterName();
     fullMeterName = _metricPrefix + getTableName(tableName) + "." + meterName;
-    final PinotMetricName metricName = PinotMetricUtils.generatePinotMetricName(_clazz, fullMeterName);
+    final PinotMetricName metricName = PinotMetricUtils.makePinotMetricName(_clazz, fullMeterName);
 
-    return MetricsHelper.newMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
+    return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
   }
 
   /**
@@ -459,17 +458,16 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
    * @param valueCallback The callback function used to retrieve the value of the gauge
    */
   public void addCallbackGauge(final String metricName, final Callable<Long> valueCallback) {
-    MetricsHelper
-        .newGauge(_metricsRegistry, PinotMetricUtils.generatePinotMetricName(_clazz, _metricPrefix + metricName),
-            PinotMetricUtils.generatePinotGauge(avoid -> {
-              try {
-                return valueCallback.call();
-              } catch (Exception e) {
-                LOGGER.error("Caught exception", e);
-                Utils.rethrowException(e);
-                throw new AssertionError("Should not reach this");
-              }
-            }));
+    PinotMetricUtils.makeGauge(_metricsRegistry, PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + metricName),
+        PinotMetricUtils.makePinotGauge(avoid -> {
+          try {
+            return valueCallback.call();
+          } catch (Exception e) {
+            LOGGER.error("Caught exception", e);
+            Utils.rethrowException(e);
+            throw new AssertionError("Should not reach this");
+          }
+        }));
   }
 
   protected abstract QP[] getQueryPhases();
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/JmxReporterMetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/JmxReporterMetricsRegistryRegistrationListener.java
index 7c352b6..e1c27de 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/JmxReporterMetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/JmxReporterMetricsRegistryRegistrationListener.java
@@ -33,7 +33,7 @@ public class JmxReporterMetricsRegistryRegistrationListener implements MetricsRe
   @Override
   public void onMetricsRegistryRegistered(PinotMetricsRegistry metricsRegistry) {
     LOGGER.info("Registering JmxReporterMetricsRegistryRegistrationListener");
-    PinotMetricUtils.generatePinotJmxReporter(metricsRegistry).start();
+    PinotMetricUtils.makePinotJmxReporter(metricsRegistry).start();
     LOGGER.info("Number of metrics in metricsRegistry: {}", metricsRegistry.allMetrics().size());
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsHelper.java
deleted file mode 100644
index 4ce2308..0000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsHelper.java
+++ /dev/null
@@ -1,253 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.metrics;
-
-import java.lang.reflect.Constructor;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import org.apache.pinot.spi.metrics.PinotCounter;
-import org.apache.pinot.spi.metrics.PinotGauge;
-import org.apache.pinot.spi.metrics.PinotHistogram;
-import org.apache.pinot.spi.metrics.PinotMeter;
-import org.apache.pinot.spi.metrics.PinotMetricName;
-import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
-import org.apache.pinot.spi.metrics.PinotTimer;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class MetricsHelper {
-  private static final Logger LOGGER = LoggerFactory.getLogger(MetricsHelper.class);
-
-  private static final Map<PinotMetricsRegistry, Boolean> metricsRegistryMap = new ConcurrentHashMap<>();
-
-  private static final Map<MetricsRegistryRegistrationListener, Boolean> metricsRegistryRegistrationListenersMap =
-      new ConcurrentHashMap<>();
-
-  /**
-   * Initializes the metrics system by initializing the registry registration listeners present in the configuration.
-   *
-   * @param configuration The subset of the configuration containing the metrics-related keys
-   */
-  public static void initializeMetrics(PinotConfiguration configuration) {
-    synchronized (MetricsHelper.class) {
-      List<String> listenerClassNames = configuration.getProperty("metricsRegistryRegistrationListeners",
-          Arrays.asList(JmxReporterMetricsRegistryRegistrationListener.class.getName()));
-
-      // Build each listener using their default constructor and add them
-      for (String listenerClassName : listenerClassNames) {
-        try {
-          Class<? extends MetricsRegistryRegistrationListener> clazz =
-              (Class<? extends MetricsRegistryRegistrationListener>) Class.forName(listenerClassName);
-          Constructor<? extends MetricsRegistryRegistrationListener> defaultConstructor =
-              clazz.getDeclaredConstructor();
-          MetricsRegistryRegistrationListener listener = defaultConstructor.newInstance();
-
-          LOGGER.info("Registering metricsRegistry to listener {}", listenerClassName);
-          addMetricsRegistryRegistrationListener(listener);
-        } catch (Exception e) {
-          LOGGER
-              .warn("Caught exception while initializing MetricsRegistryRegistrationListener " + listenerClassName, e);
-        }
-      }
-    }
-    LOGGER.info("Number of listeners got registered: {}", metricsRegistryRegistrationListenersMap.size());
-  }
-
-  /**
-   * Adds a metrics registry registration listener. When adding a metrics registry registration listener, events are
-   * fired to add all previously registered metrics registries to the newly added metrics registry registration
-   * listener.
-   *
-   * @param listener The listener to add
-   */
-  public static void addMetricsRegistryRegistrationListener(MetricsRegistryRegistrationListener listener) {
-    synchronized (MetricsHelper.class) {
-      metricsRegistryRegistrationListenersMap.put(listener, Boolean.TRUE);
-
-      // Fire events to register all previously registered metrics registries
-      Set<PinotMetricsRegistry> metricsRegistries = metricsRegistryMap.keySet();
-      LOGGER.info("Number of metrics registry: {}", metricsRegistries.size());
-      for (PinotMetricsRegistry metricsRegistry : metricsRegistries) {
-        listener.onMetricsRegistryRegistered(metricsRegistry);
-      }
-    }
-  }
-
-  /**
-   * Registers the metrics registry with the metrics helper.
-   *
-   * @param registry The registry to register
-   */
-  public static void registerMetricsRegistry(PinotMetricsRegistry registry) {
-    synchronized (MetricsHelper.class) {
-      metricsRegistryMap.put(registry, Boolean.TRUE);
-
-      // Fire event to all registered listeners
-      Set<MetricsRegistryRegistrationListener> metricsRegistryRegistrationListeners =
-          metricsRegistryRegistrationListenersMap.keySet();
-      for (MetricsRegistryRegistrationListener metricsRegistryRegistrationListener : metricsRegistryRegistrationListeners) {
-        metricsRegistryRegistrationListener.onMetricsRegistryRegistered(registry);
-      }
-    }
-  }
-
-  /**
-   *
-   * Return an existing meter if
-   *  (a) A meter already exist with the same metric name.
-   * Otherwise, creates a new meter and registers
-   *
-   * @param registry MetricsRegistry
-   * @param name metric name
-   * @param eventType Event Type
-   * @param unit TimeUnit for rate determination
-   * @return Meter
-   */
-  public static PinotMeter newMeter(PinotMetricsRegistry registry, PinotMetricName name, String eventType,
-      TimeUnit unit) {
-    return registry.newMeter(name, eventType, unit);
-  }
-
-  /**
-   *
-   * Return an existing counter if
-   *  (a) A counter already exist with the same metric name.
-   * Otherwise, creates a new meter and registers
-   *
-   * @param registry MetricsRegistry
-   * @param name metric name
-   * @return Counter
-   */
-  public static PinotCounter newCounter(PinotMetricsRegistry registry, PinotMetricName name) {
-    return registry.newCounter(name);
-  }
-
-  /**
-   *
-   * Return an existing histogram if
-   *  (a) A histogram already exist with the same metric name.
-   * Otherwise, creates a new meter and registers
-   *
-   * @param registry MetricsRegistry
-   * @param name metric name
-   * @param biased (true if uniform distribution, otherwise exponential weighted)
-   * @return histogram
-   */
-  public static PinotHistogram newHistogram(PinotMetricsRegistry registry, PinotMetricName name, boolean biased) {
-    return registry.newHistogram(name, biased);
-  }
-
-  /**
-   *
-   * Return an existing gauge if
-   *  (a) A gauge already exist with the same metric name.
-   * Otherwise, creates a new meter and registers
-   *
-   * @param registry MetricsRegistry
-   * @param name metric name
-   * @param gauge Underlying gauge to be tracked
-   * @return gauge
-   */
-  public static <T> PinotGauge<T> newGauge(PinotMetricsRegistry registry, PinotMetricName name, PinotGauge<T> gauge) {
-    return registry.newGauge(name, gauge);
-  }
-
-  /**
-   * Removes an existing metric
-   */
-  public static void removeMetric(PinotMetricsRegistry registry, PinotMetricName name) {
-    registry.removeMetric(name);
-  }
-
-  /**
-   *
-   * Return an existing timer if
-   *  (a) A timer already exist with the same metric name.
-   * Otherwise, creates a new timer and registers
-   *
-   * @param registry MetricsRegistry
-   * @param name metric name
-   * @param durationUnit TimeUnit for duration
-   * @param rateUnit TimeUnit for rate determination
-   * @return Timer
-   */
-  public static PinotTimer newTimer(PinotMetricsRegistry registry, PinotMetricName name, TimeUnit durationUnit,
-      TimeUnit rateUnit) {
-    return registry.newTimer(name, durationUnit, rateUnit);
-  }
-
-  /**
-   * Useful for measuring elapsed times.
-   *
-   * Usage :
-   * <pre>
-   * {@code
-   *   TimerContext tc = MtericsHelper.startTimer();
-   *   ....
-   *   Your code to be measured
-   *   ....
-   *   tc.stop();
-   *   long elapsedTimeMs = tc.getLatencyMs();
-   *
-   * }
-   * </pre>
-   * @return
-   */
-  public static TimerContext startTimer() {
-    return new TimerContext();
-  }
-
-  /**
-   *
-   * TimerContext to measure elapsed time
-   *
-   */
-  public static class TimerContext {
-    private final long _startTimeNanos;
-    private long _stopTimeNanos;
-    private boolean _isDone;
-
-    public TimerContext() {
-      _startTimeNanos = System.nanoTime();
-      _isDone = false;
-    }
-
-    public void stop() {
-      _isDone = true;
-      _stopTimeNanos = System.nanoTime();
-    }
-
-    /**
-     *
-     * @return
-     */
-    public long getLatencyMs() {
-      if (!_isDone) {
-        stop();
-      }
-      return (_stopTimeNanos - _startTimeNanos) / 1000000L;
-    }
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
index 0f82991..740c942 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
@@ -22,7 +22,7 @@ import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 
 
 /**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
+ * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the PinotMetricUtils.
  *
  */
 public interface MetricsRegistryRegistrationListener {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/PinotMetricUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/PinotMetricUtils.java
index 6ada305..6ee0b27 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/PinotMetricUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/PinotMetricUtils.java
@@ -18,82 +18,189 @@
  */
 package org.apache.pinot.common.metrics;
 
+import com.google.common.base.Preconditions;
+import java.lang.reflect.Constructor;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
-import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.spi.annotations.metrics.MetricsFactory;
+import org.apache.pinot.spi.annotations.metrics.PinotMetricsFactory;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.metrics.PinotGauge;
 import org.apache.pinot.spi.metrics.PinotJmxReporter;
+import org.apache.pinot.spi.metrics.PinotMeter;
 import org.apache.pinot.spi.metrics.PinotMetricName;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
-import org.apache.pinot.common.metrics.yammer.YammerGauge;
-import org.apache.pinot.common.metrics.yammer.YammerJmxReporter;
-import org.apache.pinot.common.metrics.yammer.YammerMetricName;
-import org.apache.pinot.common.metrics.yammer.YammerMetricsRegistry;
+import org.apache.pinot.spi.metrics.PinotTimer;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class PinotMetricUtils {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotMetricUtils.class);
-  public static final String LIBRARY_NAME_KEY = "libraryName";
-  public static final String YAMMER_KEY = "yammer";
+  private static final String METRICS_PACKAGE_REGEX_PATTERN = ".*\\.plugin\\.metrics\\..*";
 
-  private static String LIBRARY_TO_USE = YAMMER_KEY;
+  private static PinotMetricsFactory _pinotMetricsFactory = null;
 
-  public static void init(PinotConfiguration metricsConfiguration)
-      throws InvalidConfigException {
-    String libraryName = metricsConfiguration.getProperty(PinotMetricUtils.LIBRARY_NAME_KEY);
-    if (libraryName == null) {
-      return;
+  private static final Map<PinotMetricsRegistry, Boolean> metricsRegistryMap = new ConcurrentHashMap<>();
+  private static final Map<MetricsRegistryRegistrationListener, Boolean> metricsRegistryRegistrationListenersMap =
+      new ConcurrentHashMap<>();
+
+  public static void init(PinotConfiguration metricsConfiguration) {
+    // Initializes PinotMetricsFactory.
+    initializePinotMetricsFactory(metricsConfiguration);
+
+    // Initializes metrics using the metrics configuration.
+    initializeMetrics(metricsConfiguration);
+    registerMetricsRegistry(getPinotMetricsRegistry());
+  }
+
+  /**
+   * Initializes PinotMetricsFactory with metrics configurations.
+   * @param metricsConfiguration The subset of the configuration containing the metrics-related keys
+   */
+  private static void initializePinotMetricsFactory(PinotConfiguration metricsConfiguration) {
+    Set<Class<?>> classes = getPinotMetricsFactoryClasses();
+    if (classes.size() > 1) {
+      LOGGER.warn("More than one PinotMetricsFactory is initialized: {}", classes);
     }
-    switch (libraryName) {
-      case YAMMER_KEY:
-        LIBRARY_TO_USE = YAMMER_KEY;
-        break;
-      // TODO: support more libraries.
-      default:
-        throw new InvalidConfigException("PinotMetricsRegistry for " + libraryName + " cannot be initialized.");
+    for (Class<?> clazz : classes) {
+      MetricsFactory annotation = clazz.getAnnotation(MetricsFactory.class);
+      if (annotation.enabled()) {
+        try {
+          PinotMetricsFactory pinotMetricsFactory = (PinotMetricsFactory) clazz.newInstance();
+          pinotMetricsFactory.init(metricsConfiguration);
+          registerMetricsFactory(pinotMetricsFactory);
+        } catch (Exception e) {
+          LOGGER.error("Caught exception while initializing pinot metrics registry: {}, skipping it", clazz, e);
+        }
+      }
     }
-    LOGGER.info("Setting metric library to: " + LIBRARY_TO_USE);
+    Preconditions.checkState(_pinotMetricsFactory != null,
+        "Failed to initialize PinotMetricsFactory. Please check if any pinot-metrics related jar is actually added to the classpath.");
   }
 
-  public static PinotMetricsRegistry getPinotMetricsRegistry() {
-    switch (LIBRARY_TO_USE) {
-      case YAMMER_KEY:
-        return new YammerMetricsRegistry();
-      //TODO: support more libraries.
-      default:
-        return new YammerMetricsRegistry();
+  private static Set<Class<?>> getPinotMetricsFactoryClasses() {
+    return PinotReflectionUtils.getClassesThroughReflection(METRICS_PACKAGE_REGEX_PATTERN, MetricsFactory.class);
+  }
+
+  /**
+   * Initializes the metrics system by initializing the registry registration listeners present in the configuration.
+   *
+   * @param configuration The subset of the configuration containing the metrics-related keys
+   */
+  private static void initializeMetrics(PinotConfiguration configuration) {
+    synchronized (PinotMetricUtils.class) {
+      List<String> listenerClassNames = configuration.getProperty("metricsRegistryRegistrationListeners",
+          Arrays.asList(JmxReporterMetricsRegistryRegistrationListener.class.getName()));
+
+      // Build each listener using their default constructor and add them
+      for (String listenerClassName : listenerClassNames) {
+        try {
+          Class<? extends MetricsRegistryRegistrationListener> clazz =
+              (Class<? extends MetricsRegistryRegistrationListener>) Class.forName(listenerClassName);
+          Constructor<? extends MetricsRegistryRegistrationListener> defaultConstructor =
+              clazz.getDeclaredConstructor();
+          MetricsRegistryRegistrationListener listener = defaultConstructor.newInstance();
+
+          LOGGER.info("Registering metricsRegistry to listener {}", listenerClassName);
+          addMetricsRegistryRegistrationListener(listener);
+        } catch (Exception e) {
+          LOGGER
+              .warn("Caught exception while initializing MetricsRegistryRegistrationListener " + listenerClassName, e);
+        }
+      }
     }
+    LOGGER.info("Number of listeners got registered: {}", metricsRegistryRegistrationListenersMap.size());
   }
 
-  public static PinotMetricName generatePinotMetricName(Class<?> klass, String name) {
-    switch (LIBRARY_TO_USE) {
-      case YAMMER_KEY:
-        return new YammerMetricName(klass, name);
-      //TODO: support more libraries.
-      default:
-        return new YammerMetricName(klass, name);
+  /**
+   * Adds a metrics registry registration listener. When adding a metrics registry registration listener, events are
+   * fired to add all previously registered metrics registries to the newly added metrics registry registration
+   * listener.
+   *
+   * @param listener The listener to add
+   */
+  private static void addMetricsRegistryRegistrationListener(MetricsRegistryRegistrationListener listener) {
+    synchronized (PinotMetricUtils.class) {
+      metricsRegistryRegistrationListenersMap.put(listener, Boolean.TRUE);
+
+      // Fire events to register all previously registered metrics registries
+      Set<PinotMetricsRegistry> metricsRegistries = metricsRegistryMap.keySet();
+      LOGGER.info("Number of metrics registry: {}", metricsRegistries.size());
+      for (PinotMetricsRegistry metricsRegistry : metricsRegistries) {
+        listener.onMetricsRegistryRegistered(metricsRegistry);
+      }
     }
   }
 
-  public static <T> PinotGauge<T> generatePinotGauge(Function<Void, T> condition) {
-    switch (LIBRARY_TO_USE) {
-      case YAMMER_KEY:
-        return new YammerGauge<T>(condition);
-      //TODO: support more libraries.
-      default:
-        return new YammerGauge<T>(condition);
+  /**
+   * Registers the metrics registry with PinotMetricUtils.
+   *
+   * @param registry The registry to register
+   */
+  private static void registerMetricsRegistry(PinotMetricsRegistry registry) {
+    synchronized (PinotMetricUtils.class) {
+      metricsRegistryMap.put(registry, Boolean.TRUE);
+
+      // Fire event to all registered listeners
+      Set<MetricsRegistryRegistrationListener> metricsRegistryRegistrationListeners =
+          metricsRegistryRegistrationListenersMap.keySet();
+      for (MetricsRegistryRegistrationListener metricsRegistryRegistrationListener : metricsRegistryRegistrationListeners) {
+        metricsRegistryRegistrationListener.onMetricsRegistryRegistered(registry);
+      }
     }
   }
 
-  public static PinotJmxReporter generatePinotJmxReporter(PinotMetricsRegistry metricsRegistry) {
-    switch (LIBRARY_TO_USE) {
-      case YAMMER_KEY:
-        return new YammerJmxReporter(metricsRegistry);
-      //TODO: support more libraries.
-      default:
-        return new YammerJmxReporter(metricsRegistry);
+  /**
+   * Registers a metrics factory.
+   */
+  private static void registerMetricsFactory(PinotMetricsFactory metricsFactory) {
+    LOGGER.info("Registering metrics factory: {}", metricsFactory.getMetricsFactoryName());
+    _pinotMetricsFactory = metricsFactory;
+  }
+
+  public static PinotMetricsRegistry getPinotMetricsRegistry() {
+    if (_pinotMetricsFactory == null) {
+      // If init method didn't get called previously, just simply init with an empty hashmap. This is commonly used in tests.
+      init(new PinotConfiguration(Collections.emptyMap()));
     }
+    return _pinotMetricsFactory.getPinotMetricsRegistry();
+  }
+
+  public static PinotMetricName makePinotMetricName(Class<?> klass, String name) {
+    return _pinotMetricsFactory.makePinotMetricName(klass, name);
+  }
+
+  public static <T> PinotGauge<T> makePinotGauge(Function<Void, T> condition) {
+    return _pinotMetricsFactory.makePinotGauge(condition);
+  }
+
+  public static <T> PinotGauge<T> makeGauge(PinotMetricsRegistry registry, PinotMetricName name, PinotGauge<T> gauge) {
+    return registry.newGauge(name, gauge);
+  }
+
+  public static PinotTimer makePinotTimer(PinotMetricsRegistry registry, PinotMetricName name, TimeUnit durationUnit,
+      TimeUnit rateUnit) {
+    return registry.newTimer(name, durationUnit, rateUnit);
+  }
+
+  public static PinotMeter makePinotMeter(PinotMetricsRegistry registry, PinotMetricName name, String eventType,
+      TimeUnit unit) {
+    return registry.newMeter(name, eventType, unit);
+  }
+
+  public static void removeMetric(PinotMetricsRegistry registry, PinotMetricName name) {
+    registry.removeMetric(name);
+  }
+
+  public static PinotJmxReporter makePinotJmxReporter(PinotMetricsRegistry metricsRegistry) {
+    return _pinotMetricsFactory.makePinotJmxReporter(metricsRegistry);
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
index 7117f7e..440791d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ValidationMetrics.java
@@ -54,7 +54,7 @@ public class ValidationMetrics {
 
     @Override
     public Object getGauge() {
-      return PinotMetricUtils.generatePinotGauge(avoid -> value()).getGauge();
+      return PinotMetricUtils.makePinotGauge(avoid -> value()).getGauge();
     }
 
     @Override
@@ -94,7 +94,7 @@ public class ValidationMetrics {
 
     @Override
     public Object getGauge() {
-      return PinotMetricUtils.generatePinotGauge(avoid -> value()).getGauge();
+      return PinotMetricUtils.makePinotGauge(avoid -> value()).getGauge();
     }
   }
 
@@ -205,14 +205,14 @@ public class ValidationMetrics {
   }
 
   private PinotMetricName makeMetricName(final String gaugeName) {
-    return PinotMetricUtils.generatePinotMetricName(ValidationMetrics.class, gaugeName);
+    return PinotMetricUtils.makePinotMetricName(ValidationMetrics.class, gaugeName);
   }
 
   private void makeGauge(final String gaugeName, final PinotMetricName metricName, final GaugeFactory<?> gaugeFactory,
       final long value) {
     if (!_gaugeValues.containsKey(gaugeName)) {
       _gaugeValues.put(gaugeName, value);
-      MetricsHelper.newGauge(_metricsRegistry, metricName, gaugeFactory.buildGauge(gaugeName));
+      PinotMetricUtils.makeGauge(_metricsRegistry, metricName, gaugeFactory.buildGauge(gaugeName));
       _metricNames.add(metricName);
     } else {
       _gaugeValues.put(gaugeName, value);
@@ -224,7 +224,7 @@ public class ValidationMetrics {
    */
   public void unregisterAllMetrics() {
     for (PinotMetricName metricName : _metricNames) {
-      MetricsHelper.removeMetric(_metricsRegistry, metricName);
+      PinotMetricUtils.removeMetric(_metricsRegistry, metricName);
     }
 
     _metricNames.clear();
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java
deleted file mode 100644
index ea70bd6..0000000
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/MetricsHelperTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.metrics;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.exception.InvalidConfigException;
-import org.apache.pinot.spi.metrics.PinotMeter;
-import org.apache.pinot.spi.metrics.PinotMetricName;
-import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
-import org.apache.pinot.spi.env.PinotConfiguration;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-/**
- * Tests for the MetricsHelper class.
- *
- */
-public class MetricsHelperTest {
-  public static boolean listenerOneOkay;
-  public static boolean listenerTwoOkay;
-
-  public static class ListenerOne implements MetricsRegistryRegistrationListener {
-    @Override
-    public void onMetricsRegistryRegistered(PinotMetricsRegistry metricsRegistry) {
-      listenerOneOkay = true;
-    }
-  }
-
-  public static class ListenerTwo implements MetricsRegistryRegistrationListener {
-    @Override
-    public void onMetricsRegistryRegistered(PinotMetricsRegistry metricsRegistry) {
-      listenerTwoOkay = true;
-    }
-  }
-
-  @Test
-  public void testMetricsHelperRegistration()
-      throws InvalidConfigException {
-    listenerOneOkay = false;
-    listenerTwoOkay = false;
-
-    Map<String, Object> properties = new HashMap<>();
-    properties.put("pinot.broker.metrics.metricsRegistryRegistrationListeners",
-        ListenerOne.class.getName() + "," + ListenerTwo.class.getName());
-
-    PinotConfiguration configuration = new PinotConfiguration(properties);
-    PinotMetricUtils.init(configuration);
-    PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
-
-    // Initialize the MetricsHelper and create a new timer
-    MetricsHelper.initializeMetrics(configuration.subset("pinot.broker.metrics"));
-    MetricsHelper.registerMetricsRegistry(registry);
-    MetricsHelper.newTimer(registry, PinotMetricUtils.generatePinotMetricName(MetricsHelperTest.class, "dummy"),
-        TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS);
-
-    // Check that the two listeners fired
-    Assert.assertTrue(listenerOneOkay);
-    Assert.assertTrue(listenerTwoOkay);
-  }
-
-  @Test
-  public void testMetricValue() {
-    PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
-    PinotMeter pinotMeter = MetricsHelper
-        .newMeter(registry, PinotMetricUtils.generatePinotMetricName(MetricsHelperTest.class, "testMeter"), "testMeter",
-            TimeUnit.MILLISECONDS);
-    pinotMeter.mark();
-    Assert.assertEquals(pinotMeter.count(), 1L);
-
-    pinotMeter.mark(2L);
-    Assert.assertEquals(pinotMeter.count(), 3L);
-  }
-
-  @Test
-  public void testPinotMetricName() {
-    PinotMetricName testMetricName1 =
-        PinotMetricUtils.generatePinotMetricName(MetricsHelperTest.class, "testMetricName");
-    PinotMetricName testMetricName2 =
-        PinotMetricUtils.generatePinotMetricName(MetricsHelperTest.class, "testMetricName");
-    Assert.assertNotNull(testMetricName1);
-    Assert.assertNotNull(testMetricName2);
-    Assert.assertEquals(testMetricName1, testMetricName2);
-  }
-}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsTest.java
index 113776a..2f18d79 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/PinotMetricUtilsTest.java
@@ -20,8 +20,10 @@ package org.apache.pinot.common.metrics;
 
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.pinot.common.exception.InvalidConfigException;
+import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.metrics.PinotMetricName;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -31,31 +33,77 @@ public class PinotMetricUtilsTest {
 
   @Test
   public void testPinotMetricsRegistryFactory() {
-    PinotMetricsRegistry pinotMetricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    Assert.assertNotNull(pinotMetricsRegistry);
-    Assert.assertEquals(pinotMetricsRegistry.getClass().getSimpleName(), "YammerMetricsRegistry");
-
     try {
       Map<String, Object> properties = new HashMap<>();
-      properties.put(PinotMetricUtils.LIBRARY_NAME_KEY, "badLibraryName");
       PinotConfiguration configuration = new PinotConfiguration(properties);
       PinotMetricUtils.init(configuration);
+    } catch (Exception e) {
       Assert.fail("Fail to initialize PinotMetricsRegistry of yammer");
-    } catch (InvalidConfigException e) {
-      // Expected.
     }
+    PinotMetricsRegistry pinotMetricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
+    Assert.assertNotNull(pinotMetricsRegistry);
+    Assert.assertEquals(pinotMetricsRegistry.getClass().getSimpleName(), "YammerMetricsRegistry");
+  }
 
-    try {
-      Map<String, Object> properties = new HashMap<>();
-      properties.put(PinotMetricUtils.LIBRARY_NAME_KEY, "yammer");
-      PinotConfiguration configuration = new PinotConfiguration(properties);
-      PinotMetricUtils.init(configuration);
-    } catch (InvalidConfigException e) {
-      Assert.fail("Fail to initialize PinotMetricsRegistry of yammer");
+  public static boolean listenerOneOkay;
+  public static boolean listenerTwoOkay;
+
+  public static class ListenerOne implements MetricsRegistryRegistrationListener {
+    @Override
+    public void onMetricsRegistryRegistered(PinotMetricsRegistry metricsRegistry) {
+      listenerOneOkay = true;
     }
+  }
 
-    pinotMetricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    Assert.assertNotNull(pinotMetricsRegistry);
-    Assert.assertEquals(pinotMetricsRegistry.getClass().getSimpleName(), "YammerMetricsRegistry");
+  public static class ListenerTwo implements MetricsRegistryRegistrationListener {
+    @Override
+    public void onMetricsRegistryRegistered(PinotMetricsRegistry metricsRegistry) {
+      listenerTwoOkay = true;
+    }
+  }
+
+  @Test
+  public void testPinotMetricsRegistration() {
+    listenerOneOkay = false;
+    listenerTwoOkay = false;
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put("pinot.broker.metrics.metricsRegistryRegistrationListeners",
+        PinotMetricUtilsTest.ListenerOne.class.getName() + "," + PinotMetricUtilsTest.ListenerTwo.class.getName());
+
+    // Initialize the PinotMetricUtils and create a new timer
+    PinotConfiguration configuration = new PinotConfiguration(properties);
+    PinotMetricUtils.init(configuration.subset("pinot.broker.metrics"));
+    PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+    PinotMetricUtils.makePinotTimer(registry, PinotMetricUtils.makePinotMetricName(PinotMetricUtilsTest.class, "dummy"),
+        TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS);
+
+    // Check that the two listeners fired
+    Assert.assertTrue(listenerOneOkay);
+    Assert.assertTrue(listenerTwoOkay);
+  }
+
+  @Test
+  public void testMetricValue() {
+    PinotMetricsRegistry registry = PinotMetricUtils.getPinotMetricsRegistry();
+    PinotMeter pinotMeter = PinotMetricUtils
+        .makePinotMeter(registry, PinotMetricUtils.makePinotMetricName(PinotMetricUtilsTest.class, "testMeter"),
+            "dummyEventType", TimeUnit.MILLISECONDS);
+    pinotMeter.mark();
+    Assert.assertEquals(pinotMeter.count(), 1L);
+
+    pinotMeter.mark(2L);
+    Assert.assertEquals(pinotMeter.count(), 3L);
+  }
+
+  @Test
+  public void testPinotMetricName() {
+    PinotMetricName testMetricName1 =
+        PinotMetricUtils.makePinotMetricName(PinotMetricUtilsTest.class, "testMetricName");
+    PinotMetricName testMetricName2 =
+        PinotMetricUtils.makePinotMetricName(PinotMetricUtilsTest.class, "testMetricName");
+    Assert.assertNotNull(testMetricName1);
+    Assert.assertNotNull(testMetricName2);
+    Assert.assertEquals(testMetricName1, testMetricName2);
   }
 }
diff --git a/pinot-connectors/pinot-spark-connector/pom.xml b/pinot-connectors/pinot-spark-connector/pom.xml
index 73e8a98..323445b 100644
--- a/pinot-connectors/pinot-spark-connector/pom.xml
+++ b/pinot-connectors/pinot-spark-connector/pom.xml
@@ -266,6 +266,11 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.pinot</groupId>
+            <artifactId>pinot-yammer</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
         </dependency>
diff --git a/pinot-controller/pom.xml b/pinot-controller/pom.xml
index 9746a0e..875a721 100644
--- a/pinot-controller/pom.xml
+++ b/pinot-controller/pom.xml
@@ -85,6 +85,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-yammer</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
       <scope>test</scope>
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 45ad404..b9c0501 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -48,11 +48,9 @@ import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.model.Message;
 import org.apache.helix.task.TaskDriver;
 import org.apache.pinot.common.Utils;
-import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.function.FunctionRegistry;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.common.metrics.MetricsHelper;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.metrics.ValidationMetrics;
@@ -482,16 +480,9 @@ public class ControllerStarter implements ServiceStartable {
 
   private void initControllerMetrics() {
     PinotConfiguration metricsConfiguration = _config.subset(METRICS_REGISTRY_NAME);
-    try {
-      PinotMetricUtils.init(metricsConfiguration);
-      _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    } catch (InvalidConfigException e) {
-      throw new RuntimeException("Caught InvalidConfigException when initializing metricsRegistry", e);
-    }
+    PinotMetricUtils.init(metricsConfiguration);
+    _metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
     _controllerMetrics = new ControllerMetrics(_config.getMetricsPrefix(), _metricsRegistry);
-
-    MetricsHelper.initializeMetrics(metricsConfiguration);
-    MetricsHelper.registerMetricsRegistry(_metricsRegistry);
     _controllerMetrics.initializeGlobalMeters();
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
index 30baf63..95dee4b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/TaskGeneratorRegistry.java
@@ -26,11 +26,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
 import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
 import org.apache.pinot.spi.annotations.minion.TaskGenerator;
-import org.reflections.Reflections;
-import org.reflections.scanners.TypeAnnotationsScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-import org.reflections.util.FilterBuilder;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,12 +69,7 @@ public class TaskGeneratorRegistry {
   }
 
   public static Set<Class<?>> getTaskGeneratorClasses() {
-    Reflections reflections = new Reflections(
-        new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.apache.pinot"))
-            .filterInputsBy(new FilterBuilder.Include(TASK_GENERATOR_PACKAGE_REGEX_PATTERN))
-            .setScanners(new TypeAnnotationsScanner()));
-    Set<Class<?>> classes = reflections.getTypesAnnotatedWith(TaskGenerator.class, true);
-    return classes;
+    return PinotReflectionUtils.getClassesThroughReflection(TASK_GENERATOR_PACKAGE_REGEX_PATTERN, TaskGenerator.class);
   }
 
   /**
diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml
index 6e007f9..c381db2 100644
--- a/pinot-core/pom.xml
+++ b/pinot-core/pom.xml
@@ -219,6 +219,12 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-yammer</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.lucene</groupId>
       <artifactId>lucene-core</artifactId>
       <version>${lucene.version}</version>
diff --git a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml
index f8fe065..90ed685 100644
--- a/pinot-distribution/pinot-assembly.xml
+++ b/pinot-distribution/pinot-assembly.xml
@@ -128,6 +128,12 @@
       <destName>plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pinot-minion-builtin-tasks-${project.version}-shaded.jar</destName>
     </file>
     <!-- End Include Pinot Minion Tasks Plugins-->
+    <!-- Start Include Pinot Metrics Plugins-->
+    <file>
+      <source>${pinot.root}/pinot-plugins/pinot-metrics/pinot-yammer/target/pinot-yammer-${project.version}-shaded.jar</source>
+      <destName>plugins/pinot-metrics/pinot-yammer/pinot-yammer-${project.version}-shaded.jar</destName>
+    </file>
+    <!-- End Include Pinot Metrics Plugins-->
     <!-- End Include Pinot Plugins-->
   </files>
   <fileSets>
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index 598412e..c837772 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -214,6 +214,11 @@
       <scope>runtime</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-yammer</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
     </dependency>
diff --git a/pinot-minion/pom.xml b/pinot-minion/pom.xml
index f9a442c..8c86c48 100644
--- a/pinot-minion/pom.xml
+++ b/pinot-minion/pom.xml
@@ -115,5 +115,10 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-yammer</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index cdb3ffb..3ea4caa 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -30,7 +30,6 @@ import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.pinot.common.Utils;
-import org.apache.pinot.common.metrics.MetricsHelper;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.utils.ClientSSLContextGenerator;
@@ -155,9 +154,6 @@ public class MinionStarter implements ServiceStartable {
     PinotMetricUtils.init(metricsConfiguration);
     PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
 
-    MetricsHelper.initializeMetrics(_config);
-
-    MetricsHelper.registerMetricsRegistry(metricsRegistry);
     MinionMetrics minionMetrics = new MinionMetrics(_config
         .getProperty(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX_KEY,
             CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX), metricsRegistry);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
index fa4815b..716bdd8 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/event/EventObserverFactoryRegistry.java
@@ -23,11 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.spi.annotations.minion.EventObserverFactory;
-import org.reflections.Reflections;
-import org.reflections.scanners.TypeAnnotationsScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-import org.reflections.util.FilterBuilder;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,10 +43,8 @@ public class EventObserverFactoryRegistry {
    */
   public EventObserverFactoryRegistry(MinionTaskZkMetadataManager zkMetadataManager) {
     long startTimeMs = System.currentTimeMillis();
-    Reflections reflections = new Reflections(
-        new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.apache.pinot"))
-            .filterInputsBy(new FilterBuilder.Include(".*\\.event\\..*")).setScanners(new TypeAnnotationsScanner()));
-    Set<Class<?>> classes = reflections.getTypesAnnotatedWith(EventObserverFactory.class, true);
+    Set<Class<?>> classes =
+        PinotReflectionUtils.getClassesThroughReflection(".*\\.event\\..*", EventObserverFactory.class);
     for (Class<?> clazz : classes) {
       EventObserverFactory annotation = clazz.getAnnotation(EventObserverFactory.class);
       if (annotation.enabled()) {
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
index e94e009..6b5f2c2 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/TaskExecutorFactoryRegistry.java
@@ -22,12 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.pinot.spi.annotations.minion.TaskExecutorFactory;
-import org.apache.pinot.spi.annotations.minion.TaskGenerator;
-import org.reflections.Reflections;
-import org.reflections.scanners.TypeAnnotationsScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
-import org.reflections.util.FilterBuilder;
+import org.apache.pinot.spi.utils.PinotReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,12 +67,8 @@ public class TaskExecutorFactoryRegistry {
   }
 
   public static Set<Class<?>> getTaskExecutorFactoryClasses() {
-    Reflections reflections = new Reflections(
-        new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage("org.apache.pinot"))
-            .filterInputsBy(new FilterBuilder.Include(TASK_EXECUTOR_PACKAGE_REGEX_PATTERN))
-            .setScanners(new TypeAnnotationsScanner()));
-    Set<Class<?>> classes = reflections.getTypesAnnotatedWith(TaskExecutorFactory.class, true);
-    return classes;
+    return PinotReflectionUtils
+        .getClassesThroughReflection(TASK_EXECUTOR_PACKAGE_REGEX_PATTERN, TaskExecutorFactory.class);
   }
 
   /**
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pom.xml b/pinot-plugins/pinot-metrics/pinot-yammer/pom.xml
similarity index 81%
copy from pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pom.xml
copy to pinot-plugins/pinot-metrics/pinot-yammer/pom.xml
index 6a4fbae..1e96e71 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pom.xml
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/pom.xml
@@ -23,14 +23,14 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <artifactId>pinot-minion-tasks</artifactId>
+    <artifactId>pinot-metrics</artifactId>
     <groupId>org.apache.pinot</groupId>
     <version>0.7.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
 
-  <artifactId>pinot-minion-builtin-tasks</artifactId>
-  <name>Pinot Minion Built-In Tasks</name>
+  <artifactId>pinot-yammer</artifactId>
+  <name>Pinot Yammer Metrics</name>
   <url>https://pinot.apache.org/</url>
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
@@ -55,4 +55,16 @@
       </plugin>
     </plugins>
   </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-spi</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.yammer.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>2.2.0</version>
+    </dependency>
+  </dependencies>
 </project>
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerCounter.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerCounter.java
similarity index 96%
rename from pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerCounter.java
rename to pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerCounter.java
index 0396a92..6f25f1f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerCounter.java
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerCounter.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics.yammer;
+package org.apache.pinot.plugin.metrics.yammer;
 
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Metric;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerGauge.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerGauge.java
similarity index 96%
rename from pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerGauge.java
rename to pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerGauge.java
index e791596..c08452b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerGauge.java
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerGauge.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics.yammer;
+package org.apache.pinot.plugin.metrics.yammer;
 
 import com.yammer.metrics.core.Gauge;
 import java.util.function.Function;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerJmxReporter.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerJmxReporter.java
similarity index 96%
rename from pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerJmxReporter.java
rename to pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerJmxReporter.java
index decc314..a7f055d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerJmxReporter.java
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerJmxReporter.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics.yammer;
+package org.apache.pinot.plugin.metrics.yammer;
 
 import com.yammer.metrics.core.MetricsRegistry;
 import com.yammer.metrics.reporting.JmxReporter;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMeter.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMeter.java
similarity index 96%
rename from pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMeter.java
rename to pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMeter.java
index 92a350b..76dd55d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMeter.java
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMeter.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics.yammer;
+package org.apache.pinot.plugin.metrics.yammer;
 
 import com.yammer.metrics.core.Meter;
 import org.apache.pinot.spi.metrics.PinotMeter;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetered.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetered.java
similarity index 97%
rename from pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetered.java
rename to pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetered.java
index 599e4cc..b3ca0be 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetered.java
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetered.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics.yammer;
+package org.apache.pinot.plugin.metrics.yammer;
 
 import com.yammer.metrics.core.Metered;
 import java.util.concurrent.TimeUnit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetric.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetric.java
similarity index 95%
rename from pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetric.java
rename to pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetric.java
index 8b3753a..eb0c481 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetric.java
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetric.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics.yammer;
+package org.apache.pinot.plugin.metrics.yammer;
 
 import com.yammer.metrics.core.Metric;
 import org.apache.pinot.spi.metrics.PinotMetric;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricName.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricName.java
similarity index 97%
rename from pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricName.java
rename to pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricName.java
index fc21012..d461fdc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricName.java
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricName.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics.yammer;
+package org.apache.pinot.plugin.metrics.yammer;
 
 import com.yammer.metrics.core.MetricName;
 import org.apache.pinot.spi.metrics.PinotMetricName;
diff --git a/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricsFactory.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricsFactory.java
new file mode 100644
index 0000000..37b0aef
--- /dev/null
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricsFactory.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.metrics.yammer;
+
+import java.util.function.Function;
+import org.apache.pinot.spi.annotations.metrics.MetricsFactory;
+import org.apache.pinot.spi.annotations.metrics.PinotMetricsFactory;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotGauge;
+import org.apache.pinot.spi.metrics.PinotJmxReporter;
+import org.apache.pinot.spi.metrics.PinotMetricName;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+
+
+@MetricsFactory
+public class YammerMetricsFactory implements PinotMetricsFactory {
+  private PinotMetricsRegistry _pinotMetricsRegistry = null;
+
+  @Override
+  public void init(PinotConfiguration metricsConfiguration) {
+
+  }
+
+  @Override
+  public PinotMetricsRegistry getPinotMetricsRegistry() {
+    if (_pinotMetricsRegistry == null) {
+      _pinotMetricsRegistry = new YammerMetricsRegistry();
+    }
+    return _pinotMetricsRegistry;
+  }
+
+  @Override
+  public PinotMetricName makePinotMetricName(Class<?> klass, String name) {
+    return new YammerMetricName(klass, name);
+  }
+
+  @Override
+  public <T> PinotGauge<T> makePinotGauge(Function<Void, T> condition) {
+    return new YammerGauge<T>(condition);
+  }
+
+  @Override
+  public PinotJmxReporter makePinotJmxReporter(PinotMetricsRegistry metricsRegistry) {
+    return new YammerJmxReporter(metricsRegistry);
+  }
+
+  @Override
+  public String getMetricsFactoryName() {
+    return "Yammer";
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricsRegistry.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricsRegistry.java
similarity index 98%
rename from pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricsRegistry.java
rename to pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricsRegistry.java
index b6ef3d5..128b06a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricsRegistry.java
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricsRegistry.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics.yammer;
+package org.apache.pinot.plugin.metrics.yammer;
 
 import com.yammer.metrics.core.Clock;
 import com.yammer.metrics.core.Gauge;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricsRegistryListener.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricsRegistryListener.java
similarity index 96%
rename from pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricsRegistryListener.java
rename to pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricsRegistryListener.java
index 20646ae..5f723fe 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerMetricsRegistryListener.java
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerMetricsRegistryListener.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics.yammer;
+package org.apache.pinot.plugin.metrics.yammer;
 
 import com.yammer.metrics.core.MetricsRegistryListener;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistryListener;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerTimer.java b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerTimer.java
similarity index 97%
rename from pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerTimer.java
rename to pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerTimer.java
index d4b3c01..07e004d 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/yammer/YammerTimer.java
+++ b/pinot-plugins/pinot-metrics/pinot-yammer/src/main/java/org/apache/pinot/plugin/metrics/yammer/YammerTimer.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics.yammer;
+package org.apache.pinot.plugin.metrics.yammer;
 
 import com.yammer.metrics.core.Timer;
 import java.util.concurrent.TimeUnit;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pom.xml b/pinot-plugins/pinot-metrics/pom.xml
similarity index 61%
copy from pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pom.xml
copy to pinot-plugins/pinot-metrics/pom.xml
index 6a4fbae..b49f34b 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pom.xml
+++ b/pinot-plugins/pinot-metrics/pom.xml
@@ -19,40 +19,34 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <artifactId>pinot-minion-tasks</artifactId>
+    <artifactId>pinot-plugins</artifactId>
     <groupId>org.apache.pinot</groupId>
     <version>0.7.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
-
-  <artifactId>pinot-minion-builtin-tasks</artifactId>
-  <name>Pinot Minion Built-In Tasks</name>
+  <artifactId>pinot-metrics</artifactId>
+  <packaging>pom</packaging>
+  <name>Pinot Metrics</name>
   <url>https://pinot.apache.org/</url>
   <properties>
-    <pinot.root>${basedir}/../../..</pinot.root>
-    <phase.prop>package</phase.prop>
+    <pinot.root>${basedir}/../..</pinot.root>
+    <plugin.type>pinot-metrics</plugin.type>
   </properties>
+  <modules>
+    <module>pinot-yammer</module>
+  </modules>
 
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-jar-plugin</artifactId>
-        <executions>
-          <execution>
-            <goals>
-              <goal>test-jar</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-enforcer-plugin</artifactId>
-      </plugin>
-    </plugins>
-  </build>
+  <dependencies>
+    <!-- Test -->
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
 </project>
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pom.xml b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pom.xml
index 6a4fbae..90dc8fe 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pom.xml
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/pom.xml
@@ -55,4 +55,12 @@
       </plugin>
     </plugins>
   </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-yammer</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
 </project>
diff --git a/pinot-plugins/pom.xml b/pinot-plugins/pom.xml
index a7ec854..5ec34dd 100644
--- a/pinot-plugins/pom.xml
+++ b/pinot-plugins/pom.xml
@@ -45,6 +45,7 @@
     <module>pinot-batch-ingestion</module>
     <module>pinot-stream-ingestion</module>
     <module>pinot-minion-tasks</module>
+    <module>pinot-metrics</module>
   </modules>
 
   <dependencies>
diff --git a/pinot-server/pom.xml b/pinot-server/pom.xml
index a51c8f7..c97d20c 100644
--- a/pinot-server/pom.xml
+++ b/pinot-server/pom.xml
@@ -247,6 +247,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-yammer</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-math3</artifactId>
       <scope>test</scope>
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 4815c02..8b25d8c 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import java.util.concurrent.atomic.LongAccumulator;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.function.FunctionRegistry;
-import org.apache.pinot.common.metrics.MetricsHelper;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.common.metrics.PinotMetricUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -71,8 +70,6 @@ public class ServerInstance {
     PinotConfiguration metricsConfiguration = serverConf.getMetricsConfig();
     PinotMetricUtils.init(metricsConfiguration);
     PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry();
-    MetricsHelper.initializeMetrics(metricsConfiguration);
-    MetricsHelper.registerMetricsRegistry(metricsRegistry);
     _serverMetrics =
         new ServerMetrics(serverConf.getMetricsPrefix(), metricsRegistry, serverConf.emitTableLevelMetrics(),
             serverConf.getAllowedTablesForEmittingMetrics());
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/metrics/MetricsFactory.java
similarity index 58%
copy from pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
copy to pinot-spi/src/main/java/org/apache/pinot/spi/annotations/metrics/MetricsFactory.java
index 0f82991..6bc2c9c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/MetricsRegistryRegistrationListener.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/metrics/MetricsFactory.java
@@ -16,15 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.common.metrics;
+package org.apache.pinot.spi.annotations.metrics;
 
-import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
 
 /**
- * Interface to implement operations that occur whenever a new MetricsRegistry is registered with the MetricsHelper.
+ * Annotation that marks a class as a Pinot metrics factory.
  *
+ * NOTE:
+ *   - The annotated class must implement the PinotMetricsFactory interface
+ *   - The annotated class must be under the package of name 'org.apache.pinot.*.plugin.metrics.*' to be auto-registered.
  */
-public interface MetricsRegistryRegistrationListener {
-  void onMetricsRegistryRegistered(PinotMetricsRegistry metricsRegistry);
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface MetricsFactory {
+
+  boolean enabled() default true;
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/metrics/PinotMetricsFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/metrics/PinotMetricsFactory.java
new file mode 100644
index 0000000..d75e44b
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/metrics/PinotMetricsFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.annotations.metrics;
+
+import java.util.function.Function;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotGauge;
+import org.apache.pinot.spi.metrics.PinotJmxReporter;
+import org.apache.pinot.spi.metrics.PinotMetricName;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
+
+/**
+ * Factory for generating objects of Pinot metrics.
+ */
+public interface PinotMetricsFactory {
+
+  /**
+   * Initializes the Pinot metrics factory.
+   */
+  void init(PinotConfiguration metricsConfiguration);
+
+  /**
+   * Gets {@link PinotMetricsRegistry}. There should be only one such instance in every {@link PinotMetricsFactory}.
+   */
+  PinotMetricsRegistry getPinotMetricsRegistry();
+
+  /**
+   * Makes a {@link PinotMetricName} given the class and the metric name.
+   */
+  PinotMetricName makePinotMetricName(Class<?> klass, String name);
+
+  /**
+   * Makes a {@link PinotGauge} given a function.
+   */
+  <T> PinotGauge<T> makePinotGauge(Function<Void, T> condition);
+
+  /**
+   * Makes a {@link PinotJmxReporter} given a {@link PinotMetricsRegistry}.
+   */
+  PinotJmxReporter makePinotJmxReporter(PinotMetricsRegistry metricsRegistry);
+
+  /**
+   * Returns the name of the metrics factory.
+   */
+  String getMetricsFactoryName();
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/PinotReflectionUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/PinotReflectionUtils.java
new file mode 100644
index 0000000..b626ce3
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/PinotReflectionUtils.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.utils;
+
+import java.lang.annotation.Annotation;
+import java.util.Set;
+import org.reflections.Reflections;
+import org.reflections.scanners.TypeAnnotationsScanner;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+import org.reflections.util.FilterBuilder;
+
+
+public class PinotReflectionUtils {
+  private static final String PINOT_PACKAGE_PREFIX = "org.apache.pinot";
+
+  public static Set<Class<?>> getClassesThroughReflection(final String regexPattern,
+      final Class<? extends Annotation> annotation) {
+    Reflections reflections = new Reflections(
+        new ConfigurationBuilder().setUrls(ClasspathHelper.forPackage(PINOT_PACKAGE_PREFIX))
+            .filterInputsBy(new FilterBuilder.Include(regexPattern)).setScanners(new TypeAnnotationsScanner()));
+    return reflections.getTypesAnnotatedWith(annotation, true);
+  }
+}
diff --git a/pom.xml b/pom.xml
index b868103..f2a4aac 100644
--- a/pom.xml
+++ b/pom.xml
@@ -275,6 +275,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.pinot</groupId>
+        <artifactId>pinot-yammer</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.pinot</groupId>
         <artifactId>pinot-java-client</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -390,11 +395,6 @@
         <version>20.0</version>
       </dependency>
       <dependency>
-        <groupId>com.yammer.metrics</groupId>
-        <artifactId>metrics-core</artifactId>
-        <version>2.2.0</version>
-      </dependency>
-      <dependency>
         <groupId>commons-cli</groupId>
         <artifactId>commons-cli</artifactId>
         <version>1.2</version>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org