You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2019/03/08 02:24:40 UTC

[incubator-pinot] branch master updated: Adding the support for sampling logs (#3913)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d684d5e  Adding the support for sampling logs (#3913)
d684d5e is described below

commit d684d5eb26e6a9b01a2db4505267967fe68d3924
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Thu Mar 7 18:24:35 2019 -0800

    Adding the support for sampling logs (#3913)
    
    * Adding the support for sampling logs
    For high throughput use cases, logging becomes the bottleneck
    on both server and broker. "Category.callAppenders()" is using
    a synchronized block that all the worker threads can be stalled
    due to logging. Long term solution should be migration towards
    log4j2. As a short term fix, this pr adds the configuration for
    sampling logs.
    
    * Change to use maxRate concept, added forceLog() to ensure some outliers to get logged
    
    * Addressed comments and added log for reporting dropped log
    
    * Added dropped log for server side as well
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 80 ++++++++++++++++++----
 .../apache/pinot/common/utils/CommonConstants.java |  2 +
 .../core/query/scheduler/PriorityScheduler.java    |  8 ++-
 .../pinot/core/query/scheduler/QueryScheduler.java | 72 ++++++++++++++++---
 .../query/scheduler/fcfs/BoundedFCFSScheduler.java |  9 +--
 .../query/scheduler/fcfs/FCFSQueryScheduler.java   |  2 +-
 .../tokenbucket/TokenPriorityScheduler.java        |  8 +--
 .../query/scheduler/PrioritySchedulerTest.java     | 14 ++--
 .../request/ScheduledRequestHandlerTest.java       | 10 +--
 9 files changed, 159 insertions(+), 46 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 47d478a..20d384e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -20,12 +20,14 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.RateLimiter;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -82,6 +84,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final int _queryResponseLimit;
   protected final int _queryLogLength;
 
+  private final RateLimiter _queryLogRateLimiter;
+  private final RateLimiter _numDroppedLogRateLimiter;
+  private final AtomicInteger _numDroppedLog;
+
   public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
       TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
@@ -96,9 +102,15 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     _brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS);
     _queryResponseLimit = config.getInt(CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
     _queryLogLength = config.getInt(CONFIG_OF_BROKER_QUERY_LOG_LENGTH, DEFAULT_BROKER_QUERY_LOG_LENGTH);
+    _queryLogRateLimiter = RateLimiter.create(
+        config.getDouble(CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND, DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
+
+    _numDroppedLog = new AtomicInteger(0);
+    _numDroppedLogRateLimiter = RateLimiter.create(1.0);
 
-    LOGGER.info("Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}", _brokerId,
-        _brokerTimeoutMs, _queryResponseLimit, _queryLogLength);
+    LOGGER.info(
+        "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
+        _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
   }
 
   private String getDefaultBrokerId() {
@@ -110,6 +122,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     }
   }
 
+  @SuppressWarnings("Duplicates")
   @Override
   public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
       RequestStatistics requestStatistics)
@@ -292,22 +305,61 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
     LOGGER.debug("Broker Response: {}", brokerResponse);
 
-    // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
-    LOGGER.info(
-        "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
-            + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
-        brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
-        brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
-        brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
-        brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(),
-        brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
-        brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
-        StringUtils.substring(query, 0, _queryLogLength));
-
+    if(_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
+      // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
+      LOGGER.info(
+          "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
+              + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
+          brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
+          brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
+          brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
+          brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(),
+          brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
+          brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
+          StringUtils.substring(query, 0, _queryLogLength));
+
+      // Limit the dropping log message at most once per second.
+      if (_numDroppedLogRateLimiter.tryAcquire()) {
+        // NOTE: the reported number may not be accurate since we will be missing some increments happened between
+        // get() and set().
+        int numDroppedLog = _numDroppedLog.get();
+        if (numDroppedLog > 0) {
+          LOGGER.info("{} logs were dropped. (log max rate per second: {})", numDroppedLog,
+              _queryLogRateLimiter.getRate());
+          _numDroppedLog.set(0);
+        }
+      }
+    } else {
+      // Increment the count for dropped log
+      _numDroppedLog.incrementAndGet();
+    }
     return brokerResponse;
   }
 
   /**
+   * Helper function to decide whether to force the log
+   *
+   * TODO: come up with other criteria for forcing a log and come up with better numbers
+   *
+   */
+  private boolean forceLog(BrokerResponse brokerResponse, long totalTimeMs) {
+    if (brokerResponse.isNumGroupsLimitReached()) {
+      return true;
+    }
+
+    if (brokerResponse.getExceptionsSize() > 0) {
+      return true;
+    }
+
+    // If response time is more than 1 sec, force the log
+    if (totalTimeMs > 1000L) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
    * Broker side validation on the broker request.
    * <p>Throw RuntimeException if query does not pass validation.
    * <p>Current validations are:
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 0e6f5a5..b76f2b8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -111,6 +111,8 @@ public class CommonConstants {
     public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = Integer.MAX_VALUE;
     public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length";
     public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = Integer.MAX_VALUE;
+    public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = "pinot.broker.query.log.maxRatePerSecond";
+    public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = 10_000d;
     public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs";
     public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;
     public static final String CONFIG_OF_BROKER_ID = "pinot.broker.id";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
index afb2831..5f968b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nonnull;
+import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -53,9 +54,10 @@ public abstract class PriorityScheduler extends QueryScheduler {
   @VisibleForTesting
   Thread scheduler;
 
-  public PriorityScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor,
-      @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, @Nonnull LongAccumulator latestQueryTime) {
-    super(queryExecutor, resourceManager, metrics, latestQueryTime);
+  public PriorityScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager,
+      @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
+      @Nonnull LongAccumulator latestQueryTime) {
+    super(config, queryExecutor, resourceManager, metrics, latestQueryTime);
     Preconditions.checkNotNull(queue);
     this.queryQueue = queue;
     this.numRunners = resourceManager.getNumQueryRunnerThreads();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 82d2b5d..627cda7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -22,11 +22,14 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.RateLimiter;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import org.apache.commons.configuration.Configuration;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -48,8 +51,15 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class QueryScheduler {
   private static final Logger LOGGER = LoggerFactory.getLogger(QueryScheduler.class);
+
   private static final String INVALID_NUM_SCANNED = "-1";
   private static final String INVALID_SEGMENTS_COUNT = "-1";
+  private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond";
+  private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
+
+  private final RateLimiter queryLogRateLimiter;
+  private final RateLimiter numDroppedLogRateLimiter;
+  private final AtomicInteger numDroppedLogCounter;
 
   protected final ServerMetrics serverMetrics;
   protected final QueryExecutor queryExecutor;
@@ -63,8 +73,10 @@ public abstract class QueryScheduler {
    * @param resourceManager for managing server thread resources
    * @param serverMetrics server metrics collector
    */
-  public QueryScheduler(@Nonnull QueryExecutor queryExecutor, @Nonnull ResourceManager resourceManager,
-      @Nonnull ServerMetrics serverMetrics, @Nonnull LongAccumulator latestQueryTime) {
+  public QueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor queryExecutor,
+      @Nonnull ResourceManager resourceManager, @Nonnull ServerMetrics serverMetrics,
+      @Nonnull LongAccumulator latestQueryTime) {
+    Preconditions.checkNotNull(config);
     Preconditions.checkNotNull(queryExecutor);
     Preconditions.checkNotNull(resourceManager);
     Preconditions.checkNotNull(serverMetrics);
@@ -73,6 +85,11 @@ public abstract class QueryScheduler {
     this.resourceManager = resourceManager;
     this.queryExecutor = queryExecutor;
     this.latestQueryTime = latestQueryTime;
+    this.queryLogRateLimiter = RateLimiter.create(config.getDouble(QUERY_LOG_MAX_RATE_KEY, DEFAULT_QUERY_LOG_MAX_RATE));
+    this.numDroppedLogRateLimiter = RateLimiter.create(1.0d);
+    this.numDroppedLogCounter = new AtomicInteger(0);
+
+    LOGGER.info("Query log max rate: {}", queryLogRateLimiter.getRate());
   }
 
   /**
@@ -122,6 +139,7 @@ public abstract class QueryScheduler {
    * @param executorService Executor service to use for parallelizing query processing
    * @return serialized query response
    */
+  @SuppressWarnings("Duplicates")
   @Nullable
   protected byte[] processQueryAndSerialize(@Nonnull ServerQueryRequest queryRequest,
       @Nonnull ExecutorService executorService) {
@@ -170,13 +188,30 @@ public abstract class QueryScheduler {
 
     TimerContext timerContext = queryRequest.getTimerContext();
     int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
-    LOGGER.info(
-        "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
-        requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched,
-        timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT),
-        timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
-        timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(), numDocsScanned,
-        numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
+    long schedulerWaitMs = timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
+
+    if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) {
+      LOGGER.info(
+          "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
+          requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs,
+          timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
+          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(),
+          numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
+
+      // Limit the dropping log message at most once per second.
+      if (numDroppedLogRateLimiter.tryAcquire()) {
+        // NOTE: the reported number may not be accurate since we will be missing some increments happened between
+        // get() and set().
+        int numDroppedLog = numDroppedLogCounter.get();
+        if (numDroppedLog > 0) {
+          LOGGER.info("{} logs were dropped. (log max rate per second: {})", numDroppedLog,
+              queryLogRateLimiter.getRate());
+          numDroppedLogCounter.set(0);
+        }
+      }
+    } else {
+      numDroppedLogCounter.incrementAndGet();
+    }
 
     serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
     serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
@@ -186,6 +221,25 @@ public abstract class QueryScheduler {
   }
 
   /**
+   * Helper function to decide whether to force the log
+   *
+   * TODO: come up with other criteria for forcing a log and come up with better numbers
+   *
+   */
+  private boolean forceLog(long schedulerWaitMs, long numDocsScanned) {
+    // If scheduler wait time is larger than 100ms, force the log
+    if (schedulerWaitMs > 100L) {
+      return true;
+    }
+
+    // If the number of document scanned is larger than 1 million rows, force the log
+    if (numDocsScanned > 1_000_000L) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
    * Serialize the DataTable response for query request
    * @param queryRequest Server query request for which response is serialized
    * @param dataTable DataTable to serialize
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
index 8a425b1..301fb06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
@@ -53,12 +53,13 @@ public class BoundedFCFSScheduler extends PriorityScheduler {
       }
     };
     MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, groupFactory, new TableBasedGroupMapper());
-    return new BoundedFCFSScheduler(rm, queryExecutor, queue, serverMetrics, latestQueryTime);
+    return new BoundedFCFSScheduler(config, rm, queryExecutor, queue, serverMetrics, latestQueryTime);
   }
 
-  private BoundedFCFSScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor,
-      @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, @Nonnull LongAccumulator latestQueryTime) {
-    super(resourceManager, queryExecutor, queue, metrics, latestQueryTime);
+  private BoundedFCFSScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager,
+      @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
+      @Nonnull LongAccumulator latestQueryTime) {
+    super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime);
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
index 95004c2..92016d6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
@@ -42,7 +42,7 @@ public class FCFSQueryScheduler extends QueryScheduler {
 
   public FCFSQueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor queryExecutor,
       @Nonnull ServerMetrics serverMetrics, @Nonnull LongAccumulator latestQueryTime) {
-    super(queryExecutor, new UnboundedResourceManager(config), serverMetrics, latestQueryTime);
+    super(config, queryExecutor, new UnboundedResourceManager(config), serverMetrics, latestQueryTime);
   }
 
   @Nonnull
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
index fdc4670..7a28bf2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
@@ -59,13 +59,13 @@ public class TokenPriorityScheduler extends PriorityScheduler {
     };
 
     MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, groupFactory, new TableBasedGroupMapper());
-    return new TokenPriorityScheduler(rm, queryExecutor, queue, metrics, latestQueryTime);
+    return new TokenPriorityScheduler(config, rm, queryExecutor, queue, metrics, latestQueryTime);
   }
 
-  private TokenPriorityScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor,
-      @Nonnull MultiLevelPriorityQueue queue, @Nonnull ServerMetrics metrics,
+  private TokenPriorityScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager,
+      @Nonnull QueryExecutor queryExecutor, @Nonnull MultiLevelPriorityQueue queue, @Nonnull ServerMetrics metrics,
       @Nonnull LongAccumulator latestQueryTime) {
-    super(resourceManager, queryExecutor, queue, metrics, latestQueryTime);
+    super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime);
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 1bdb81b..06611c2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -231,13 +231,13 @@ public class PrioritySchedulerTest {
     static TestSchedulerGroupFactory groupFactory;
     static LongAccumulator latestQueryTime;
 
-    public static TestPriorityScheduler create(Configuration conf) {
-      ResourceManager rm = new PolicyBasedResourceManager(conf);
+    public static TestPriorityScheduler create(Configuration config) {
+      ResourceManager rm = new PolicyBasedResourceManager(config);
       QueryExecutor qe = new TestQueryExecutor();
       groupFactory = new TestSchedulerGroupFactory();
-      MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(conf, rm, groupFactory, new TableBasedGroupMapper());
+      MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, groupFactory, new TableBasedGroupMapper());
       latestQueryTime = new LongAccumulator(Long::max, 0);
-      return new TestPriorityScheduler(rm, qe, queue, metrics, latestQueryTime);
+      return new TestPriorityScheduler(config, rm, qe, queue, metrics, latestQueryTime);
     }
 
     public static TestPriorityScheduler create() {
@@ -246,10 +246,10 @@ public class PrioritySchedulerTest {
     }
 
     // store locally for easy access
-    public TestPriorityScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor,
-        @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
+    public TestPriorityScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager,
+        @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
         @Nonnull LongAccumulator latestQueryTime) {
-      super(resourceManager, queryExecutor, queue, metrics, latestQueryTime);
+      super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime);
     }
 
     ResourceManager getResourceManager() {
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
index 48578f5..00db1bd 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nonnull;
+import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
@@ -65,6 +66,7 @@ import static org.mockito.Mockito.when;
 public class ScheduledRequestHandlerTest {
   private static final BrokerRequest DUMMY_BROKER_REQUEST =
       new Pql2Compiler().compileToBrokerRequest("SELECT * FROM myTable_OFFLINE");
+  private static final Configuration DEFAULT_SCHEDULER_CONFIG = new PropertiesConfiguration();
 
   private ServerMetrics serverMetrics;
   private ChannelHandlerContext channelHandlerContext;
@@ -119,8 +121,8 @@ public class ScheduledRequestHandlerTest {
   @Test
   public void testQueryProcessingException()
       throws Exception {
-    ScheduledRequestHandler handler =
-        new ScheduledRequestHandler(new QueryScheduler(queryExecutor, resourceManager, serverMetrics, latestQueryTime) {
+    ScheduledRequestHandler handler = new ScheduledRequestHandler(
+        new QueryScheduler(DEFAULT_SCHEDULER_CONFIG, queryExecutor, resourceManager, serverMetrics, latestQueryTime) {
           @Nonnull
           @Override
           public ListenableFuture<byte[]> submit(@Nonnull ServerQueryRequest queryRequest) {
@@ -159,8 +161,8 @@ public class ScheduledRequestHandlerTest {
   @Test
   public void testValidQueryResponse()
       throws InterruptedException, ExecutionException, TimeoutException, IOException {
-    ScheduledRequestHandler handler =
-        new ScheduledRequestHandler(new QueryScheduler(queryExecutor, resourceManager, serverMetrics, latestQueryTime) {
+    ScheduledRequestHandler handler = new ScheduledRequestHandler(
+        new QueryScheduler(DEFAULT_SCHEDULER_CONFIG, queryExecutor, resourceManager, serverMetrics, latestQueryTime) {
           @Nonnull
           @Override
           public ListenableFuture<byte[]> submit(@Nonnull ServerQueryRequest queryRequest) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org