You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by an...@apache.org on 2024/02/23 21:34:12 UTC

(pinot) branch master updated: [Adaptive Server Selector] Add metrics for Stats Manager Queue Size (#12340)

This is an automated email from the ASF dual-hosted git repository.

ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f09cc8d56 [Adaptive Server Selector] Add metrics for Stats Manager Queue Size (#12340)
7f09cc8d56 is described below

commit 7f09cc8d5664aa877005e37cc1219b3b10286cde
Author: Christina Li <42...@users.noreply.github.com>
AuthorDate: Fri Feb 23 13:34:06 2024 -0800

    [Adaptive Server Selector] Add metrics for Stats Manager Queue Size (#12340)
---
 .../broker/broker/helix/BaseBrokerStarter.java     |  2 +-
 .../AdaptiveServerSelectorTest.java                | 44 ++++++++++++++++++----
 .../apache/pinot/common/metrics/BrokerGauge.java   |  7 +++-
 .../common/reader/PinotServerDataFetcher.scala     |  2 +-
 .../routing/stats/ServerRoutingStatsManager.java   | 13 ++++++-
 .../pinot/core/transport/QueryRoutingTest.java     |  2 +-
 .../stats/ServerRoutingStatsManagerTest.java       | 39 +++++++++++++++++--
 7 files changed, 91 insertions(+), 18 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index b95b820f25..218eab54b6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -262,7 +262,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
     _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1);
     BrokerMetrics.register(_brokerMetrics);
     // Set up request handling classes
-    _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf);
+    _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf, _brokerMetrics);
     _serverRoutingStatsManager.init();
     _routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf);
     _routingManager.init(_spectatorHelixManager);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
index ed976b3b03..926de0699d 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
@@ -20,16 +20,21 @@ package org.apache.pinot.broker.routing.adaptiveserverselector;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.utils.ExponentialMovingAverage;
 import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -39,16 +44,39 @@ import static org.testng.Assert.assertTrue;
 
 
 public class AdaptiveServerSelectorTest {
+  private BrokerMetrics _brokerMetrics;
+
   List<String> _servers = Arrays.asList("server1", "server2", "server3", "server4");
   Map<String, Object> _properties = new HashMap<>();
 
+  @BeforeTest
+  public void initBrokerMetrics() {
+    // Create the metrics registry and a BrokerMetrics from a default-constructed config, then register it.
+    PinotConfiguration brokerConfig = new PinotConfiguration();
+    PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(
+        brokerConfig.subset(CommonConstants.Broker.METRICS_CONFIG_PREFIX));
+    _brokerMetrics = new BrokerMetrics(
+        brokerConfig.getProperty(
+            CommonConstants.Broker.CONFIG_OF_METRICS_NAME_PREFIX,
+            CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX),
+        metricsRegistry,
+        brokerConfig.getProperty(
+            CommonConstants.Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS,
+            CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS),
+        brokerConfig.getProperty(
+            CommonConstants.Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS,
+            Collections.emptyList()));
+    _brokerMetrics.initializeGlobalMeters();
+    BrokerMetrics.register(_brokerMetrics);
+  }
+
   @Test
   public void testAdaptiveServerSelectorFactory() {
     // Test 1: Test disabling Adaptive Server Selection .
     _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
         CommonConstants.Broker.AdaptiveServerSelector.Type.NO_OP.name());
     PinotConfiguration cfg = new PinotConfiguration(_properties);
-    ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
     assertNull(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager, cfg));
 
     // Enable stats collection. Without this, AdaptiveServerSelectors cannot be used.
@@ -58,7 +86,7 @@ public class AdaptiveServerSelectorTest {
     _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
         CommonConstants.Broker.AdaptiveServerSelector.Type.NUM_INFLIGHT_REQ.name());
     cfg = new PinotConfiguration(_properties);
-    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
     assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager,
         cfg) instanceof NumInFlightReqSelector);
 
@@ -66,7 +94,7 @@ public class AdaptiveServerSelectorTest {
     _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
         CommonConstants.Broker.AdaptiveServerSelector.Type.LATENCY.name());
     cfg = new PinotConfiguration(_properties);
