You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2020/10/22 20:14:13 UTC

[incubator-pinot] branch master updated: add execution timeout and fix exception stats (#6177)

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 669bd59  add execution timeout and fix exception stats (#6177)
669bd59 is described below

commit 669bd595baa671f2420476da94524635ab1d14db
Author: Alexander Pucher <ap...@apache.org>
AuthorDate: Thu Oct 22 13:13:54 2020 -0700

    add execution timeout and fix exception stats (#6177)
    
    We add a "-timeout" param to the query runner to support time-limited execution, e.g. for performance benchmarks.
    This PR also fixes a display issue with query exception statistics not being reset on every interval.
---
 .../org/apache/pinot/tools/perf/QueryRunner.java   | 82 +++++++++++++++-------
 1 file changed, 57 insertions(+), 25 deletions(-)

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
index 2ced758..d9d6d6a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
@@ -77,6 +77,8 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
   private int _brokerPort = 8099;
   @Option(name = "-queueDepth", required = false, metaVar = "<int>", usage = "Queue size limit for multi-threaded execution (default 64).")
   private int _queueDepth = 64;
+  @Option(name = "-timeout", required = false, metaVar = "<long>", usage = "Timeout in milliseconds for completing all queries (default: unlimited).")
+  private long _timeout = 0;
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help;
 
@@ -151,10 +153,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     switch (_mode) {
       case "singleThread":
         LOGGER.info("MODE singleThread with queryFile: {}, numTimesToRunQueries: {}, reportIntervalMs: {}, "
-                + "numIntervalsToReportAndClearStatistics: {}", _queryFile, _numTimesToRunQueries, _reportIntervalMs,
-            _numIntervalsToReportAndClearStatistics);
+                + "numIntervalsToReportAndClearStatistics: {}, timeout: {}", _queryFile, _numTimesToRunQueries,
+            _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout);
         singleThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _reportIntervalMs,
-            _numIntervalsToReportAndClearStatistics);
+            _numIntervalsToReportAndClearStatistics, _timeout);
         break;
       case "multiThreads":
         if (_numThreads <= 0) {
@@ -163,11 +165,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
           break;
         }
         LOGGER.info("MODE multiThreads with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, "
-                + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}", _queryFile,
-            _numTimesToRunQueries, _numThreads, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
-            _queueDepth);
+                + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}, timeout: {}",
+            _queryFile, _numTimesToRunQueries, _numThreads, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
+            _queueDepth, _timeout);
         multiThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _reportIntervalMs,
-            _numIntervalsToReportAndClearStatistics);
+            _numIntervalsToReportAndClearStatistics, _timeout);
         break;
       case "targetQPS":
         if (_numThreads <= 0) {
@@ -181,11 +183,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
           break;
         }
         LOGGER.info("MODE targetQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, "
-                + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}", _queryFile,
-            _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
-            _queueDepth);
+                + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}, timeout: {}",
+            _queryFile, _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs,
+            _numIntervalsToReportAndClearStatistics, _queueDepth, _timeout);
         targetQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS,
-            _reportIntervalMs, _numIntervalsToReportAndClearStatistics);
+            _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout);
         break;
       case "increasingQPS":
         if (_numThreads <= 0) {
@@ -211,11 +213,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         }
         LOGGER.info("MODE increasingQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, "
                 + "deltaQPS: {}, reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, "
-                + "numIntervalsToIncreaseQPS: {}, queueDepth: {}", _queryFile, _numTimesToRunQueries, _numThreads,
-            _startQPS, _deltaQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
-            _numIntervalsToIncreaseQPS, _queueDepth);
+                + "numIntervalsToIncreaseQPS: {}, queueDepth: {}, timeout: {}", _queryFile, _numTimesToRunQueries,
+            _numThreads, _startQPS, _deltaQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
+            _numIntervalsToIncreaseQPS, _queueDepth, _timeout);
         increasingQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS, _deltaQPS,
-            _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS);
+            _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS, _timeout);
         break;
       default:
         LOGGER.error("Invalid mode: {}", _mode);
@@ -237,10 +239,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    * @param reportIntervalMs report interval in milliseconds.
    * @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear
    *                                               them, 0 means never.
+   * @param timeout timeout in milliseconds for completing all queries.
    * @throws Exception
    */
   public static void singleThreadedQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
-      int reportIntervalMs, int numIntervalsToReportAndClearStatistics)
+      int reportIntervalMs, int numIntervalsToReportAndClearStatistics, long timeout)
       throws Exception {
     PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
     int numQueriesExecuted = 0;
@@ -249,6 +252,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     long totalClientTime = 0L;
     List<Statistics> statisticsList = Collections.singletonList(new Statistics(CLIENT_TIME_STATISTICS));
 
+    final long startTimeAbsolute = System.currentTimeMillis();
     long startTime = System.currentTimeMillis();
     long reportStartTime = startTime;
     int numReportIntervals = 0;
@@ -256,6 +260,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
       Iterator<String> itQuery = queries.iterator();
       while (itQuery.hasNext()) {
+        if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+          LOGGER.warn("Timeout of {} sec reached. Aborting", timeout);
+          return;
+        }
+
         String query = itQuery.next();
 
         JsonNode response = driver.postQuery(query);
@@ -284,6 +293,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
             numReportIntervals = 0;
             startTime = currentTime;
             numQueriesExecuted = 0;
+            numExceptions = 0;
             totalBrokerTime = 0L;
             totalClientTime = 0L;
             for (Statistics statistics : statisticsList) {
@@ -325,10 +335,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    * @param reportIntervalMs report interval in milliseconds.
    * @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear
    *                                               them, 0 means never.
+   * @param timeout timeout in milliseconds for completing all queries
    * @throws Exception
    */
   public static void multiThreadedQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
-      int numThreads, int queueDepth, int reportIntervalMs, int numIntervalsToReportAndClearStatistics)
+      int numThreads, int queueDepth, int reportIntervalMs, int numIntervalsToReportAndClearStatistics, long timeout)
       throws Exception {
     PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
     Queue<String> queryQueue = new LinkedBlockingDeque<>(queueDepth);
@@ -346,6 +357,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     }
     executorService.shutdown();
 
