You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2022/10/06 16:20:59 UTC

[pinot] branch master updated: refactor query logging to allow separate log file (#9513)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 749857af4e refactor query logging to allow separate log file (#9513)
749857af4e is described below

commit 749857af4e267d75cb50bff7b6664af93790d25b
Author: Almog Gavra <al...@gmail.com>
AuthorDate: Thu Oct 6 09:20:53 2022 -0700

    refactor query logging to allow separate log file (#9513)
    
    * refactor query logging to allow separate log file
    
    * apply pinot style
---
 .../images/pinot/etc/conf/pinot-broker-log4j2.xml  |  19 ++
 .../apache/pinot/broker/querylog/QueryLogger.java  | 282 ++++++++++++++++++++
 .../requesthandler/BaseBrokerRequestHandler.java   |  89 +------
 .../pinot/broker/querylog/QueryLoggerTest.java     | 287 +++++++++++++++++++++
 .../main/resources/conf/pinot-broker-log4j2.xml    |   9 +
 5 files changed, 610 insertions(+), 76 deletions(-)

diff --git a/docker/images/pinot/etc/conf/pinot-broker-log4j2.xml b/docker/images/pinot/etc/conf/pinot-broker-log4j2.xml
index 8716e4ca6b..b1ef2cb883 100644
--- a/docker/images/pinot/etc/conf/pinot-broker-log4j2.xml
+++ b/docker/images/pinot/etc/conf/pinot-broker-log4j2.xml
@@ -45,6 +45,21 @@
       </Policies>
       <DefaultRolloverStrategy max="10"/>
     </RollingFile>
+    <!-- Dedicated appender for QueryLogger output; writes are buffered (immediateFlush="false") -->
+    <RollingFile name="querylog" immediateFlush="false"
+        fileName="${env:LOG_DIR}/querylog.log"
+        filePattern="${env:LOG_DIR}/querylog.%d{yyyy-MM-dd}.%i.log.gz">
+      <PatternLayout>
+        <Pattern>%d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n</Pattern>
+      </PatternLayout>
+      <!-- Roll over on startup, once the file reaches 20 MB, and when the date in filePattern changes -->
+      <Policies>
+        <OnStartupTriggeringPolicy/>
+        <SizeBasedTriggeringPolicy size="20 MB"/>
+        <TimeBasedTriggeringPolicy/>
+      </Policies>
+      <DefaultRolloverStrategy max="10"/>
+    </RollingFile>
   </Appenders>
   <Loggers>
     <Root level="info" additivity="false">
@@ -53,6 +68,10 @@
       <!-- Direct most logs to the log file -->
       <AppenderRef ref="brokerLog"/>
     </Root>
+    <!-- Output query logs to their own file; the ref must match the appender name "querylog" exactly -->
+    <Logger name="org.apache.pinot.broker.querylog.QueryLogger" level="info" additivity="false">
+      <AppenderRef ref="querylog"/>
+    </Logger>
     <!-- Output broker starter logs to the console -->
     <Logger name="org.apache.pinot.broker.broker.helix.HelixBrokerStarter" level="info" additivity="false">
       <AppenderRef ref="console"/>
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
new file mode 100644
index 0000000000..1a451b3ea2
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogger.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.querylog;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.pinot.spi.utils.CommonConstants.Broker;
+
+
+/**
+ * {@code QueryLogger} is responsible for logging query responses in a configurable
+ * fashion. Query logging can be useful to capture production traffic to assist with
+ * debugging or regression testing.
+ *
+ * <p>Each logged query produces a single INFO line of comma-separated entries (see
+ * {@code QueryLogEntry}) followed by the query text, truncated to the configured maximum
+ * length. Lines are rate limited, but queries that reached the group limit, returned
+ * exceptions, or took more than one second are logged regardless. The number of lines
+ * dropped by the rate limiter is reported at WARN level, under a separate rate limit.
+ *
+ * <p>{@link #log} may be called concurrently: the dropped-line counter is an {@link AtomicLong}
+ * and Guava's {@link RateLimiter} is safe for concurrent use.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class QueryLogger {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryLogger.class);
+
+  /** Queries slower than this are logged even when the rate limiter denies a permit. */
+  private static final long FORCE_LOG_TIME_THRESHOLD_MS = TimeUnit.SECONDS.toMillis(1);
+
+  // maximum number of characters of the query text included in a log line
+  private final int _maxQueryLengthToLog;
+  // limits how many query log lines are emitted per second
+  private final RateLimiter _logRateLimiter;
+  // whether the requester's client IP is included in the log line
+  private final boolean _enableIpLogging;
+  private final Logger _logger;
+  // limits how often the number of dropped query log lines is reported
+  private final RateLimiter _droppedLogRateLimiter;
+  // query log lines dropped by the rate limiter since the last report
+  private final AtomicLong _numDroppedLogs = new AtomicLong(0L);
+
+  /**
+   * Creates a query logger configured from the broker configuration.
+   *
+   * @param config broker configuration providing the query log max rate per second, the maximum
+   *               query length to log and whether client IP logging is enabled
+   */
+  public QueryLogger(PinotConfiguration config) {
+    this(RateLimiter.create(config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND,
+            Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND)),
+        config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH),
+        config.getProperty(Broker.CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING,
+            Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING),
+        LOGGER,
+        // report the dropped log count at most once per second
+        RateLimiter.create(1));
+  }
+
+  /**
+   * Creates a query logger from explicit collaborators; used by tests.
+   *
+   * @param logRateLimiter limits how many query log lines are emitted
+   * @param maxQueryLengthToLog maximum number of query characters included in a log line
+   * @param enableIpLogging whether to log the requester's client IP
+   * @param logger destination of the query log lines
+   * @param droppedLogRateLimiter limits how often the dropped-line count is reported
+   */
+  @VisibleForTesting
+  QueryLogger(RateLimiter logRateLimiter, int maxQueryLengthToLog, boolean enableIpLogging, Logger logger,
+      RateLimiter droppedLogRateLimiter) {
+    _logRateLimiter = logRateLimiter;
+    _maxQueryLengthToLog = maxQueryLengthToLog;
+    _enableIpLogging = enableIpLogging;
+    _logger = logger;
+    _droppedLogRateLimiter = droppedLogRateLimiter;
+  }
+
+  /**
+   * Logs a single query, subject to rate limiting.
+   *
+   * <p>The whole broker response is passed to the logger at DEBUG level. The INFO query log line
+   * is emitted only if the log rate limiter grants a permit or the query must be force-logged
+   * (see {@link #shouldForceLog}); otherwise it is counted as dropped. After an emitted line, the
+   * dropped count accumulated so far is reported at WARN level, at most as often as the
+   * dropped-log rate limiter allows, together with the query log rate limit.
+   *
+   * @param params the query and response details to log
+   */
+  public void log(QueryLogParams params) {
+    _logger.debug("Broker Response: {}", params._response);
+
+    // NOTE: tryAcquire() runs first, so a permit is consumed even by queries that would be force-logged
+    if (!(_logRateLimiter.tryAcquire() || shouldForceLog(params))) {
+      _numDroppedLogs.incrementAndGet();
+      return;
+    }
+
+    StringBuilder queryLogBuilder = new StringBuilder();
+    for (QueryLogEntry entry : QueryLogEntry.values()) {
+      entry.format(queryLogBuilder, this, params);
+      queryLogBuilder.append(',');
+    }
+
+    // always log the query last - don't add this to the QueryLogEntry enum
+    queryLogBuilder.append("query=").append(StringUtils.substring(params._query, 0, _maxQueryLengthToLog));
+    _logger.info(queryLogBuilder.toString());
+
+    if (_droppedLogRateLimiter.tryAcquire()) {
+      // getAndSet(0) reads and resets atomically, so increments made by concurrent callers
+      // between the read and the reset cannot be lost
+      long numDroppedLogsSinceLastLog = _numDroppedLogs.getAndSet(0);
+      if (numDroppedLogsSinceLastLog > 0) {
+        // report the query log rate limit (which caused the drops), not the rate of this report
+        _logger.warn("{} logs were dropped. (log max rate per second: {})", numDroppedLogsSinceLastLog,
+            _logRateLimiter.getRate());
+      }
+    }
+  }
+
+  /** @return the maximum number of query characters included in a log line */
+  public int getMaxQueryLengthToLog() {
+    return _maxQueryLengthToLog;
+  }
+
+  /** @return the configured rate, in permits per second, of the query log rate limiter */
+  public double getLogRateLimit() {
+    return _logRateLimiter.getRate();
+  }
+
+  /**
+   * Returns whether a query must be logged even if the rate limiter denied a permit: the response
+   * reached the group limit, contains exceptions, or the query took longer than
+   * {@link #FORCE_LOG_TIME_THRESHOLD_MS}.
+   */
+  private boolean shouldForceLog(QueryLogParams params) {
+    return params._response.isNumGroupsLimitReached() || params._response.getExceptionsSize() > 0
+        || params._timeUsedMs > FORCE_LOG_TIME_THRESHOLD_MS;
+  }
+
+  /**
+   * Details of one query that are rendered into a query log line.
+   */
+  public static class QueryLogParams {
+    final long _requestId;
+    final String _query;
+    final RequestContext _requestContext;
+    final String _table;
+    final int _numUnavailableSegments;
+    final BaseBrokerRequestHandler.ServerStats _serverStats;
+    final BrokerResponse _response;
+    final long _timeUsedMs;
+    @Nullable
+    final RequesterIdentity _requester;
+
+    /**
+     * @param requestId broker request id
+     * @param query query text; truncated when logged
+     * @param requestContext request context, read for the broker reduce time
+     * @param table table name to log
+     * @param numUnavailableSegments number of unavailable segments to log
+     * @param serverStats per-server statistics to log
+     * @param response broker response the logged metrics are read from
+     * @param timeUsedMs total query time in milliseconds; slower than one second forces logging
+     * @param requester identity used for client IP logging, or {@code null} if not available
+     */
+    public QueryLogParams(long requestId, String query, RequestContext requestContext, String table,
+        int numUnavailableSegments, BaseBrokerRequestHandler.ServerStats serverStats, BrokerResponse response,
+        long timeUsedMs, @Nullable RequesterIdentity requester) {
+      _requestId = requestId;
+      _query = query;
+      _table = table;
+      _timeUsedMs = timeUsedMs;
+      _requestContext = requestContext;
+      _requester = requester;
+      _response = response;
+      _serverStats = serverStats;
+      _numUnavailableSegments = numUnavailableSegments;
+    }
+  }
+
+  /**
+   * One comma-separated entry of the query log line, rendered as {@code <name><separator><value>}.
+   *
+   * <p>NOTE: the order of the constants is the order of the entries in the log line. Please keep it
+   * stable: if you want to add a new entry, add it to the end of the existing list.
+   */
+  private enum QueryLogEntry {
+    REQUEST_ID("requestId") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._requestId);
+      }
+    },
+    TABLE("table") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._table);
+      }
+    },
+    TIME_MS("timeMs") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._timeUsedMs);
+      }
+    },
+    DOCS("docs") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._response.getNumDocsScanned()).append('/').append(params._response.getTotalDocs());
+      }
+    },
+    ENTRIES("entries") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._response.getNumEntriesScannedInFilter()).append('/')
+            .append(params._response.getNumEntriesScannedPostFilter());
+      }
+    },
+    SEGMENT_INFO("segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/unavailable)",
+        ':') {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._response.getNumSegmentsQueried()).append('/')
+            .append(params._response.getNumSegmentsProcessed()).append('/')
+            .append(params._response.getNumSegmentsMatched()).append('/')
+            .append(params._response.getNumConsumingSegmentsQueried()).append('/')
+            .append(params._response.getNumConsumingSegmentsProcessed()).append('/')
+            .append(params._response.getNumConsumingSegmentsMatched()).append('/')
+            .append(params._numUnavailableSegments);
+      }
+    },
+    CONSUMING_FRESHNESS_MS("consumingFreshnessTimeMs") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._response.getMinConsumingFreshnessTimeMs());
+      }
+    },
+    SERVERS("servers") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._response.getNumServersResponded()).append('/')
+            .append(params._response.getNumServersQueried());
+      }
+    },
+    GROUP_LIMIT_REACHED("groupLimitReached") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._response.isNumGroupsLimitReached());
+      }
+    },
+    BROKER_REDUCE_TIME_MS("brokerReduceTimeMs") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._requestContext.getReduceTimeMillis());
+      }
+    },
+    EXCEPTIONS("exceptions") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._response.getExceptionsSize());
+      }
+    },
+    SERVER_STATS("serverStats") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._serverStats.getServerStats());
+      }
+    },
+    OFFLINE_THREAD_CPU_TIME("offlineThreadCpuTimeNs(total/thread/sysActivity/resSer)", ':') {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._response.getOfflineTotalCpuTimeNs()).append('/')
+            .append(params._response.getOfflineThreadCpuTimeNs()).append('/')
+            .append(params._response.getOfflineSystemActivitiesCpuTimeNs()).append('/')
+            .append(params._response.getOfflineResponseSerializationCpuTimeNs());
+      }
+    },
+    REALTIME_THREAD_CPU_TIME("realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer)", ':') {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        builder.append(params._response.getRealtimeTotalCpuTimeNs()).append('/')
+            .append(params._response.getRealtimeThreadCpuTimeNs()).append('/')
+            .append(params._response.getRealtimeSystemActivitiesCpuTimeNs()).append('/')
+            .append(params._response.getRealtimeResponseSerializationCpuTimeNs());
+      }
+    },
+    CLIENT_IP("clientIp") {
+      @Override
+      void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+        if (logger._enableIpLogging && params._requester != null) {
+          builder.append(params._requester.getClientIp());
+        } else {
+          builder.append(CommonConstants.UNKNOWN);
+        }
+      }
+    };
+
+    // name printed before the separator and the value
+    private final String _entryName;
+    // '=' for most entries; ':' for backwards compatibility of the entries that historically used it
+    private final char _separator;
+
+    QueryLogEntry(String entryName) {
+      this(entryName, '=');
+    }
+
+    QueryLogEntry(String entryName, final char separator) {
+      _entryName = entryName;
+      _separator = separator;
+    }
+
+    /** Appends only this entry's value (without name or separator) to {@code builder}. */
+    abstract void doFormat(StringBuilder builder, QueryLogger logger, QueryLogParams params);
+
+    /** Appends {@code <name><separator><value>} for this entry to {@code builder}. */
+    void format(StringBuilder builder, QueryLogger logger, QueryLogParams params) {
+      // append directly to the shared StringBuilder: string concatenation inside a loop is not
+      // reliably compiled into a single StringBuilder and would allocate a String per entry
+      builder.append(_entryName).append(_separator);
+      doFormat(builder, logger, params);
+    }
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5b27b2da4e..76553f971d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -21,7 +21,6 @@ package org.apache.pinot.broker.requesthandler;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.RateLimiter;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -36,7 +35,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -48,6 +46,7 @@ import org.apache.commons.httpclient.methods.DeleteMethod;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.broker.querylog.QueryLogger;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.config.provider.TableCache;
@@ -121,13 +120,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final String _brokerId;
   protected final long _brokerTimeoutMs;
   protected final int _queryResponseLimit;
