You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2019/03/08 02:24:40 UTC
[incubator-pinot] branch master updated: Adding the support for
sampling logs (#3913)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d684d5e Adding the support for sampling logs (#3913)
d684d5e is described below
commit d684d5eb26e6a9b01a2db4505267967fe68d3924
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Thu Mar 7 18:24:35 2019 -0800
Adding the support for sampling logs (#3913)
* Adding the support for sampling logs
For high throughput use cases, logging becomes the bottleneck
on both server and broker. "Category.callAppenders()" is using
a synchronized block that all the worker threads can be stalled
due to logging. Long term solution should be migration towards
log4j2. As a short term fix, this pr adds the configuration for
sampling logs.
* Change to use maxRate concept, added forceLog() to ensure some outliers to get logged
* Addressed comments and added log for reporting dropped log
* Added dropped log for server side as well
---
.../requesthandler/BaseBrokerRequestHandler.java | 80 ++++++++++++++++++----
.../apache/pinot/common/utils/CommonConstants.java | 2 +
.../core/query/scheduler/PriorityScheduler.java | 8 ++-
.../pinot/core/query/scheduler/QueryScheduler.java | 72 ++++++++++++++++---
.../query/scheduler/fcfs/BoundedFCFSScheduler.java | 9 +--
.../query/scheduler/fcfs/FCFSQueryScheduler.java | 2 +-
.../tokenbucket/TokenPriorityScheduler.java | 8 +--
.../query/scheduler/PrioritySchedulerTest.java | 14 ++--
.../request/ScheduledRequestHandlerTest.java | 10 +--
9 files changed, 159 insertions(+), 46 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 47d478a..20d384e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -20,12 +20,14 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.RateLimiter;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -82,6 +84,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final int _queryResponseLimit;
protected final int _queryLogLength;
+ private final RateLimiter _queryLogRateLimiter;
+ private final RateLimiter _numDroppedLogRateLimiter;
+ private final AtomicInteger _numDroppedLog;
+
public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
TableQueryQuotaManager tableQueryQuotaManager, BrokerMetrics brokerMetrics) {
@@ -96,9 +102,15 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
_brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS);
_queryResponseLimit = config.getInt(CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
_queryLogLength = config.getInt(CONFIG_OF_BROKER_QUERY_LOG_LENGTH, DEFAULT_BROKER_QUERY_LOG_LENGTH);
+ _queryLogRateLimiter = RateLimiter.create(
+ config.getDouble(CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND, DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
+
+ _numDroppedLog = new AtomicInteger(0);
+ _numDroppedLogRateLimiter = RateLimiter.create(1.0);
- LOGGER.info("Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}", _brokerId,
- _brokerTimeoutMs, _queryResponseLimit, _queryLogLength);
+ LOGGER.info(
+ "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
+ _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
}
private String getDefaultBrokerId() {
@@ -110,6 +122,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
}
+ @SuppressWarnings("Duplicates")
@Override
public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
RequestStatistics requestStatistics)
@@ -292,22 +305,61 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
LOGGER.debug("Broker Response: {}", brokerResponse);
- // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
- LOGGER.info(
- "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
- + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
- brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
- brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
- brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
- brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(),
- brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
- brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
- StringUtils.substring(query, 0, _queryLogLength));
-
+ if(_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
+ // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
+ LOGGER.info(
+ "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
+ + "servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
+ brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
+ brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
+ brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
+ brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(),
+ brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
+ brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
+ StringUtils.substring(query, 0, _queryLogLength));
+
+ // Limit the dropping log message at most once per second.
+ if (_numDroppedLogRateLimiter.tryAcquire()) {
+ // NOTE: the reported number may not be accurate since we will be missing some increments happened between
+ // get() and set().
+ int numDroppedLog = _numDroppedLog.get();
+ if (numDroppedLog > 0) {
+ LOGGER.info("{} logs were dropped. (log max rate per second: {})", numDroppedLog,
+ _queryLogRateLimiter.getRate());
+ _numDroppedLog.set(0);
+ }
+ }
+ } else {
+ // Increment the count for dropped log
+ _numDroppedLog.incrementAndGet();
+ }
return brokerResponse;
}
/**
+ * Helper function to decide whether to force the log
+ *
+ * TODO: come up with other criteria for forcing a log and come up with better numbers
+ *
+ */
+ private boolean forceLog(BrokerResponse brokerResponse, long totalTimeMs) {
+ if (brokerResponse.isNumGroupsLimitReached()) {
+ return true;
+ }
+
+ if (brokerResponse.getExceptionsSize() > 0) {
+ return true;
+ }
+
+ // If response time is more than 1 sec, force the log
+ if (totalTimeMs > 1000L) {
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
* Broker side validation on the broker request.
* <p>Throw RuntimeException if query does not pass validation.
* <p>Current validations are:
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 0e6f5a5..b76f2b8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -111,6 +111,8 @@ public class CommonConstants {
public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = Integer.MAX_VALUE;
public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length";
public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = Integer.MAX_VALUE;
+ public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = "pinot.broker.query.log.maxRatePerSecond";
+ public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = 10_000d;
public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs";
public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;
public static final String CONFIG_OF_BROKER_ID = "pinot.broker.id";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
index afb2831..5f968b8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/PriorityScheduler.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nonnull;
+import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -53,9 +54,10 @@ public abstract class PriorityScheduler extends QueryScheduler {
@VisibleForTesting
Thread scheduler;
- public PriorityScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor,
- @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, @Nonnull LongAccumulator latestQueryTime) {
- super(queryExecutor, resourceManager, metrics, latestQueryTime);
+ public PriorityScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager,
+ @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
+ @Nonnull LongAccumulator latestQueryTime) {
+ super(config, queryExecutor, resourceManager, metrics, latestQueryTime);
Preconditions.checkNotNull(queue);
this.queryQueue = queue;
this.numRunners = resourceManager.getNumQueryRunnerThreads();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 82d2b5d..627cda7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -22,11 +22,14 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.RateLimiter;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -48,8 +51,15 @@ import org.slf4j.LoggerFactory;
*/
public abstract class QueryScheduler {
private static final Logger LOGGER = LoggerFactory.getLogger(QueryScheduler.class);
+
private static final String INVALID_NUM_SCANNED = "-1";
private static final String INVALID_SEGMENTS_COUNT = "-1";
+ private static final String QUERY_LOG_MAX_RATE_KEY = "query.log.maxRatePerSecond";
+ private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
+
+ private final RateLimiter queryLogRateLimiter;
+ private final RateLimiter numDroppedLogRateLimiter;
+ private final AtomicInteger numDroppedLogCounter;
protected final ServerMetrics serverMetrics;
protected final QueryExecutor queryExecutor;
@@ -63,8 +73,10 @@ public abstract class QueryScheduler {
* @param resourceManager for managing server thread resources
* @param serverMetrics server metrics collector
*/
- public QueryScheduler(@Nonnull QueryExecutor queryExecutor, @Nonnull ResourceManager resourceManager,
- @Nonnull ServerMetrics serverMetrics, @Nonnull LongAccumulator latestQueryTime) {
+ public QueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor queryExecutor,
+ @Nonnull ResourceManager resourceManager, @Nonnull ServerMetrics serverMetrics,
+ @Nonnull LongAccumulator latestQueryTime) {
+ Preconditions.checkNotNull(config);
Preconditions.checkNotNull(queryExecutor);
Preconditions.checkNotNull(resourceManager);
Preconditions.checkNotNull(serverMetrics);
@@ -73,6 +85,11 @@ public abstract class QueryScheduler {
this.resourceManager = resourceManager;
this.queryExecutor = queryExecutor;
this.latestQueryTime = latestQueryTime;
+ this.queryLogRateLimiter = RateLimiter.create(config.getDouble(QUERY_LOG_MAX_RATE_KEY, DEFAULT_QUERY_LOG_MAX_RATE));
+ this.numDroppedLogRateLimiter = RateLimiter.create(1.0d);
+ this.numDroppedLogCounter = new AtomicInteger(0);
+
+ LOGGER.info("Query log max rate: {}", queryLogRateLimiter.getRate());
}
/**
@@ -122,6 +139,7 @@ public abstract class QueryScheduler {
* @param executorService Executor service to use for parallelizing query processing
* @return serialized query response
*/
+ @SuppressWarnings("Duplicates")
@Nullable
protected byte[] processQueryAndSerialize(@Nonnull ServerQueryRequest queryRequest,
@Nonnull ExecutorService executorService) {
@@ -170,13 +188,30 @@ public abstract class QueryScheduler {
TimerContext timerContext = queryRequest.getTimerContext();
int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
- LOGGER.info(
- "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
- requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched,
- timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT),
- timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
- timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(), numDocsScanned,
- numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
+ long schedulerWaitMs = timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
+
+ if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) {
+ LOGGER.info(
+ "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
+ requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs,
+ timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
+ timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(),
+ numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
+
+ // Limit the dropping log message at most once per second.
+ if (numDroppedLogRateLimiter.tryAcquire()) {
+ // NOTE: the reported number may not be accurate since we will be missing some increments happened between
+ // get() and set().
+ int numDroppedLog = numDroppedLogCounter.get();
+ if (numDroppedLog > 0) {
+ LOGGER.info("{} logs were dropped. (log max rate per second: {})", numDroppedLog,
+ queryLogRateLimiter.getRate());
+ numDroppedLogCounter.set(0);
+ }
+ }
+ } else {
+ numDroppedLogCounter.incrementAndGet();
+ }
serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_PROCESSED, numSegmentsProcessed);
@@ -186,6 +221,25 @@ public abstract class QueryScheduler {
}
/**
+ * Helper function to decide whether to force the log
+ *
+ * TODO: come up with other criteria for forcing a log and come up with better numbers
+ *
+ */
+ private boolean forceLog(long schedulerWaitMs, long numDocsScanned) {
+ // If scheduler wait time is larger than 100ms, force the log
+ if (schedulerWaitMs > 100L) {
+ return true;
+ }
+
+ // If the number of document scanned is larger than 1 million rows, force the log
+ if (numDocsScanned > 1_000_000L) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
* Serialize the DataTable response for query request
* @param queryRequest Server query request for which response is serialized
* @param dataTable DataTable to serialize
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
index 8a425b1..301fb06 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/BoundedFCFSScheduler.java
@@ -53,12 +53,13 @@ public class BoundedFCFSScheduler extends PriorityScheduler {
}
};
MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, groupFactory, new TableBasedGroupMapper());
- return new BoundedFCFSScheduler(rm, queryExecutor, queue, serverMetrics, latestQueryTime);
+ return new BoundedFCFSScheduler(config, rm, queryExecutor, queue, serverMetrics, latestQueryTime);
}
- private BoundedFCFSScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor,
- @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics, @Nonnull LongAccumulator latestQueryTime) {
- super(resourceManager, queryExecutor, queue, metrics, latestQueryTime);
+ private BoundedFCFSScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager,
+ @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
+ @Nonnull LongAccumulator latestQueryTime) {
+ super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime);
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
index 95004c2..92016d6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/fcfs/FCFSQueryScheduler.java
@@ -42,7 +42,7 @@ public class FCFSQueryScheduler extends QueryScheduler {
public FCFSQueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor queryExecutor,
@Nonnull ServerMetrics serverMetrics, @Nonnull LongAccumulator latestQueryTime) {
- super(queryExecutor, new UnboundedResourceManager(config), serverMetrics, latestQueryTime);
+ super(config, queryExecutor, new UnboundedResourceManager(config), serverMetrics, latestQueryTime);
}
@Nonnull
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
index fdc4670..7a28bf2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/tokenbucket/TokenPriorityScheduler.java
@@ -59,13 +59,13 @@ public class TokenPriorityScheduler extends PriorityScheduler {
};
MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, groupFactory, new TableBasedGroupMapper());
- return new TokenPriorityScheduler(rm, queryExecutor, queue, metrics, latestQueryTime);
+ return new TokenPriorityScheduler(config, rm, queryExecutor, queue, metrics, latestQueryTime);
}
- private TokenPriorityScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor,
- @Nonnull MultiLevelPriorityQueue queue, @Nonnull ServerMetrics metrics,
+ private TokenPriorityScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager,
+ @Nonnull QueryExecutor queryExecutor, @Nonnull MultiLevelPriorityQueue queue, @Nonnull ServerMetrics metrics,
@Nonnull LongAccumulator latestQueryTime) {
- super(resourceManager, queryExecutor, queue, metrics, latestQueryTime);
+ super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime);
}
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
index 1bdb81b..06611c2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/PrioritySchedulerTest.java
@@ -231,13 +231,13 @@ public class PrioritySchedulerTest {
static TestSchedulerGroupFactory groupFactory;
static LongAccumulator latestQueryTime;
- public static TestPriorityScheduler create(Configuration conf) {
- ResourceManager rm = new PolicyBasedResourceManager(conf);
+ public static TestPriorityScheduler create(Configuration config) {
+ ResourceManager rm = new PolicyBasedResourceManager(config);
QueryExecutor qe = new TestQueryExecutor();
groupFactory = new TestSchedulerGroupFactory();
- MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(conf, rm, groupFactory, new TableBasedGroupMapper());
+ MultiLevelPriorityQueue queue = new MultiLevelPriorityQueue(config, rm, groupFactory, new TableBasedGroupMapper());
latestQueryTime = new LongAccumulator(Long::max, 0);
- return new TestPriorityScheduler(rm, qe, queue, metrics, latestQueryTime);
+ return new TestPriorityScheduler(config, rm, qe, queue, metrics, latestQueryTime);
}
public static TestPriorityScheduler create() {
@@ -246,10 +246,10 @@ public class PrioritySchedulerTest {
}
// store locally for easy access
- public TestPriorityScheduler(@Nonnull ResourceManager resourceManager, @Nonnull QueryExecutor queryExecutor,
- @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
+ public TestPriorityScheduler(@Nonnull Configuration config, @Nonnull ResourceManager resourceManager,
+ @Nonnull QueryExecutor queryExecutor, @Nonnull SchedulerPriorityQueue queue, @Nonnull ServerMetrics metrics,
@Nonnull LongAccumulator latestQueryTime) {
- super(resourceManager, queryExecutor, queue, metrics, latestQueryTime);
+ super(config, resourceManager, queryExecutor, queue, metrics, latestQueryTime);
}
ResourceManager getResourceManager() {
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
index 48578f5..00db1bd 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/request/ScheduledRequestHandlerTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAccumulator;
import javax.annotation.Nonnull;
+import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
@@ -65,6 +66,7 @@ import static org.mockito.Mockito.when;
public class ScheduledRequestHandlerTest {
private static final BrokerRequest DUMMY_BROKER_REQUEST =
new Pql2Compiler().compileToBrokerRequest("SELECT * FROM myTable_OFFLINE");
+ private static final Configuration DEFAULT_SCHEDULER_CONFIG = new PropertiesConfiguration();
private ServerMetrics serverMetrics;
private ChannelHandlerContext channelHandlerContext;
@@ -119,8 +121,8 @@ public class ScheduledRequestHandlerTest {
@Test
public void testQueryProcessingException()
throws Exception {
- ScheduledRequestHandler handler =
- new ScheduledRequestHandler(new QueryScheduler(queryExecutor, resourceManager, serverMetrics, latestQueryTime) {
+ ScheduledRequestHandler handler = new ScheduledRequestHandler(
+ new QueryScheduler(DEFAULT_SCHEDULER_CONFIG, queryExecutor, resourceManager, serverMetrics, latestQueryTime) {
@Nonnull
@Override
public ListenableFuture<byte[]> submit(@Nonnull ServerQueryRequest queryRequest) {
@@ -159,8 +161,8 @@ public class ScheduledRequestHandlerTest {
@Test
public void testValidQueryResponse()
throws InterruptedException, ExecutionException, TimeoutException, IOException {
- ScheduledRequestHandler handler =
- new ScheduledRequestHandler(new QueryScheduler(queryExecutor, resourceManager, serverMetrics, latestQueryTime) {
+ ScheduledRequestHandler handler = new ScheduledRequestHandler(
+ new QueryScheduler(DEFAULT_SCHEDULER_CONFIG, queryExecutor, resourceManager, serverMetrics, latestQueryTime) {
@Nonnull
@Override
public ListenableFuture<byte[]> submit(@Nonnull ServerQueryRequest queryRequest) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org