+    final long startTimeAbsolute = System.currentTimeMillis();
     long startTime = System.currentTimeMillis();
     long reportStartTime = startTime;
     int numReportIntervals = 0;
@@ -356,6 +368,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         return;
       }
 
+      if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+        LOGGER.warn("Timeout of {} sec reached. Aborting", timeout);
+        return;
+      }
+
       Iterator<String> itQuery = queries.iterator();
       while (itQuery.hasNext()) {
         String query = itQuery.next();
@@ -381,7 +398,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
               == numIntervalsToReportAndClearStatistics)) {
             numReportIntervals = 0;
             startTime = currentTime;
-            reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList);
+            reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList);
           }
         }
       }
@@ -428,11 +445,12 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    * @param reportIntervalMs report interval in milliseconds.
    * @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear
    *                                               them, 0 means never.
+   * @param timeout timeout in milliseconds for completing all queries
    * @throws Exception
    */
   public static void targetQPSQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
       int numThreads, int queueDepth, double startQPS, int reportIntervalMs,
-      int numIntervalsToReportAndClearStatistics)
+      int numIntervalsToReportAndClearStatistics, long timeout)
       throws Exception {
     PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
     Queue<String> queryQueue = new LinkedBlockingDeque<>(queueDepth);
@@ -450,6 +468,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     }
     executorService.shutdown();
 
+    final long startTimeAbsolute = System.currentTimeMillis();
     final int queryIntervalNanos = (int) (1E9 / startQPS);
     long startTime = System.currentTimeMillis();
     long reportStartTime = startTime;
@@ -461,6 +480,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         return;
       }
 
+      if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+        LOGGER.warn("Timeout of {} sec reached. Aborting", timeout);
+        return;
+      }
+
       long nextQueryNanos = System.nanoTime();
       Iterator<String> itQuery = queries.iterator();
       while (itQuery.hasNext()) {
@@ -495,7 +519,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
               == numIntervalsToReportAndClearStatistics)) {
             numReportIntervals = 0;
             startTime = currentTime;
-            reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList);
+            reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList);
           }
         }
       }
@@ -544,13 +568,14 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    * @param reportIntervalMs report interval in milliseconds.
    * @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear
    *                                               them, 0 means never.
+   * @param timeout timeout in milliseconds for completing all queries.
    * @param numIntervalsToIncreaseQPS number of intervals to increase QPS.
    * @throws Exception
    */
 
   public static void increasingQPSQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
       int numThreads, int queueDepth, double startQPS, double deltaQPS, int reportIntervalMs,
-      int numIntervalsToReportAndClearStatistics, int numIntervalsToIncreaseQPS)
+      int numIntervalsToReportAndClearStatistics, int numIntervalsToIncreaseQPS, long timeout)
       throws Exception {
     PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
     Queue<String> queryQueue = new LinkedBlockingDeque<>(queueDepth);
@@ -568,6 +593,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     }
     executorService.shutdown();
 
+    final long startTimeAbsolute = System.currentTimeMillis();
     long startTime = System.currentTimeMillis();
     long reportStartTime = startTime;
     int numReportIntervals = 0;
@@ -580,6 +606,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         return;
       }
 
+      if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+        LOGGER.warn("Timeout of {} sec reached. Aborting", timeout);
+        return;
+      }
+
       long nextQueryNanos = System.nanoTime();
       Iterator<String> itQuery = queries.iterator();
       while (itQuery.hasNext()) {
@@ -616,7 +647,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
                 totalClientTime.get() / (double) numQueriesExecutedInt, queryQueue.size());
             numReportIntervals = 0;
             startTime = currentTime;
-            reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList);
+            reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList);
 
             currentQPS += deltaQPS;
             queryIntervalNanos = (long) (1E9 / currentQPS);
@@ -633,7 +664,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
             if ((numIntervalsToReportAndClearStatistics != 0) && (
                 numReportIntervals % numIntervalsToReportAndClearStatistics == 0)) {
               startTime = currentTime;
-              reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList);
+              reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList);
             }
           }
         }
@@ -679,9 +710,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     }
   }
 
-  private static void reportAndClearStatistics(AtomicInteger numQueriesExecuted, AtomicLong totalBrokerTime,
-      AtomicLong totalClientTime, List<Statistics> statisticsList) {
+  private static void reportAndClearStatistics(AtomicInteger numQueriesExecuted, AtomicInteger numExceptions,
+      AtomicLong totalBrokerTime, AtomicLong totalClientTime, List<Statistics> statisticsList) {
     numQueriesExecuted.set(0);
+    numExceptions.set(0);
     totalBrokerTime.set(0L);
     totalClientTime.set(0L);
     for (Statistics statistics : statisticsList) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org