You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2019/03/07 18:47:22 UTC

[incubator-pinot] branch sampling-query-log updated: Added dropped log for server side as well

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch sampling-query-log
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sampling-query-log by this push:
     new c86008c  Added dropped log for server side as well
c86008c is described below

commit c86008c215006b0eb008a37124fb87a731cd69af
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Thu Mar 7 10:47:08 2019 -0800

    Added dropped log for server side as well
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  1 +
 .../apache/pinot/common/utils/CommonConstants.java |  2 +-
 .../pinot/core/query/scheduler/QueryScheduler.java | 23 +++++++++++++++++++++-
 3 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index b408d40..20d384e 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -122,6 +122,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     }
   }
 
+  @SuppressWarnings("Duplicates")
   @Override
   public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
       RequestStatistics requestStatistics)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index b76f2b8..f18589b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -112,7 +112,7 @@ public class CommonConstants {
     public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length";
     public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = Integer.MAX_VALUE;
     public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = "pinot.broker.query.log.maxRatePerSecond";
-    public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = 10_000d;
+    public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND = 10d;
     public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs";
     public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;
     public static final String CONFIG_OF_BROKER_ID = "pinot.broker.id";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index e564eba..dc2f998 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFutureTask;
 import com.google.common.util.concurrent.RateLimiter;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAccumulator;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -57,6 +58,9 @@ public abstract class QueryScheduler {
   private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
 
   private final RateLimiter queryLogRateLimiter;
+  private final RateLimiter numDroppedLogRateLimiter;
+  private final AtomicInteger numDroppedLogCounter;
+
   protected final ServerMetrics serverMetrics;
   protected final QueryExecutor queryExecutor;
   protected final ResourceManager resourceManager;
@@ -82,6 +86,8 @@ public abstract class QueryScheduler {
     this.queryExecutor = queryExecutor;
     this.latestQueryTime = latestQueryTime;
     this.queryLogRateLimiter = RateLimiter.create(config.getDouble(QUERY_LOG_MAX_RATE_KEY, DEFAULT_QUERY_LOG_MAX_RATE));
+    this.numDroppedLogRateLimiter = RateLimiter.create(1.0d);
+    this.numDroppedLogCounter = new AtomicInteger(0);
 
     LOGGER.info("Query log max rate: {}", queryLogRateLimiter.getRate());
   }
@@ -133,6 +139,7 @@ public abstract class QueryScheduler {
    * @param executorService Executor service to use for parallelizing query processing
    * @return serialized query response
    */
+  @SuppressWarnings("Duplicates")
   @Nullable
   protected byte[] processQueryAndSerialize(@Nonnull ServerQueryRequest queryRequest,
       @Nonnull ExecutorService executorService) {
@@ -190,6 +197,20 @@ public abstract class QueryScheduler {
           timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
           timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(),
           numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
+
+      // Limit the dropping log message at most once per second.
+      if (numDroppedLogRateLimiter.tryAcquire()) {
+        // NOTE: the reported number may not be accurate since we will be missing some increments happened between
+        // get() and set().
+        int numDroppedLog = numDroppedLogCounter.get();
+        if (numDroppedLog > 0) {
+          LOGGER.info("{} logs were dropped. (log max rate per second: {})", numDroppedLog,
+              queryLogRateLimiter.getRate());
+          numDroppedLogCounter.set(0);
+        }
+      }
+    } else {
+      numDroppedLogCounter.incrementAndGet();
     }
 
     serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
@@ -208,7 +229,7 @@ public abstract class QueryScheduler {
   private boolean forceLog(long schedulerWaitMs, long numDocsScanned) {
     // If scheduler wait time is larger than 100ms, force the log
     if (schedulerWaitMs > 100L) {
-      return false;
+      return true;
     }
 
     // If the number of document scanned is larger than 1 million rows, force the log


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org