You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2019/03/07 04:32:06 UTC

[incubator-pinot] branch sampling-query-log updated: Addressed comments and added log for reporting dropped log

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch sampling-query-log
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sampling-query-log by this push:
     new 9ad107b  Addressed comments and added log for reporting dropped log
9ad107b is described below

commit 9ad107bc710db4887d7efe51be2d3e34f38cd460
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Wed Mar 6 20:31:51 2019 -0800

    Addressed comments and added log for reporting dropped log
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 29 +++++++++++++++++++---
 .../apache/pinot/common/utils/CommonConstants.java |  4 +--
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index a0f4672..48c2dc2 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -26,8 +26,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
@@ -83,7 +83,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final long _brokerTimeoutMs;
   protected final int _queryResponseLimit;
   protected final int _queryLogLength;
-  protected final RateLimiter _queryLogRateLimiter;
+
+  private final RateLimiter _queryLogRateLimiter;
+  private final RateLimiter _numDroppedLogRateLimiter;
+  private final AtomicInteger _numDroppedLog;
 
   public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
@@ -99,8 +102,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     _brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS);
     _queryResponseLimit = config.getInt(CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
     _queryLogLength = config.getInt(CONFIG_OF_BROKER_QUERY_LOG_LENGTH, DEFAULT_BROKER_QUERY_LOG_LENGTH);
-    _queryLogRateLimiter = RateLimiter
-        .create(config.getDouble(CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE, DEFAULT_BROKER_QUERY_LOG_MAX_RATE));
+    _queryLogRateLimiter = RateLimiter.create(
+        config.getDouble(CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND, DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
+
+    _numDroppedLog = new AtomicInteger(0);
+    _numDroppedLogRateLimiter = RateLimiter.create(1.0);
 
     LOGGER.info(
         "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
@@ -310,6 +316,21 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
           brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
           brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
           StringUtils.substring(query, 0, _queryLogLength));
+
+      // Limit the dropping log message at most once per second.
+      if (_numDroppedLogRateLimiter.tryAcquire()) {
+        // NOTE: the reported number may not be accurate since we will be missing some increments happened between
+        // get() and set().
+        int numDroppedLog = _numDroppedLog.get();
+        if (numDroppedLog > 0) {
+          LOGGER.info("{} logs were dropped. (log max rate per second: {}", numDroppedLog,
+              _queryLogRateLimiter.getRate());
+          _numDroppedLog.set(0);
+        }
+      }
+    } else {
+      // Increment the count for dropped log
+      _numDroppedLog.incrementAndGet();
     }
     return brokerResponse;
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 16a7265..b76f2b8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -111,8 +111,8 @@ public class CommonConstants {
     public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = Integer.MAX_VALUE;
     public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length";
     public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = Integer.MAX_VALUE;
-    public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE = "pinot.broker.query.log.maxRate";
-    public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE = 10_000d;
+    public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = "pinot.broker.query.log.maxRatePerSecond";
+    public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = 10_000d;
     public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs";
     public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;
     public static final String CONFIG_OF_BROKER_ID = "pinot.broker.id";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org