-    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
     assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager,
         cfg) instanceof LatencySelector);
 
@@ -74,7 +102,7 @@ public class AdaptiveServerSelectorTest {
     _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
         CommonConstants.Broker.AdaptiveServerSelector.Type.HYBRID.name());
     cfg = new PinotConfiguration(_properties);
-    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
     assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager,
         cfg) instanceof HybridSelector);
 
@@ -82,7 +110,7 @@ public class AdaptiveServerSelectorTest {
     assertThrows(IllegalArgumentException.class, () -> {
       _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE, "Dummy");
       PinotConfiguration config = new PinotConfiguration(_properties);
-      ServerRoutingStatsManager manager = new ServerRoutingStatsManager(config);
+      ServerRoutingStatsManager manager = new ServerRoutingStatsManager(config, _brokerMetrics);
       AdaptiveServerSelectorFactory.getAdaptiveServerSelector(manager, config);
     });
   }
@@ -91,7 +119,7 @@ public class AdaptiveServerSelectorTest {
   public void testNumInFlightReqSelector() {
     _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true);
     PinotConfiguration cfg = new PinotConfiguration(_properties);
-    ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
     serverRoutingStatsManager.init();
     assertTrue(serverRoutingStatsManager.isEnabled());
     long taskCount = 0;
@@ -290,7 +318,7 @@ public class AdaptiveServerSelectorTest {
     _properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
         avgInitializationVal);
     PinotConfiguration cfg = new PinotConfiguration(_properties);
-    ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
     serverRoutingStatsManager.init();
     assertTrue(serverRoutingStatsManager.isEnabled());
     long taskCount = 0;
@@ -430,7 +458,7 @@ public class AdaptiveServerSelectorTest {
         hybridSelectorExponent);
 
     PinotConfiguration cfg = new PinotConfiguration(_properties);
-    ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
     serverRoutingStatsManager.init();
     assertTrue(serverRoutingStatsManager.isEnabled());
 
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index 25e66eabd0..601544b9f1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -56,7 +56,12 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
    * The cache size used by the allocator for normal arenas
    */
   NETTY_POOLED_THREADLOCALCACHE("bytes", true),
-  NETTY_POOLED_CHUNK_SIZE("bytes", true);
+  NETTY_POOLED_CHUNK_SIZE("bytes", true),
+
+  /**
+   * The queue size of the ServerRoutingStatsManager's main executor service.
+   */
+  ROUTING_STATS_MANAGER_QUEUE_SIZE("routingStatsManagerQueueSize", true);
 
   private final String _brokerGaugeName;
   private final String _unit;
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
index 48255233a9..1a0443203b 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
@@ -48,7 +48,7 @@ private[reader] class PinotServerDataFetcher(
   private val metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry
   private val brokerMetrics = new BrokerMetrics(metricsRegistry)
   private val pinotConfig = new PinotConfiguration()
-  private val serverRoutingStatsManager = new ServerRoutingStatsManager(pinotConfig)
+  private val serverRoutingStatsManager = new ServerRoutingStatsManager(pinotConfig, brokerMetrics)
   private val queryRouter = new QueryRouter(brokerId, brokerMetrics, serverRoutingStatsManager)
   // TODO add support for TLS-secured server
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
index 87c80566c1..1057fc084f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.AdaptiveServerSelector;
 import org.slf4j.Logger;
@@ -46,6 +48,7 @@ public class ServerRoutingStatsManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(ServerRoutingStatsManager.class);
 
   private final PinotConfiguration _config;
+  private final BrokerMetrics _brokerMetrics;
   private volatile boolean _isEnabled;
   private ConcurrentHashMap<String, ServerRoutingStatsEntry> _serverQueryStatsMap;
 
@@ -61,8 +64,9 @@ public class ServerRoutingStatsManager {
   private double _avgInitializationVal;
   private int _hybridScoreExponent;
 
-  public ServerRoutingStatsManager(PinotConfiguration pinotConfig) {
+  public ServerRoutingStatsManager(PinotConfiguration pinotConfig, BrokerMetrics brokerMetrics) {
     _config = pinotConfig;
+    _brokerMetrics = brokerMetrics;
   }
 
   public void init() {
@@ -138,9 +142,9 @@ public class ServerRoutingStatsManager {
       return;
     }
 
-    // TODO: Track Executor qSize and alert if it crosses a threshold.
     _executorService.execute(() -> {
       try {
+        recordQueueSizeMetrics();
         updateStatsAfterQuerySubmission(serverInstanceId);
       } catch (Exception e) {
         LOGGER.error("Exception caught while updating stats. requestId={}, exception={}", requestId, e);
@@ -377,4 +381,9 @@ public class ServerRoutingStatsManager {
       stats.getServerReadLock().unlock();
     }
   }
+
+  private void recordQueueSizeMetrics() {  // Publishes the current queue size as a global broker gauge.
+    int queueSize = getQueueSize();  // NOTE(review): the visible caller runs this inside the executor task
+    _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ROUTING_STATS_MANAGER_QUEUE_SIZE, queueSize);
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 07aa0e2c08..cbf4174bca 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -73,7 +73,7 @@ public class QueryRoutingTest {
     Map<String, Object> properties = new HashMap<>();
     properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true);
     PinotConfiguration cfg = new PinotConfiguration(properties);
-    _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+    _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, mock(BrokerMetrics.class));
     _serverRoutingStatsManager.init();
     _queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), _serverRoutingStatsManager);
     _requestCount = 0;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
index 73dd623b5b..84983f87ec 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
@@ -18,13 +18,18 @@
  */
 package org.apache.pinot.core.transport.server.routing.stats;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -34,20 +39,44 @@ import static org.testng.Assert.assertTrue;
 
 
 public class ServerRoutingStatsManagerTest {
+  private BrokerMetrics _brokerMetrics;
+
+  @BeforeTest
+  public void initBrokerMetrics() {
+    // Create the metrics registry and a BrokerMetrics from a default-constructed config, then register it.
+    PinotConfiguration brokerConfig = new PinotConfiguration();
+    PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(
+        brokerConfig.subset(CommonConstants.Broker.METRICS_CONFIG_PREFIX));
+    _brokerMetrics = new BrokerMetrics(
+        brokerConfig.getProperty(
+            CommonConstants.Broker.CONFIG_OF_METRICS_NAME_PREFIX,
+            CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX),
+        metricsRegistry,
+        brokerConfig.getProperty(
+            CommonConstants.Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS,
+            CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS),
+        brokerConfig.getProperty(
+            CommonConstants.Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS,
+            Collections.emptyList()));
+    _brokerMetrics.initializeGlobalMeters();
+    BrokerMetrics.register(_brokerMetrics);
+  }
+
   @Test
   public void testInitAndShutDown() {
     Map<String, Object> properties = new HashMap<>();
 
     // Test 1: Test disabled.
     properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, false);
-    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties));
+    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties),
+        _brokerMetrics);
     assertFalse(manager.isEnabled());
     manager.init();
     assertFalse(manager.isEnabled());
 
     // Test 2: Test enabled.
     properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true);
-    manager = new ServerRoutingStatsManager(new PinotConfiguration(properties));
+    manager = new ServerRoutingStatsManager(new PinotConfiguration(properties), _brokerMetrics);
     assertFalse(manager.isEnabled());
     manager.init();
     assertTrue(manager.isEnabled());
@@ -64,7 +93,8 @@ public class ServerRoutingStatsManagerTest {
   public void testEmptyStats() {
     Map<String, Object> properties = new HashMap<>();
     properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true);
-    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties));
+    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties),
+        _brokerMetrics);
     manager.init();
 
     List<Pair<String, Integer>> numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers();
@@ -94,7 +124,8 @@ public class ServerRoutingStatsManagerTest {
     properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS, 0);
     properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL, 0.0);
     properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT, 3);
-    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties));
+    ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties),
+        _brokerMetrics);
     manager.init();
 
     int requestId = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org