You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/10/06 16:20:59 UTC
[pinot] branch master updated: refactor query logging to allow separate log file (#9513)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 749857af4e refactor query logging to allow separate log file (#9513)
749857af4e is described below
commit 749857af4e267d75cb50bff7b6664af93790d25b
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Thu Oct 6 09:20:53 2022 -0700
refactor query logging to allow separate log file (#9513)
* refactor query logging to allow separate log file
* apply pinot style
---
.../images/pinot/etc/conf/pinot-broker-log4j2.xml | 19 ++
.../apache/pinot/broker/querylog/QueryLogger.java | 282 ++++++++++++++++++++
.../requesthandler/BaseBrokerRequestHandler.java | 89 +------
.../pinot/broker/querylog/QueryLoggerTest.java | 287 +++++++++++++++++++++
.../main/resources/conf/pinot-broker-log4j2.xml | 9 +
5 files changed, 610 insertions(+), 76 deletions(-)
diff --git a/docker/images/pinot/etc/conf/pinot-broker-log4j2.xml b/docker/images/pinot/etc/conf/pinot-broker-log4j2.xml
index 8716e4ca6b..b1ef2cb883 100644
--- a/docker/images/pinot/etc/conf/pinot-broker-log4j2.xml
+++ b/docker/images/pinot/etc/conf/pinot-broker-log4j2.xml
@@ -45,6 +45,21 @@
</Policies>
<DefaultRolloverStrategy max="10"/>
</RollingFile>
+ <RollingFile
+ name="querylog"
+ fileName="${env:LOG_DIR}/querylog.log"
+ filePattern="${env:LOG_DIR}/querylog.%d{yyyy-MM-dd}.%i.log.gz"
+ immediateFlush="false">
+ <PatternLayout>
+ <Pattern>%d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n</Pattern>
+ </PatternLayout>
+ <Policies>
+ <OnStartupTriggeringPolicy/>
+ <SizeBasedTriggeringPolicy size="20 MB"/>
+ <TimeBasedTriggeringPolicy/>
+ </Policies>
+ <DefaultRolloverStrategy max="10"/>
+ </RollingFile>
</Appenders>
<Loggers>
<Root level="info" additivity="false">
@@ -53,6 +68,10 @@
<!-- Direct most logs to the log file -->
<AppenderRef ref="brokerLog"/>
</Root>
+ <!-- Output query logs to their own file -->
+ <Logger name="org.apache.pinot.broker.querylog.QueryLogger" level="info" additivity="false">
+ <AppenderRef ref="querylog"/>
+ </Logger>
<!-- Output broker starter logs to the console -->
<Logger name="org.apache.pinot.broker.broker.helix.HelixBrokerStarter" level="info" additivity="false">
<AppenderRef ref="console"/>
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
new file mode 100644
index 0000000000..1a451b3ea2
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.querylog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.spi.utils.CommonConstants.Broker;
+
+
+/**
+ * {@code QueryLogger} is responsible for logging query responses in a configurable
+ * fashion. Query logging can be useful to capture production traffic to assist with
+ * debugging or regression testing.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class QueryLogger {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(QueryLogger.class);
+
+ private final int _maxQueryLengthToLog;
+ private final RateLimiter _logRateLimiter;
+ private final boolean _enableIpLogging;
+ private final Logger _logger;
+ private final RateLimiter _droppedLogRateLimiter;
+ private final AtomicLong _numDroppedLogs = new AtomicLong(0L);
+
+ public QueryLogger(PinotConfiguration config) {
+ this(RateLimiter.create(config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND,
+ Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND)),
+ config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH),
+ config.getProperty(Broker.CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING,
+ Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING), LOGGER, RateLimiter.create(1)
+ // log once a second for dropped log count
+ );
+ }
+
+ @VisibleForTesting
+ QueryLogger(RateLimiter logRateLimiter, int maxQueryLengthToLog, boolean enableIpLogging, Logger logger,
+ RateLimiter droppedLogRateLimiter) {
+ _logRateLimiter = logRateLimiter;
+ _maxQueryLengthToLog = maxQueryLengthToLog;
+ _enableIpLogging = enableIpLogging;
+ _logger = logger;
+ _droppedLogRateLimiter = droppedLogRateLimiter;
+ }
+
+ public void log(QueryLogParams params) {
+ _logger.debug("Broker Response: {}", params._response);
+
+ if (!(_logRateLimiter.tryAcquire() || shouldForceLog(params))) {
+ _numDroppedLogs.incrementAndGet();
+ return;
+ }
+
+ final StringBuilder queryLogBuilder = new StringBuilder();
+ for (QueryLogEntry value : QueryLogEntry.values()) {
+ value.format(queryLogBuilder, this, params);
+ queryLogBuilder.append(',');
+ }
+
+ // always log the query last - don't add this to the QueryLogEntry enum
+ queryLogBuilder.append("query=").append(StringUtils.substring(params._query, 0, _maxQueryLengthToLog));
+ _logger.info(queryLogBuilder.toString());
+
+ if (_droppedLogRateLimiter.tryAcquire()) {
+ // use getAndSet to 0 so that there will be no race condition between
+ // loggers that increment this counter and this thread
+ long numDroppedLogsSinceLastLog = _numDroppedLogs.getAndSet(0);
+ if (numDroppedLogsSinceLastLog > 0) {
+ _logger.warn("{} logs were dropped. (log max rate per second: {})", numDroppedLogsSinceLastLog,
+ _droppedLogRateLimiter.getRate());
+ }
+ }
+ }
+
+ public int getMaxQueryLengthToLog() {
+ return _maxQueryLengthToLog;
+ }
+
+ public double getLogRateLimit() {
+ return _logRateLimiter.getRate();
+ }
+
+ private boolean shouldForceLog(QueryLogParams params) {
+ return params._response.isNumGroupsLimitReached() || params._response.getExceptionsSize() > 0
+ || params._timeUsedMs > TimeUnit.SECONDS.toMillis(1);
+ }
+
+ public static class QueryLogParams {
+ final long _requestId;
+ final String _query;
+ final RequestContext _requestContext;
+ final String _table;
+ final int _numUnavailableSegments;
+ final BaseBrokerRequestHandler.ServerStats _serverStats;
+ final BrokerResponse _response;
+ final long _timeUsedMs;
+ @Nullable
+ final RequesterIdentity _requester;
+
+ public QueryLogParams(long requestId, String query, RequestContext requestContext, String table,
+ int numUnavailableSegments, BaseBrokerRequestHandler.ServerStats serverStats, BrokerResponse response,
+ long timeUsedMs, @Nullable RequesterIdentity requester) {
+ _requestId = requestId;
+ _query = query;
+ _table = table;
+ _timeUsedMs = timeUsedMs;
+ _requestContext = requestContext;
+ _requester = requester;
+ _response = response;
+ _serverStats = serverStats;
+ _numUnavailableSegments = numUnavailableSegments;
+ }
+ }
+
+ /**
+ * NOTE: please maintain the order of this query log entry enum. If you want to add a new
+ * entry, add it to the end of the existing list.
+ */
+ private enum QueryLogEntry {
+ REQUEST_ID("requestId") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._requestId);
+ }
+ },
+ TABLE("table") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._table);
+ }
+ },
+ TIME_MS("timeMs") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._timeUsedMs);
+ }
+ },
+ DOCS("docs") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._response.getNumDocsScanned()).append('/').append(params._response.getTotalDocs());
+ }
+ },
+ ENTRIES("entries") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._response.getNumEntriesScannedInFilter()).append('/')
+ .append(params._response.getNumEntriesScannedPostFilter());
+ }
+ },
+ SEGMENT_INFO("segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/unavailable)",
+ ':') {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._response.getNumSegmentsQueried()).append('/')
+ .append(params._response.getNumSegmentsProcessed()).append('/')
+ .append(params._response.getNumSegmentsMatched()).append('/')
+ .append(params._response.getNumConsumingSegmentsQueried()).append('/')
+ .append(params._response.getNumConsumingSegmentsProcessed()).append('/')
+ .append(params._response.getNumConsumingSegmentsMatched()).append('/')
+ .append(params._numUnavailableSegments);
+ }
+ },
+ CONSUMING_FRESHNESS_MS("consumingFreshnessTimeMs") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._response.getMinConsumingFreshnessTimeMs());
+ }
+ },
+ SERVERS("servers") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._response.getNumServersResponded()).append('/')
+ .append(params._response.getNumServersQueried());
+ }
+ },
+ GROUP_LIMIT_REACHED("groupLimitReached") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._response.isNumGroupsLimitReached());
+ }
+ },
+ BROKER_REDUCE_TIME_MS("brokerReduceTimeMs") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._requestContext.getReduceTimeMillis());
+ }
+ },
+ EXCEPTIONS("exceptions") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._response.getExceptionsSize());
+ }
+ },
+ SERVER_STATS("serverStats") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._serverStats.getServerStats());
+ }
+ },
+ OFFLINE_THREAD_CPU_TIME("offlineThreadCpuTimeNs(total/thread/sysActivity/resSer)", ':') {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._response.getOfflineTotalCpuTimeNs()).append('/')
+ .append(params._response.getOfflineThreadCpuTimeNs()).append('/')
+ .append(params._response.getOfflineSystemActivitiesCpuTimeNs()).append('/')
+ .append(params._response.getOfflineResponseSerializationCpuTimeNs());
+ }
+ },
+ REALTIME_THREAD_CPU_TIME("realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer)", ':') {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ builder.append(params._response.getRealtimeTotalCpuTimeNs()).append('/')
+ .append(params._response.getRealtimeThreadCpuTimeNs()).append('/')
+ .append(params._response.getRealtimeSystemActivitiesCpuTimeNs()).append('/')
+ .append(params._response.getRealtimeResponseSerializationCpuTimeNs());
+ }
+ },
+ CLIENT_IP("clientIp") {
+ @Override
+ void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ if (logger._enableIpLogging && params._requester != null) {
+ builder.append(params._requester.getClientIp());
+ } else {
+ builder.append(CommonConstants.UNKNOWN);
+ }
+ }
+ };
+
+ public final String _entryName;
+ public final char _separator; // backwards compatibility for the entries that use ':' instead of '='
+
+ QueryLogEntry(String entryName) {
+ this(entryName, '=');
+ }
+
+ QueryLogEntry(String entryName, final char separator) {
+ _entryName = entryName;
+ _separator = separator;
+ }
+
+ abstract void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params);
+
+ void format(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+ // use StringBuilder because the compiler will struggle to turn complicated string
+ // concatenation (as part of a loop) into a StringBuilder, which is significantly
+ // more efficient
+ builder.append(_entryName).append(_separator);
+ doFormat(builder, logger, params);
+ }
+ }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5b27b2da4e..76553f971d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.RateLimiter;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
@@ -36,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -48,6 +46,7 @@ import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.querylog.QueryLogger;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.provider.TableCache;
@@ -121,13 +120,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
protected final String _brokerId;
protected final long _brokerTimeoutMs;
protected final int _queryResponseLimit;
- protected final int _queryLogLength;
-
- private final RateLimiter _queryLogRateLimiter;
-
- private final RateLimiter _numDroppedLogRateLimiter;
- private final AtomicInteger _numDroppedLog;
+ private final QueryLogger _queryLogger;
private final boolean _disableGroovy;
private final boolean _useApproximateFunction;
private final int _defaultHllLog2m;
@@ -156,19 +150,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
_brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
_queryResponseLimit =
config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
- _queryLogLength =
- config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH);
- _queryLogRateLimiter = RateLimiter.create(config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND,
- Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
- _numDroppedLog = new AtomicInteger(0);
- _numDroppedLogRateLimiter = RateLimiter.create(1.0);
+ _queryLogger = new QueryLogger(config);
boolean enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
_queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
LOGGER.info(
"Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps, "
- + "enabling query cancellation: {}", _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength,
- _queryLogRateLimiter.getRate(), enableQueryCancellation);
+ + "enabling query cancellation: {}", _brokerId, _brokerTimeoutMs, _queryResponseLimit,
+ _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(), enableQueryCancellation);
}
private String getDefaultBrokerId() {
@@ -560,8 +549,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
// Send empty response since we don't need to evaluate either offline or realtime request.
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
// Extract source info from incoming request
- logBrokerResponse(requestId, query, requestContext, tableName, 0, new ServerStats(), brokerResponse,
- System.nanoTime(), requesterIdentity);
+ _queryLogger.log(new QueryLogger.QueryLogParams(
+ requestId, query, requestContext, tableName, 0, new ServerStats(),
+ brokerResponse, System.nanoTime(), requesterIdentity));
return brokerResponse;
}
@@ -719,8 +709,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
TimeUnit.MILLISECONDS);
// Extract source info from incoming request
- logBrokerResponse(requestId, query, requestContext, tableName, numUnavailableSegments, serverStats, brokerResponse,
- totalTimeMs, requesterIdentity);
+ _queryLogger.log(
+ new QueryLogger.QueryLogParams(
+ requestId, query, requestContext, tableName, numUnavailableSegments, serverStats, brokerResponse,
+ totalTimeMs, requesterIdentity));
return brokerResponse;
}
@@ -790,61 +782,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
return TRUE.equals(pinotQuery.getFilterExpression());
}
- private void logBrokerResponse(long requestId, String query, RequestContext requestContext, String tableName,
- int numUnavailableSegments, ServerStats serverStats, BrokerResponseNative brokerResponse, long totalTimeMs,
- @Nullable RequesterIdentity requesterIdentity) {
- LOGGER.debug("Broker Response: {}", brokerResponse);
-
- boolean enableClientIpLogging = _config.getProperty(Broker.CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING,
- Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING);
- String clientIp = CommonConstants.UNKNOWN;
- if (enableClientIpLogging && requesterIdentity != null) {
- clientIp = requesterIdentity.getClientIp();
- }
-
- // Please keep the format as name=value comma-separated with no spaces
- // Please keep all the name value pairs together, then followed by the query. To add a new entry, please add it to
- // the end of existing pairs, but before the query.
- if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
- // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
- LOGGER.info("requestId={},table={},timeMs={},docs={}/{},entries={}/{},"
- + "segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/unavailable):"
- + "{}/{}/{}/{}/{}/{}/{},consumingFreshnessTimeMs={},"
- + "servers={}/{},groupLimitReached={},brokerReduceTimeMs={},exceptions={},serverStats={},"
- + "offlineThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},"
- + "realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},clientIp={},query={}", requestId,
- tableName, totalTimeMs, brokerResponse.getNumDocsScanned(), brokerResponse.getTotalDocs(),
- brokerResponse.getNumEntriesScannedInFilter(), brokerResponse.getNumEntriesScannedPostFilter(),
- brokerResponse.getNumSegmentsQueried(), brokerResponse.getNumSegmentsProcessed(),
- brokerResponse.getNumSegmentsMatched(), brokerResponse.getNumConsumingSegmentsQueried(),
- brokerResponse.getNumConsumingSegmentsProcessed(), brokerResponse.getNumConsumingSegmentsMatched(),
- numUnavailableSegments, brokerResponse.getMinConsumingFreshnessTimeMs(),
- brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
- brokerResponse.isNumGroupsLimitReached(), requestContext.getReduceTimeMillis(),
- brokerResponse.getExceptionsSize(), serverStats.getServerStats(), brokerResponse.getOfflineTotalCpuTimeNs(),
- brokerResponse.getOfflineThreadCpuTimeNs(), brokerResponse.getOfflineSystemActivitiesCpuTimeNs(),
- brokerResponse.getOfflineResponseSerializationCpuTimeNs(), brokerResponse.getRealtimeTotalCpuTimeNs(),
- brokerResponse.getRealtimeThreadCpuTimeNs(), brokerResponse.getRealtimeSystemActivitiesCpuTimeNs(),
- brokerResponse.getRealtimeResponseSerializationCpuTimeNs(), clientIp,
- StringUtils.substring(query, 0, _queryLogLength));
-
- // Limit the dropping log message at most once per second.
- if (_numDroppedLogRateLimiter.tryAcquire()) {
- // NOTE: the reported number may not be accurate since we will be missing some increments happened between
- // get() and set().
- int numDroppedLog = _numDroppedLog.get();
- if (numDroppedLog > 0) {
- LOGGER.info("{} logs were dropped. (log max rate per second: {})", numDroppedLog,
- _queryLogRateLimiter.getRate());
- _numDroppedLog.set(0);
- }
- }
- } else {
- // Increment the count for dropped log
- _numDroppedLog.incrementAndGet();
- }
- }
-
private String getServerTenant(String tableNameWithType) {
TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
if (tableConfig == null) {
@@ -1716,7 +1653,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
/**
* Helper class to pass the per server statistics.
*/
- protected static class ServerStats {
+ public static class ServerStats {
private String _serverStats;
public String getServerStats() {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
new file mode 100644
index 0000000000..797999cdf1
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.querylog;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.spi.trace.DefaultRequestContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.MockitoAnnotations.openMocks;
+
+
+@SuppressWarnings("UnstableApiUsage")
+public class QueryLoggerTest {
+
+ @Mock
+ RateLimiter _logRateLimiter;
+ @Mock
+ RateLimiter _droppedRateLimiter;
+ @Mock
+ Logger _logger;
+
+ private final List<String> _infoLog = new ArrayList<>();
+ private final List<Long> _numDropped = new ArrayList<>();
+
+ private AutoCloseable _closeMocks;
+
+ @BeforeMethod
+ public void setUp() {
+ _closeMocks = openMocks(this);
+
+ _infoLog.clear();
+ _numDropped.clear();
+
+ Mockito.doAnswer(invocationOnMock -> {
+ _infoLog.add(invocationOnMock.getArgument(0));
+ return null;
+ }).when(_logger).info(Mockito.anyString());
+
+ Mockito.doAnswer(inv -> {
+ _numDropped.add(inv.getArgument(1));
+ return null;
+ }).when(_logger).warn(Mockito.anyString(), Mockito.anyLong(), Mockito.anyDouble());
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ _closeMocks.close();
+ }
+
+ @Test
+ public void shouldFormatLogLineProperly() {
+ // Given:
+ Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true);
+ QueryLogger.QueryLogParams params = generateParams(false, 0, 456);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+ // When:
+ queryLogger.log(params);
+
+ // Then:
+ Assert.assertEquals(_infoLog.size(), 1);
+ Assert.assertEquals(_infoLog.get(0), "requestId=123,"
+ + "table=table,"
+ + "timeMs=456,"
+ + "docs=1/2,"
+ + "entries=3/4,"
+ + "segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/unavailable)"
+ + ":5/6/7/8/9/10/24,"
+ + "consumingFreshnessTimeMs=11,"
+ + "servers=12/13,"
+ + "groupLimitReached=false,"
+ + "brokerReduceTimeMs=22,"
+ + "exceptions=0,"
+ + "serverStats=serverStats,"
+ + "offlineThreadCpuTimeNs(total/thread/sysActivity/resSer):14/15/16/17,"
+ + "realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):18/19/20/21,"
+ + "clientIp=ip,"
+ + "query=SELECT * FROM foo");
+ }
+
+ @Test
+ public void shouldOmitClientId() {
+ // Given:
+ Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true);
+ QueryLogger.QueryLogParams params = generateParams(false, 0, 456);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, false, _logger, _droppedRateLimiter);
+
+ // When:
+ queryLogger.log(params);
+
+ // Then:
+ Assert.assertEquals(_infoLog.size(), 1);
+ Assert.assertFalse(
+ _infoLog.get(0).contains("clientIp=ip"),
+ "did not expect to see the client IP logged. Got: " + _infoLog.get(0));
+ }
+
+ @Test
+ public void shouldNotForceLog() {
+ // Given:
+ Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
+ QueryLogger.QueryLogParams params = generateParams(false, 0, 456);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+ // When:
+ queryLogger.log(params);
+
+ // Then:
+ Assert.assertEquals(_infoLog.size(), 0);
+ }
+
+ @Test
+ public void shouldForceLogWhenNumGroupsLimitIsReached() {
+ // Given:
+ Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
+ QueryLogger.QueryLogParams params = generateParams(true, 0, 456);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+ // When:
+ queryLogger.log(params);
+
+ // Then:
+ Assert.assertEquals(_infoLog.size(), 1);
+ }
+
+ @Test
+ public void shouldForceLogWhenExceptionsExist() {
+ // Given:
+ Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
+ QueryLogger.QueryLogParams params = generateParams(false, 1, 456);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+ // When:
+ queryLogger.log(params);
+
+ // Then:
+ Assert.assertEquals(_infoLog.size(), 1);
+ }
+
+ @Test
+ public void shouldForceLogWhenTimeIsMoreThanOneSecond() {
+ // Given:
+ Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
+ QueryLogger.QueryLogParams params = generateParams(false, 0, 1456);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+ // When:
+ queryLogger.log(params);
+
+ // Then:
+ Assert.assertEquals(_infoLog.size(), 1);
+ }
+
+ @Test(timeOut = 10_000L)
+ public void shouldHandleRaceConditionsWithDroppedQueries()
+ throws InterruptedException {
+ // Given:
+ // first and third request get dropped
+ final CountDownLatch dropLogLatch = new CountDownLatch(1);
+ final CountDownLatch logLatch = new CountDownLatch(1);
+ Mockito.when(_logRateLimiter.tryAcquire())
+ .thenReturn(false)
+ .thenReturn(true) // this one will block when it hits tryAcquire()
+ .thenReturn(false) // this one just increments the dropped logs
+ .thenAnswer(invocation -> {
+ // this one will unblock the tryAcquire, but only after
+ // the first thread has reached _droppedRateLimiter#tryAcquire()
+ logLatch.await();
+ dropLogLatch.countDown();
+ return true;
+ });
+
+ Mockito.when(_droppedRateLimiter.tryAcquire())
+ .thenAnswer(invocation -> {
+ logLatch.countDown();
+ dropLogLatch.await();
+ return true;
+ }).thenReturn(true);
+
+ QueryLogger.QueryLogParams params = generateParams(false, 0, 456);
+ QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(4);
+
+ // When:
+ try {
+ executorService.submit(() -> queryLogger.log(params)); // this one gets dropped
+ executorService.submit(() -> queryLogger.log(params)); // this one succeeds, but blocks
+ executorService.submit(() -> queryLogger.log(params)); // this one gets dropped
+ executorService.submit(() -> queryLogger.log(params)); // this one succeeds, and unblocks (2)
+ } finally {
+ executorService.shutdown();
+ Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS), "expected shutdown to complete");
+ }
+
+ // Then:
+ Assert.assertEquals(_numDropped.size(), 1); // the second successful one never logs to warn
+ Assert.assertEquals((long) _numDropped.get(0), 2L);
+ }
+
+ private QueryLogger.QueryLogParams generateParams(boolean isGroupLimitHit, int numExceptions, long timeUsed) {
+ BrokerResponseNative response = new BrokerResponseNative();
+ response.setNumDocsScanned(1);
+ response.setTotalDocs(2);
+ response.setNumEntriesScannedInFilter(3);
+ response.setNumEntriesScannedPostFilter(4);
+ response.setNumSegmentsQueried(5);
+ response.setNumSegmentsProcessed(6);
+ response.setNumSegmentsMatched(7);
+ response.setNumConsumingSegmentsQueried(8);
+ response.setNumConsumingSegmentsProcessed(9);
+ response.setNumConsumingSegmentsMatched(10);
+ response.setMinConsumingFreshnessTimeMs(11);
+ response.setNumServersResponded(12);
+ response.setNumServersQueried(13);
+ response.setNumGroupsLimitReached(isGroupLimitHit);
+ response.setExceptions(
+ IntStream.range(0, numExceptions)
+ .mapToObj(i -> new ProcessingException()).collect(Collectors.toList()));
+ response.setOfflineTotalCpuTimeNs(14);
+ response.setOfflineThreadCpuTimeNs(15);
+ response.setOfflineSystemActivitiesCpuTimeNs(16);
+ response.setOfflineResponseSerializationCpuTimeNs(17);
+ response.setRealtimeTotalCpuTimeNs(18);
+ response.setRealtimeThreadCpuTimeNs(19);
+ response.setRealtimeSystemActivitiesCpuTimeNs(20);
+ response.setRealtimeResponseSerializationCpuTimeNs(21);
+
+ RequestContext request = new DefaultRequestContext();
+ request.setReduceTimeMillis(22);
+
+ BaseBrokerRequestHandler.ServerStats serverStats = new BaseBrokerRequestHandler.ServerStats();
+ serverStats.setServerStats("serverStats");
+ RequesterIdentity identity = new RequesterIdentity() {
+ @Override public String getClientIp() {
+ return "ip";
+ }
+ };
+
+ return new QueryLogger.QueryLogParams(
+ 123,
+ "SELECT * FROM foo",
+ request,
+ "table",
+ 24,
+ serverStats,
+ response,
+ timeUsed,
+ identity
+ );
+ }
+}
diff --git a/pinot-tools/src/main/resources/conf/pinot-broker-log4j2.xml b/pinot-tools/src/main/resources/conf/pinot-broker-log4j2.xml
index a2b12a810c..190c3fe1be 100644
--- a/pinot-tools/src/main/resources/conf/pinot-broker-log4j2.xml
+++ b/pinot-tools/src/main/resources/conf/pinot-broker-log4j2.xml
@@ -31,6 +31,11 @@
<Pattern>%d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n</Pattern>
</PatternLayout>
</RandomAccessFile>
+ <RandomAccessFile name="querylog" fileName="querylog.log" immediateFlush="false">
+ <PatternLayout>
+ <Pattern>%d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n</Pattern>
+ </PatternLayout>
+ </RandomAccessFile>
</Appenders>
<Loggers>
<Root level="info" additivity="false">
@@ -39,6 +44,10 @@
<!-- Direct most logs to the log file -->
<AppenderRef ref="brokerLog"/>
</Root>
+ <!-- Output query logs to a dedicated file -->
+ <Logger name="org.apache.pinot.broker.querylog.QueryLogger" level="debug" additivity="false">
+ <AppenderRef ref="querylog"/>
+ </Logger>
<!-- Output broker starter logs to the console -->
<Logger name="org.apache.pinot.broker.broker.helix.HelixBrokerStarter" level="info" additivity="false">
<AppenderRef ref="console"/>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org