You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by an...@apache.org on 2024/02/23 21:34:12 UTC
(pinot) branch master updated: [Adaptive Server Selector] Add metrics for Stats Manager Queue Size (#12340)
This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7f09cc8d56 [Adaptive Server Selector] Add metrics for Stats Manager Queue Size (#12340)
7f09cc8d56 is described below
commit 7f09cc8d5664aa877005e37cc1219b3b10286cde
Author: Christina Li <42...@users.noreply.github.com>
AuthorDate: Fri Feb 23 13:34:06 2024 -0800
[Adaptive Server Selector] Add metrics for Stats Manager Queue Size (#12340)
---
.../broker/broker/helix/BaseBrokerStarter.java | 2 +-
.../AdaptiveServerSelectorTest.java | 44 ++++++++++++++++++----
.../apache/pinot/common/metrics/BrokerGauge.java | 7 +++-
.../common/reader/PinotServerDataFetcher.scala | 2 +-
.../routing/stats/ServerRoutingStatsManager.java | 13 ++++++-
.../pinot/core/transport/QueryRoutingTest.java | 2 +-
.../stats/ServerRoutingStatsManagerTest.java | 39 +++++++++++++++++--
7 files changed, 91 insertions(+), 18 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index b95b820f25..218eab54b6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -262,7 +262,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
_brokerMetrics.setValueOfGlobalGauge(BrokerGauge.VERSION, PinotVersion.VERSION_METRIC_NAME, 1);
BrokerMetrics.register(_brokerMetrics);
// Set up request handling classes
- _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf);
+ _serverRoutingStatsManager = new ServerRoutingStatsManager(_brokerConf, _brokerMetrics);
_serverRoutingStatsManager.init();
_routingManager = new BrokerRoutingManager(_brokerMetrics, _serverRoutingStatsManager, _brokerConf);
_routingManager.init(_spectatorHelixManager);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
index ed976b3b03..926de0699d 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/adaptiveserverselector/AdaptiveServerSelectorTest.java
@@ -20,16 +20,21 @@ package org.apache.pinot.broker.routing.adaptiveserverselector;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.utils.ExponentialMovingAverage;
import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -39,16 +44,39 @@ import static org.testng.Assert.assertTrue;
public class AdaptiveServerSelectorTest {
+ private BrokerMetrics _brokerMetrics;
+
List<String> _servers = Arrays.asList("server1", "server2", "server3", "server4");
Map<String, Object> _properties = new HashMap<>();
+ @BeforeTest
+ public void initBrokerMetrics() {
+ // Set up metric registry and broker metrics
+ PinotConfiguration brokerConfig = new PinotConfiguration();
+ PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(
+ brokerConfig.subset(CommonConstants.Broker.METRICS_CONFIG_PREFIX));
+ _brokerMetrics = new BrokerMetrics(
+ brokerConfig.getProperty(
+ CommonConstants.Broker.CONFIG_OF_METRICS_NAME_PREFIX,
+ CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX),
+ metricsRegistry,
+ brokerConfig.getProperty(
+ CommonConstants.Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS,
+ CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS),
+ brokerConfig.getProperty(
+ CommonConstants.Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS,
+ Collections.emptyList()));
+ _brokerMetrics.initializeGlobalMeters();
+ BrokerMetrics.register(_brokerMetrics);
+ }
+
@Test
public void testAdaptiveServerSelectorFactory() {
// Test 1: Test disabling Adaptive Server Selection .
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
CommonConstants.Broker.AdaptiveServerSelector.Type.NO_OP.name());
PinotConfiguration cfg = new PinotConfiguration(_properties);
- ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+ ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
assertNull(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager, cfg));
// Enable stats collection. Without this, AdaptiveServerSelectors cannot be used.
@@ -58,7 +86,7 @@ public class AdaptiveServerSelectorTest {
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
CommonConstants.Broker.AdaptiveServerSelector.Type.NUM_INFLIGHT_REQ.name());
cfg = new PinotConfiguration(_properties);
- serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+ serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager,
cfg) instanceof NumInFlightReqSelector);
@@ -66,7 +94,7 @@ public class AdaptiveServerSelectorTest {
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
CommonConstants.Broker.AdaptiveServerSelector.Type.LATENCY.name());
cfg = new PinotConfiguration(_properties);
- serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+ serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager,
cfg) instanceof LatencySelector);
@@ -74,7 +102,7 @@ public class AdaptiveServerSelectorTest {
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE,
CommonConstants.Broker.AdaptiveServerSelector.Type.HYBRID.name());
cfg = new PinotConfiguration(_properties);
- serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+ serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
assertTrue(AdaptiveServerSelectorFactory.getAdaptiveServerSelector(serverRoutingStatsManager,
cfg) instanceof HybridSelector);
@@ -82,7 +110,7 @@ public class AdaptiveServerSelectorTest {
assertThrows(IllegalArgumentException.class, () -> {
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_TYPE, "Dummy");
PinotConfiguration config = new PinotConfiguration(_properties);
- ServerRoutingStatsManager manager = new ServerRoutingStatsManager(config);
+ ServerRoutingStatsManager manager = new ServerRoutingStatsManager(config, _brokerMetrics);
AdaptiveServerSelectorFactory.getAdaptiveServerSelector(manager, config);
});
}
@@ -91,7 +119,7 @@ public class AdaptiveServerSelectorTest {
public void testNumInFlightReqSelector() {
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true);
PinotConfiguration cfg = new PinotConfiguration(_properties);
- ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+ ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
serverRoutingStatsManager.init();
assertTrue(serverRoutingStatsManager.isEnabled());
long taskCount = 0;
@@ -290,7 +318,7 @@ public class AdaptiveServerSelectorTest {
_properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL,
avgInitializationVal);
PinotConfiguration cfg = new PinotConfiguration(_properties);
- ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+ ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
serverRoutingStatsManager.init();
assertTrue(serverRoutingStatsManager.isEnabled());
long taskCount = 0;
@@ -430,7 +458,7 @@ public class AdaptiveServerSelectorTest {
hybridSelectorExponent);
PinotConfiguration cfg = new PinotConfiguration(_properties);
- ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+ ServerRoutingStatsManager serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, _brokerMetrics);
serverRoutingStatsManager.init();
assertTrue(serverRoutingStatsManager.isEnabled());
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index 25e66eabd0..601544b9f1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -56,7 +56,12 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
* The cache size used by the allocator for normal arenas
*/
NETTY_POOLED_THREADLOCALCACHE("bytes", true),
- NETTY_POOLED_CHUNK_SIZE("bytes", true);
+ NETTY_POOLED_CHUNK_SIZE("bytes", true),
+
+ /**
+ * The queue size of ServerRoutingStatsManager main executor service.
+ */
+ ROUTING_STATS_MANAGER_QUEUE_SIZE("routingStatsManagerQueueSize", true);
private final String _brokerGaugeName;
private final String _unit;
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
index 48255233a9..1a0443203b 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
@@ -48,7 +48,7 @@ private[reader] class PinotServerDataFetcher(
private val metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry
private val brokerMetrics = new BrokerMetrics(metricsRegistry)
private val pinotConfig = new PinotConfiguration()
- private val serverRoutingStatsManager = new ServerRoutingStatsManager(pinotConfig)
+ private val serverRoutingStatsManager = new ServerRoutingStatsManager(pinotConfig, brokerMetrics)
private val queryRouter = new QueryRouter(brokerId, brokerMetrics, serverRoutingStatsManager)
// TODO add support for TLS-secured server
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
index 87c80566c1..1057fc084f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManager.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants.Broker.AdaptiveServerSelector;
import org.slf4j.Logger;
@@ -46,6 +48,7 @@ public class ServerRoutingStatsManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerRoutingStatsManager.class);
private final PinotConfiguration _config;
+ private final BrokerMetrics _brokerMetrics;
private volatile boolean _isEnabled;
private ConcurrentHashMap<String, ServerRoutingStatsEntry> _serverQueryStatsMap;
@@ -61,8 +64,9 @@ public class ServerRoutingStatsManager {
private double _avgInitializationVal;
private int _hybridScoreExponent;
- public ServerRoutingStatsManager(PinotConfiguration pinotConfig) {
+ public ServerRoutingStatsManager(PinotConfiguration pinotConfig, BrokerMetrics brokerMetrics) {
_config = pinotConfig;
+ _brokerMetrics = brokerMetrics;
}
public void init() {
@@ -138,9 +142,9 @@ public class ServerRoutingStatsManager {
return;
}
- // TODO: Track Executor qSize and alert if it crosses a threshold.
_executorService.execute(() -> {
try {
+ recordQueueSizeMetrics();
updateStatsAfterQuerySubmission(serverInstanceId);
} catch (Exception e) {
LOGGER.error("Exception caught while updating stats. requestId={}, exception={}", requestId, e);
@@ -377,4 +381,9 @@ public class ServerRoutingStatsManager {
stats.getServerReadLock().unlock();
}
}
+
+ private void recordQueueSizeMetrics() {
+ int queueSize = getQueueSize();
+ _brokerMetrics.setValueOfGlobalGauge(BrokerGauge.ROUTING_STATS_MANAGER_QUEUE_SIZE, queueSize);
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 07aa0e2c08..cbf4174bca 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -73,7 +73,7 @@ public class QueryRoutingTest {
Map<String, Object> properties = new HashMap<>();
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true);
PinotConfiguration cfg = new PinotConfiguration(properties);
- _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg);
+ _serverRoutingStatsManager = new ServerRoutingStatsManager(cfg, mock(BrokerMetrics.class));
_serverRoutingStatsManager.init();
_queryRouter = new QueryRouter("testBroker", mock(BrokerMetrics.class), _serverRoutingStatsManager);
_requestCount = 0;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
index 73dd623b5b..84983f87ec 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/server/routing/stats/ServerRoutingStatsManagerTest.java
@@ -18,13 +18,18 @@
*/
package org.apache.pinot.core.transport.server.routing.stats;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -34,20 +39,44 @@ import static org.testng.Assert.assertTrue;
public class ServerRoutingStatsManagerTest {
+ private BrokerMetrics _brokerMetrics;
+
+ @BeforeTest
+ public void initBrokerMetrics() {
+ // Set up metric registry and broker metrics
+ PinotConfiguration brokerConfig = new PinotConfiguration();
+ PinotMetricsRegistry metricsRegistry = PinotMetricUtils.getPinotMetricsRegistry(
+ brokerConfig.subset(CommonConstants.Broker.METRICS_CONFIG_PREFIX));
+ _brokerMetrics = new BrokerMetrics(
+ brokerConfig.getProperty(
+ CommonConstants.Broker.CONFIG_OF_METRICS_NAME_PREFIX,
+ CommonConstants.Broker.DEFAULT_METRICS_NAME_PREFIX),
+ metricsRegistry,
+ brokerConfig.getProperty(
+ CommonConstants.Broker.CONFIG_OF_ENABLE_TABLE_LEVEL_METRICS,
+ CommonConstants.Broker.DEFAULT_ENABLE_TABLE_LEVEL_METRICS),
+ brokerConfig.getProperty(
+ CommonConstants.Broker.CONFIG_OF_ALLOWED_TABLES_FOR_EMITTING_METRICS,
+ Collections.emptyList()));
+ _brokerMetrics.initializeGlobalMeters();
+ BrokerMetrics.register(_brokerMetrics);
+ }
+
@Test
public void testInitAndShutDown() {
Map<String, Object> properties = new HashMap<>();
// Test 1: Test disabled.
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, false);
- ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties));
+ ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties),
+ _brokerMetrics);
assertFalse(manager.isEnabled());
manager.init();
assertFalse(manager.isEnabled());
// Test 2: Test enabled.
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true);
- manager = new ServerRoutingStatsManager(new PinotConfiguration(properties));
+ manager = new ServerRoutingStatsManager(new PinotConfiguration(properties), _brokerMetrics);
assertFalse(manager.isEnabled());
manager.init();
assertTrue(manager.isEnabled());
@@ -64,7 +93,8 @@ public class ServerRoutingStatsManagerTest {
public void testEmptyStats() {
Map<String, Object> properties = new HashMap<>();
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_ENABLE_STATS_COLLECTION, true);
- ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties));
+ ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties),
+ _brokerMetrics);
manager.init();
List<Pair<String, Integer>> numInFlightReqList = manager.fetchNumInFlightRequestsForAllServers();
@@ -94,7 +124,8 @@ public class ServerRoutingStatsManagerTest {
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_WARMUP_DURATION_MS, 0);
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_AVG_INITIALIZATION_VAL, 0.0);
properties.put(CommonConstants.Broker.AdaptiveServerSelector.CONFIG_OF_HYBRID_SCORE_EXPONENT, 3);
- ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties));
+ ServerRoutingStatsManager manager = new ServerRoutingStatsManager(new PinotConfiguration(properties),
+ _brokerMetrics);
manager.init();
int requestId = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org