-  protected final int _queryLogLength;
-
-  private final RateLimiter _queryLogRateLimiter;
-
-  private final RateLimiter _numDroppedLogRateLimiter;
-  private final AtomicInteger _numDroppedLog;
 
+  private final QueryLogger _queryLogger;
   private final boolean _disableGroovy;
   private final boolean _useApproximateFunction;
   private final int _defaultHllLog2m;
@@ -156,19 +150,14 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     _brokerTimeoutMs = config.getProperty(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
     _queryResponseLimit =
         config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
-    _queryLogLength =
-        config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH);
-    _queryLogRateLimiter = RateLimiter.create(config.getProperty(Broker.CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND,
-        Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
-    _numDroppedLog = new AtomicInteger(0);
-    _numDroppedLogRateLimiter = RateLimiter.create(1.0);
+    _queryLogger = new QueryLogger(config);
     boolean enableQueryCancellation =
         Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
     _queriesById = enableQueryCancellation ? new ConcurrentHashMap<>() : null;
     LOGGER.info(
         "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps, "
-            + "enabling query cancellation: {}", _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength,
-        _queryLogRateLimiter.getRate(), enableQueryCancellation);
+            + "enabling query cancellation: {}", _brokerId, _brokerTimeoutMs, _queryResponseLimit,
+        _queryLogger.getMaxQueryLengthToLog(), _queryLogger.getLogRateLimit(), enableQueryCancellation);
   }
 
   private String getDefaultBrokerId() {
@@ -560,8 +549,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       // Send empty response since we don't need to evaluate either offline or realtime request.
       BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
       // Extract source info from incoming request
-      logBrokerResponse(requestId, query, requestContext, tableName, 0, new ServerStats(), brokerResponse,
-          System.nanoTime(), requesterIdentity);
+      _queryLogger.log(new QueryLogger.QueryLogParams(
+          requestId, query, requestContext, tableName, 0, new ServerStats(),
+          brokerResponse, System.nanoTime(), requesterIdentity));
       return brokerResponse;
     }
 
