You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2019/03/07 00:35:58 UTC

[incubator-pinot] branch sampling-query-log updated: Change to use maxRate concept, added forceLog() to ensure some outliers to get logged

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch sampling-query-log
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sampling-query-log by this push:
     new ed6041b  Change to use maxRate concept, added forceLog() to ensure some outliers to get logged
ed6041b is described below

commit ed6041bf250b9e02f98619394477b087bfe409fb
Author: Seunghyun Lee <sn...@linkedin.com>
AuthorDate: Wed Mar 6 16:35:27 2019 -0800

    Change to use maxRate concept, added forceLog() to ensure some outliers to get logged
---
 .../requesthandler/BaseBrokerRequestHandler.java   | 37 +++++++++++++----
 .../apache/pinot/common/utils/CommonConstants.java |  4 +-
 .../pinot/core/query/scheduler/QueryScheduler.java | 46 ++++++++++++++++------
 3 files changed, 64 insertions(+), 23 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index a1902fb..de1d8d7 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.requesthandler;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Splitter;
+import com.google.common.util.concurrent.RateLimiter;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -83,7 +84,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
   protected final long _brokerTimeoutMs;
   protected final int _queryResponseLimit;
   protected final int _queryLogLength;
-  protected final float _queryLogSamplingRate;
+  protected final RateLimiter _queryLogRateLimiter;
 
   public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
       TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
@@ -99,12 +100,12 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     _brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS);
     _queryResponseLimit = config.getInt(CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
     _queryLogLength = config.getInt(CONFIG_OF_BROKER_QUERY_LOG_LENGTH, DEFAULT_BROKER_QUERY_LOG_LENGTH);
-    _queryLogSamplingRate =
-        config.getFloat(CONFIG_OF_BROKER_QUERY_LOG_SAMPLING_RATE, DEFAULT_BROKER_QUERY_LOG_SAMPLING_RATE);
+    _queryLogRateLimiter = RateLimiter
+        .create(config.getDouble(CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE, DEFAULT_BROKER_QUERY_LOG_MAX_RATE));
 
     LOGGER.info(
-        "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log sampling rate: {}",
-        _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogSamplingRate);
+        "Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
+        _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
   }
 
   private String getDefaultBrokerId() {
@@ -298,8 +299,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
 
     LOGGER.debug("Broker Response: {}", brokerResponse);
 
-    // Sampling the query log based on the sampling rate configuration
-    if(_queryLogSamplingRate > RANDOM.nextFloat()) {
+    if(_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
       // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
       LOGGER.info(
           "RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{}, segments(queried/processed/matched):{}/{}/{} "
@@ -312,9 +312,30 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
           brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
           StringUtils.substring(query, 0, _queryLogLength));
     }
+    return brokerResponse;
+  }
 
+  /**
+   * Decides whether a query is logged even when the log rate limiter has no permit available.
+   * True if the group limit was reached, the response has exceptions, or it took over 1 sec.
+   * TODO: come up with other criteria for forcing a log and come up with better numbers
+   *
+   */
+  private boolean forceLog(BrokerResponse brokerResponse, long totalTimeMs) {
+    if (brokerResponse.isNumGroupsLimitReached()) {
+      return true;
+    }
 
-    return brokerResponse;
+    if (brokerResponse.getExceptionsSize() > 0) {
+      return true;
+    }
+
+    // If total time is more than 1 sec (1000 ms), force the log
+    if (totalTimeMs > 1000L) {
+      return true;
+    }
+
+    return false;
   }
 
   /**
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index cf5ec95..67375bb 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -111,8 +111,8 @@ public class CommonConstants {
     public static final int DEFAULT_BROKER_QUERY_RESPONSE_LIMIT = Integer.MAX_VALUE;
     public static final String CONFIG_OF_BROKER_QUERY_LOG_LENGTH = "pinot.broker.query.log.length";
     public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH = Integer.MAX_VALUE;
-    public static final String CONFIG_OF_BROKER_QUERY_LOG_SAMPLING_RATE = "pinot.broker.query.log.samplingRate";
-    public static final float DEFAULT_BROKER_QUERY_LOG_SAMPLING_RATE = 1.0f;
+    public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE = "pinot.broker.query.log.maxRate";
+    public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE = 10_000d; // max logged queries per second
     public static final String CONFIG_OF_BROKER_TIMEOUT_MS = "pinot.broker.timeoutMs";
     public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;
     public static final String CONFIG_OF_BROKER_ID = "pinot.broker.id";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 99cf31e..5fc9e44 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.RateLimiter;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -54,10 +55,10 @@ public abstract class QueryScheduler {
 
   private static final String INVALID_NUM_SCANNED = "-1";
   private static final String INVALID_SEGMENTS_COUNT = "-1";
-  private static final String QUERY_LOG_SAMPLING_RATE_KEY = "query_log_sampling_rate";
-  private static final float DEFAULT_QUERY_LOG_SAMPLING_RATE = 1.0f;
+  private static final String QUERY_LOG_MAX_RATE_KEY = "query_log_max_rate";
+  private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
 
-  private final float queryLogSamplingRate;
+  private final RateLimiter queryLogRateLimiter;
   protected final ServerMetrics serverMetrics;
   protected final QueryExecutor queryExecutor;
   protected final ResourceManager resourceManager;
@@ -70,8 +71,9 @@ public abstract class QueryScheduler {
    * @param resourceManager for managing server thread resources
    * @param serverMetrics server metrics collector
    */
-  public QueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor queryExecutor, @Nonnull ResourceManager resourceManager,
-      @Nonnull ServerMetrics serverMetrics, @Nonnull LongAccumulator latestQueryTime) {
+  public QueryScheduler(@Nonnull Configuration config, @Nonnull QueryExecutor queryExecutor,
+      @Nonnull ResourceManager resourceManager, @Nonnull ServerMetrics serverMetrics,
+      @Nonnull LongAccumulator latestQueryTime) {
     Preconditions.checkNotNull(config);
     Preconditions.checkNotNull(queryExecutor);
     Preconditions.checkNotNull(resourceManager);
@@ -81,9 +83,9 @@ public abstract class QueryScheduler {
     this.resourceManager = resourceManager;
     this.queryExecutor = queryExecutor;
     this.latestQueryTime = latestQueryTime;
-    this.queryLogSamplingRate = config.getFloat(QUERY_LOG_SAMPLING_RATE_KEY, DEFAULT_QUERY_LOG_SAMPLING_RATE);
+    this.queryLogRateLimiter = RateLimiter.create(config.getDouble(QUERY_LOG_MAX_RATE_KEY, DEFAULT_QUERY_LOG_MAX_RATE));
 
-    LOGGER.info("Query log sampling rate: {}", queryLogSamplingRate);
+    LOGGER.info("Query log max rate: {}", queryLogRateLimiter.getRate());
   }
 
   /**
@@ -181,16 +183,15 @@ public abstract class QueryScheduler {
 
     TimerContext timerContext = queryRequest.getTimerContext();
     int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
+    long schedulerWaitMs = timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT);
 
-    // Sampling the query log based on the sampling rate configuration
-    if (queryLogSamplingRate > RANDOM.nextFloat()) {
+    if (queryLogRateLimiter.tryAcquire() || forceLog(schedulerWaitMs, numDocsScanned)) {
       LOGGER.info(
           "Processed requestId={},table={},segments(queried/processed/matched)={}/{}/{},schedulerWaitMs={},totalExecMs={},totalTimeMs={},broker={},numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}",
-          requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched,
-          timerContext.getPhaseDurationMs(ServerQueryPhase.SCHEDULER_WAIT),
+          requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, schedulerWaitMs,
           timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
-          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(), numDocsScanned,
-          numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
+          timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), queryRequest.getBrokerId(),
+          numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name());
     }
 
     serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
@@ -201,6 +202,25 @@ public abstract class QueryScheduler {
   }
 
   /**
+   * Returns true when a query must be logged even though the log rate limiter denied a permit.
+   * Forced when schedulerWaitMs exceeds 100 ms or numDocsScanned exceeds 1 million.
+   * TODO: come up with other criteria for forcing a log and come up with better numbers
+   *
+   */
+  private boolean forceLog(long schedulerWaitMs, long numDocsScanned) {
+    // If scheduler wait time is larger than 100ms, force the log
+    if (schedulerWaitMs > 100L) {
+      return true;
+    }
+
+    // If the number of document scanned is larger than 1 million rows, force the log
+    if (numDocsScanned > 1_000_000L) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
    * Serialize the DataTable response for query request
    * @param queryRequest Server query request for which response is serialized
    * @param dataTable DataTable to serialize


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org