@@ -719,8 +709,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         TimeUnit.MILLISECONDS);
 
     // Extract source info from incoming request
-    logBrokerResponse(requestId, query, requestContext, tableName, numUnavailableSegments, serverStats, brokerResponse,
-        totalTimeMs, requesterIdentity);
+    _queryLogger.log(
+        new QueryLogger.QueryLogParams(
+            requestId, query, requestContext, tableName, numUnavailableSegments, serverStats, brokerResponse,
+            totalTimeMs, requesterIdentity));
     return brokerResponse;
   }
 
@@ -790,61 +782,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     return TRUE.equals(pinotQuery.getFilterExpression());
   }
 
-  private void logBrokerResponse(long requestId, String query, RequestContext requestContext, String tableName,
-      int numUnavailableSegments, ServerStats serverStats, BrokerResponseNative brokerResponse, long totalTimeMs,
-      @Nullable RequesterIdentity requesterIdentity) {
-    LOGGER.debug("Broker Response: {}", brokerResponse);
-
-    boolean enableClientIpLogging = _config.getProperty(Broker.CONFIG_OF_BROKER_REQUEST_CLIENT_IP_LOGGING,
-        Broker.DEFAULT_BROKER_REQUEST_CLIENT_IP_LOGGING);
-    String clientIp = CommonConstants.UNKNOWN;
-    if (enableClientIpLogging && requesterIdentity != null) {
-      clientIp = requesterIdentity.getClientIp();
-    }
-
-    // Please keep the format as name=value comma-separated with no spaces
-    // Please keep all the name value pairs together, then followed by the query. To add a new entry, please add it to
-    // the end of existing pairs, but before the query.
-    if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
-      // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
-      LOGGER.info("requestId={},table={},timeMs={},docs={}/{},entries={}/{},"
-              + "segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/unavailable):"
-              + "{}/{}/{}/{}/{}/{}/{},consumingFreshnessTimeMs={},"
-              + "servers={}/{},groupLimitReached={},brokerReduceTimeMs={},exceptions={},serverStats={},"
-              + "offlineThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},"
-              + "realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},clientIp={},query={}", requestId,
-          tableName, totalTimeMs, brokerResponse.getNumDocsScanned(), brokerResponse.getTotalDocs(),
-          brokerResponse.getNumEntriesScannedInFilter(), brokerResponse.getNumEntriesScannedPostFilter(),
-          brokerResponse.getNumSegmentsQueried(), brokerResponse.getNumSegmentsProcessed(),
-          brokerResponse.getNumSegmentsMatched(), brokerResponse.getNumConsumingSegmentsQueried(),
-          brokerResponse.getNumConsumingSegmentsProcessed(), brokerResponse.getNumConsumingSegmentsMatched(),
-          numUnavailableSegments, brokerResponse.getMinConsumingFreshnessTimeMs(),
-          brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
-          brokerResponse.isNumGroupsLimitReached(), requestContext.getReduceTimeMillis(),
-          brokerResponse.getExceptionsSize(), serverStats.getServerStats(), brokerResponse.getOfflineTotalCpuTimeNs(),
-          brokerResponse.getOfflineThreadCpuTimeNs(), brokerResponse.getOfflineSystemActivitiesCpuTimeNs(),
-          brokerResponse.getOfflineResponseSerializationCpuTimeNs(), brokerResponse.getRealtimeTotalCpuTimeNs(),
-          brokerResponse.getRealtimeThreadCpuTimeNs(), brokerResponse.getRealtimeSystemActivitiesCpuTimeNs(),
-          brokerResponse.getRealtimeResponseSerializationCpuTimeNs(), clientIp,
-          StringUtils.substring(query, 0, _queryLogLength));
-
-      // Limit the dropping log message at most once per second.
-      if (_numDroppedLogRateLimiter.tryAcquire()) {
-        // NOTE: the reported number may not be accurate since we will be missing some increments happened between
-        // get() and set().
-        int numDroppedLog = _numDroppedLog.get();
-        if (numDroppedLog > 0) {
-          LOGGER.info("{} logs were dropped. (log max rate per second: {})", numDroppedLog,
-              _queryLogRateLimiter.getRate());
-          _numDroppedLog.set(0);
-        }
-      }
-    } else {
-      // Increment the count for dropped log
-      _numDroppedLog.incrementAndGet();
-    }
-  }
-
   private String getServerTenant(String tableNameWithType) {
     TableConfig tableConfig = _tableCache.getTableConfig(tableNameWithType);
     if (tableConfig == null) {
@@ -1716,7 +1653,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   /**
    * Helper class to pass the per server statistics.
    */
-  protected static class ServerStats {
+  public static class ServerStats {
     private String _serverStats;
 
     public String getServerStats() {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
new file mode 100644
index 0000000000..797999cdf1
--- /dev/null
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/querylog/QueryLoggerTest.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.querylog;
+
+import com.google.common.util.concurrent.RateLimiter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.spi.trace.DefaultRequestContext;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.MockitoAnnotations.openMocks;
+
+
+/**
+ * Unit tests for {@link QueryLogger}: log line format, client IP handling, force-logging rules
+ * and reporting of dropped log lines under concurrent use.
+ */
+@SuppressWarnings("UnstableApiUsage")
+public class QueryLoggerTest {
+
+  @Mock
+  RateLimiter _logRateLimiter;
+  @Mock
+  RateLimiter _droppedRateLimiter;
+  @Mock
+  Logger _logger;
+
+  // lines captured from _logger.info(String); synchronized because the race test logs concurrently
+  private final List<String> _infoLog = Collections.synchronizedList(new ArrayList<>());
+  // dropped counts captured from _logger.warn(String, Object, Object)
+  private final List<Long> _numDropped = Collections.synchronizedList(new ArrayList<>());
+
+  private AutoCloseable _closeMocks;
+
+  @BeforeMethod
+  public void setUp() {
+    _closeMocks = openMocks(this);
+
+    _infoLog.clear();
+    _numDropped.clear();
+
+    // capture the query log lines
+    Mockito.doAnswer(invocationOnMock -> {
+      _infoLog.add(invocationOnMock.getArgument(0));
+      return null;
+    }).when(_logger).info(Mockito.anyString());
+
+    // capture the dropped count, i.e. the first placeholder argument of the "logs were dropped" warning
+    Mockito.doAnswer(inv -> {
+      _numDropped.add(inv.getArgument(1));
+      return null;
+    }).when(_logger).warn(Mockito.anyString(), Mockito.anyLong(), Mockito.anyDouble());
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    _closeMocks.close();
+  }
+
+  @Test
+  public void shouldFormatLogLineProperly() {
+    // Given:
+    Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true);
+    QueryLogger.QueryLogParams params = generateParams(false, 0, 456);
+    QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+    // When:
+    queryLogger.log(params);
+
+    // Then:
+    Assert.assertEquals(_infoLog.size(), 1);
+    Assert.assertEquals(_infoLog.get(0), "requestId=123,"
+        + "table=table,"
+        + "timeMs=456,"
+        + "docs=1/2,"
+        + "entries=3/4,"
+        + "segments(queried/processed/matched/consumingQueried/consumingProcessed/consumingMatched/unavailable)"
+        + ":5/6/7/8/9/10/24,"
+        + "consumingFreshnessTimeMs=11,"
+        + "servers=12/13,"
+        + "groupLimitReached=false,"
+        + "brokerReduceTimeMs=22,"
+        + "exceptions=0,"
+        + "serverStats=serverStats,"
+        + "offlineThreadCpuTimeNs(total/thread/sysActivity/resSer):14/15/16/17,"
+        + "realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):18/19/20/21,"
+        + "clientIp=ip,"
+        + "query=SELECT * FROM foo");
+  }
+
+  @Test
+  public void shouldOmitClientIp() {
+    // Given:
+    Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(true);
+    QueryLogger.QueryLogParams params = generateParams(false, 0, 456);
+    QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, false, _logger, _droppedRateLimiter);
+
+    // When:
+    queryLogger.log(params);
+
+    // Then:
+    Assert.assertEquals(_infoLog.size(), 1);
+    // the clientIp entry is still emitted, but must not carry the requester's IP ("ip")
+    String logLine = _infoLog.get(0);
+    Assert.assertTrue(logLine.contains(",clientIp="), "expected a clientIp entry. Got: " + logLine);
+    Assert.assertFalse(logLine.contains("clientIp=ip,"), "did not expect to see the client IP. Got: " + logLine);
+  }
+
+  @Test
+  public void shouldNotForceLog() {
+    // Given:
+    Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
+    QueryLogger.QueryLogParams params = generateParams(false, 0, 456);
+    QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+    // When:
+    queryLogger.log(params);
+
+    // Then:
+    Assert.assertEquals(_infoLog.size(), 0);
+  }
+
+  @Test
+  public void shouldForceLogWhenNumGroupsLimitIsReached() {
+    // Given:
+    Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
+    QueryLogger.QueryLogParams params = generateParams(true, 0, 456);
+    QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+    // When:
+    queryLogger.log(params);
+
+    // Then:
+    Assert.assertEquals(_infoLog.size(), 1);
+  }
+
+  @Test
+  public void shouldForceLogWhenExceptionsExist() {
+    // Given:
+    Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
+    QueryLogger.QueryLogParams params = generateParams(false, 1, 456);
+    QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+    // When:
+    queryLogger.log(params);
+
+    // Then:
+    Assert.assertEquals(_infoLog.size(), 1);
+  }
+
+  @Test
+  public void shouldForceLogWhenTimeIsMoreThanOneSecond() {
+    // Given:
+    Mockito.when(_logRateLimiter.tryAcquire()).thenReturn(false);
+    QueryLogger.QueryLogParams params = generateParams(false, 0, 1456);
+    QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+    // When:
+    queryLogger.log(params);
+
+    // Then:
+    Assert.assertEquals(_infoLog.size(), 1);
+  }
+
+  /**
+   * Four concurrent calls: the 1st and 3rd calls to {@code tryAcquire()} on the log rate limiter are
+   * denied (dropped), the 2nd and 4th granted. The 2nd call is held inside
+   * {@code _droppedRateLimiter.tryAcquire()} until the 4th call has acquired its permit, so both logged
+   * calls race to report the dropped count; it must be reported exactly once, with both drops.
+   */
+  @Test(timeOut = 10_000L)
+  public void shouldHandleRaceConditionsWithDroppedQueries()
+      throws InterruptedException, ExecutionException {
+    // Given:
+    // first and third request get dropped
+    final CountDownLatch dropLogLatch = new CountDownLatch(1);
+    final CountDownLatch logLatch = new CountDownLatch(1);
+    Mockito.when(_logRateLimiter.tryAcquire())
+        .thenReturn(false)
+        .thenReturn(true)   // this one will later block in _droppedRateLimiter#tryAcquire()
+        .thenReturn(false)  // this one just increments the dropped logs
+        .thenAnswer(invocation -> {
+          // this one unblocks the second call's _droppedRateLimiter#tryAcquire(), but only
+          // after that call has reached it
+          logLatch.await();
+          dropLogLatch.countDown();
+          return true;
+        });
+
+    Mockito.when(_droppedRateLimiter.tryAcquire())
+        .thenAnswer(invocation -> {
+          logLatch.countDown();
+          dropLogLatch.await();
+          return true;
+        }).thenReturn(true);
+
+    QueryLogger.QueryLogParams params = generateParams(false, 0, 456);
+    QueryLogger queryLogger = new QueryLogger(_logRateLimiter, 100, true, _logger, _droppedRateLimiter);
+
+    ExecutorService executorService = Executors.newFixedThreadPool(4);
+    List<Future<?>> futures = new ArrayList<>();
+
+    // When:
+    try {
+      futures.add(executorService.submit(() -> queryLogger.log(params))); // this one gets dropped
+      futures.add(executorService.submit(() -> queryLogger.log(params))); // this one succeeds, but blocks
+      futures.add(executorService.submit(() -> queryLogger.log(params))); // this one gets dropped
+      futures.add(executorService.submit(() -> queryLogger.log(params))); // this one succeeds, and unblocks (2)
+    } finally {
+      executorService.shutdown();
+      Assert.assertTrue(executorService.awaitTermination(10, TimeUnit.SECONDS), "expected shutdown to complete");
+    }
+    // rethrow any exception raised inside a worker thread instead of silently ignoring it
+    for (Future<?> future : futures) {
+      future.get();
+    }
+
+    // Then:
+    Assert.assertEquals(_numDropped.size(), 1); // the second successful one never logs to warn
+    Assert.assertEquals((long) _numDropped.get(0), 2L);
+  }
+
+  /**
+   * Builds query log params whose response metrics are the distinct values 1..21 (reduce time 22,
+   * unavailable segments 24), so each value can be located in the formatted log line.
+   *
+   * @param isGroupLimitHit whether the response reports that the group limit was reached
+   * @param numExceptions number of processing exceptions in the response
+   * @param timeUsed total query time in milliseconds
+   */
+  private QueryLogger.QueryLogParams generateParams(boolean isGroupLimitHit, int numExceptions, long timeUsed) {
+    BrokerResponseNative response = new BrokerResponseNative();
+    response.setNumDocsScanned(1);
+    response.setTotalDocs(2);
+    response.setNumEntriesScannedInFilter(3);
+    response.setNumEntriesScannedPostFilter(4);
+    response.setNumSegmentsQueried(5);
+    response.setNumSegmentsProcessed(6);
+    response.setNumSegmentsMatched(7);
+    response.setNumConsumingSegmentsQueried(8);
+    response.setNumConsumingSegmentsProcessed(9);
+    response.setNumConsumingSegmentsMatched(10);
+    response.setMinConsumingFreshnessTimeMs(11);
+    response.setNumServersResponded(12);
+    response.setNumServersQueried(13);
+    response.setNumGroupsLimitReached(isGroupLimitHit);
+    response.setExceptions(
+        IntStream.range(0, numExceptions)
+            .mapToObj(i -> new ProcessingException()).collect(Collectors.toList()));
+    response.setOfflineTotalCpuTimeNs(14);
+    response.setOfflineThreadCpuTimeNs(15);
+    response.setOfflineSystemActivitiesCpuTimeNs(16);
+    response.setOfflineResponseSerializationCpuTimeNs(17);
+    response.setRealtimeTotalCpuTimeNs(18);
+    response.setRealtimeThreadCpuTimeNs(19);
+    response.setRealtimeSystemActivitiesCpuTimeNs(20);
+    response.setRealtimeResponseSerializationCpuTimeNs(21);
+
+    RequestContext request = new DefaultRequestContext();
+    request.setReduceTimeMillis(22);
+
+    BaseBrokerRequestHandler.ServerStats serverStats = new BaseBrokerRequestHandler.ServerStats();
+    serverStats.setServerStats("serverStats");
+    RequesterIdentity identity = new RequesterIdentity() {
+      @Override public String getClientIp() {
+        return "ip";
+      }
+    };
+
+    return new QueryLogger.QueryLogParams(
+        123,
+        "SELECT * FROM foo",
+        request,
+        "table",
+        24,
+        serverStats,
+        response,
+        timeUsed,
+        identity
+    );
+  }
+}
diff --git a/pinot-tools/src/main/resources/conf/pinot-broker-log4j2.xml b/pinot-tools/src/main/resources/conf/pinot-broker-log4j2.xml
index a2b12a810c..190c3fe1be 100644
--- a/pinot-tools/src/main/resources/conf/pinot-broker-log4j2.xml
+++ b/pinot-tools/src/main/resources/conf/pinot-broker-log4j2.xml
@@ -31,6 +31,11 @@
         <Pattern>%d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n</Pattern>
       </PatternLayout>
     </RandomAccessFile>
+    <!-- Dedicated file for QueryLogger output (single file, no rollover); writes are buffered -->
+    <RandomAccessFile name="querylog" fileName="querylog.log"
+        immediateFlush="false">
+      <PatternLayout pattern="%d{yyyy/MM/dd HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n"/>
+    </RandomAccessFile>
   </Appenders>
   <Loggers>
     <Root level="info" additivity="false">
@@ -39,6 +44,10 @@
       <!-- Direct most logs to the log file -->
       <AppenderRef ref="brokerLog"/>
     </Root>
+    <!-- Output query logs to a dedicated file. With level="debug", QueryLogger also writes each broker response -->
+    <Logger name="org.apache.pinot.broker.querylog.QueryLogger" level="debug" additivity="false">
+      <AppenderRef ref="querylog"/>
+    </Logger>
     <!-- Output broker starter logs to the console -->
     <Logger name="org.apache.pinot.broker.broker.helix.HelixBrokerStarter" level="info" additivity="false">
       <AppenderRef ref="console"/>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org