You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/04/18 22:19:17 UTC

[pinot] branch master updated: Add brokerURL and better handle TimeoutException in QueryRunner (#8539)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 399cb97f30 Add brokerURL and better handle TimeoutException in QueryRunner (#8539)
399cb97f30 is described below

commit 399cb97f30223fbd40b41e72ea19f13bc3f92421
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Apr 18 15:19:10 2022 -0700

    Add brokerURL and better handle TimeoutException in QueryRunner (#8539)
    
    * Add brokerURL and better handle TimeoutException in QueryRunner
    
    * Boolean
---
 .../pinot/tools/perf/PerfBenchmarkDriver.java      |   4 +-
 .../pinot/tools/perf/PerfBenchmarkDriverConf.java  |   9 ++
 .../org/apache/pinot/tools/perf/QueryRunner.java   | 115 +++++++++++----------
 3 files changed, 75 insertions(+), 53 deletions(-)

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index b3af753127..36bb6d4838 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -144,7 +145,8 @@ public class PerfBenchmarkDriver {
     }
 
     // Init broker.
-    _brokerBaseApiUrl = "http://" + _conf.getBrokerHost() + ":" + _conf.getBrokerPort();
+    _brokerBaseApiUrl = StringUtils.isNotBlank(_conf.getBrokerURL()) ? _conf.getBrokerURL()
+        : "http://" + _conf.getBrokerHost() + ":" + _conf.getBrokerPort();
 
     // Init server.
     String serverInstanceName = _conf.getServerInstanceName();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java
index 530883117b..3ce806f56f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java
@@ -53,6 +53,7 @@ public class PerfBenchmarkDriverConf {
   //broker configuration
   int _brokerPort = CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
   String _brokerHost = "localhost";
+  String _brokerURL;
   boolean _startBroker = true;
 
   //resource configuration
@@ -146,6 +147,14 @@ public class PerfBenchmarkDriverConf {
     _brokerHost = brokerHost;
   }
 
+  public String getBrokerURL() {
+    return _brokerURL;
+  }
+
+  public void setBrokerURL(String brokerURL) {
+    _brokerURL = brokerURL;
+  }
+
   public boolean shouldStartBroker() {
     return _startBroker;
   }
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
index d0a2535433..2396a08090 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
@@ -32,7 +32,6 @@ import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.ThreadSafe;
@@ -53,56 +52,58 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
   private static final long NANO_DELTA = (long) 5E5;
   private static final String CLIENT_TIME_STATISTICS = "CLIENT TIME STATISTICS";
 
-  @CommandLine.Option(names = {"-mode"}, required = true,
-      description = "Mode of query runner (singleThread|multiThreads|targetQPS|increasingQPS).")
+  @CommandLine.Option(names = {"-mode"}, required = true, description = "Mode of query runner "
+      + "(singleThread|multiThreads|targetQPS|increasingQPS).")
   private String _mode;
   @CommandLine.Option(names = {"-queryFile"}, required = true, description = "Path to query file.")
   private String _queryFile;
-  @CommandLine.Option(names = {"-queryMode"}, required = false,
-      description = "Mode of query generator (full|resample).")
+  @CommandLine.Option(names = {"-queryMode"}, required = false, description = "Mode of query generator "
+      + "(full|resample).")
   private String _queryMode = QueryMode.FULL.toString();
-  @CommandLine.Option(names = {"-queryCount"}, required = false,
-      description = "Number of queries to run (default 0 = all).")
+  @CommandLine.Option(names = {"-queryCount"}, required = false, description = "Number of queries to run (default 0 ="
+      + " all).")
   private int _queryCount = 0;
-  @CommandLine.Option(names = {"-numTimesToRunQueries"}, required = false,
-      description = "Number of times to run all queries in the query file, 0 means infinite times (default 1).")
+  @CommandLine.Option(names = {"-numTimesToRunQueries"}, required = false, description = "Number of times to run all "
+      + "queries in the query file, 0 means infinite times (default 1).")
   private int _numTimesToRunQueries = 1;
-  @CommandLine.Option(names = {"-reportIntervalMs"}, required = false,
-      description = "Interval in milliseconds to report simple statistics (default 3000).")
+  @CommandLine.Option(names = {"-reportIntervalMs"}, required = false, description = "Interval in milliseconds to "
+      + "report simple statistics (default 3000).")
   private int _reportIntervalMs = 3000;
-  @CommandLine.Option(names = {"-numIntervalsToReportAndClearStatistics"}, required = false,
-      description = "Number of report intervals to report detailed statistics and clear them,"
-          + " 0 means never (default 10).")
+  @CommandLine.Option(names = {"-numIntervalsToReportAndClearStatistics"}, required = false, description =
+      "Number of report intervals to report detailed statistics and clear them," + " 0 means never (default 10).")
   private int _numIntervalsToReportAndClearStatistics = 10;
-  @CommandLine.Option(names = {"-numThreads"}, required = false,
-      description = "Number of threads sending queries for multiThreads, targetQPS and increasingQPS mode (default 5). "
+  @CommandLine.Option(names = {"-numThreads"}, required = false, description =
+      "Number of threads sending queries for multiThreads, targetQPS and increasingQPS mode (default 5). "
           + "This can be used to simulate multiple clients sending queries concurrently.")
   private int _numThreads = 5;
-  @CommandLine.Option(names = {"-startQPS"}, required = false,
-      description = "Start QPS for targetQPS and increasingQPS mode")
+  @CommandLine.Option(names = {"-startQPS"}, required = false, description = "Start QPS for targetQPS and "
+      + "increasingQPS mode")
   private double _startQPS;
   @CommandLine.Option(names = {"-deltaQPS"}, required = false, description = "Delta QPS for increasingQPS mode.")
   private double _deltaQPS;
-  @CommandLine.Option(names = {"-numIntervalsToIncreaseQPS"}, required = false,
-      description = "Number of report intervals to increase QPS for increasingQPS mode (default 10).")
+  @CommandLine.Option(names = {"-numIntervalsToIncreaseQPS"}, required = false, description = "Number of report "
+      + "intervals to increase QPS for increasingQPS mode (default 10).")
   private int _numIntervalsToIncreaseQPS = 10;
   @CommandLine.Option(names = {"-brokerHost"}, required = false, description = "Broker host name (default localhost).")
   private String _brokerHost = "localhost";
   @CommandLine.Option(names = {"-brokerPort"}, required = false, description = "Broker port number (default 8099).")
   private int _brokerPort = 8099;
-  @CommandLine.Option(names = {"-queueDepth"}, required = false,
-      description = "Queue size limit for multi-threaded execution (default 64).")
+  @CommandLine.Option(names = {"-brokerURL"}, required = false, description = "Broker URL (no default, uses "
+      + "brokerHost:brokerPort by default).")
+  private String _brokerURL;
+  @CommandLine.Option(names = {"-queueDepth"}, required = false, description = "Queue size limit for multi-threaded "
+      + "execution (default 64).")
   private int _queueDepth = 64;
-  @CommandLine.Option(names = {"-timeout"}, required = false,
-      description = "Timeout in milliseconds for completing all queries (default: unlimited).")
+  @CommandLine.Option(names = {"-timeout"}, required = false, description = "Timeout in milliseconds for completing "
+      + "all queries (default: unlimited).")
   private long _timeout = 0;
-  @CommandLine.Option(names = {"-verbose"}, required = false,
-      description = "Enable verbose query logging (default: false).")
+  @CommandLine.Option(names = {"-verbose"}, required = false, description = "Enable verbose query logging (default: "
+      + "false).")
   private boolean _verbose = false;
   @CommandLine.Option(names = {"-dialect"}, required = false, description = "Query dialect to use (pql|sql).")
   private String _dialect = "pql";
-  @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true,
-      description = "Print this message.")
+  @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print "
+      + "this message.")
   private boolean _help;
 
   private enum QueryMode {
@@ -168,6 +169,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     PerfBenchmarkDriverConf conf = new PerfBenchmarkDriverConf();
     conf.setBrokerHost(_brokerHost);
     conf.setBrokerPort(_brokerPort);
+    conf.setBrokerURL(_brokerURL);
     conf.setRunQueries(true);
     conf.setStartZookeeper(false);
     conf.setStartController(false);
@@ -295,15 +297,18 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     List<Statistics> statisticsList = Collections.singletonList(new Statistics(CLIENT_TIME_STATISTICS));
 
     final long startTimeAbsolute = System.currentTimeMillis();
+    boolean timeoutReached = false;
     long startTime = System.currentTimeMillis();
     long reportStartTime = startTime;
     int numReportIntervals = 0;
     int numTimesExecuted = 0;
-    while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
+
+    while (!timeoutReached && (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries)) {
       for (String query : queries) {
         if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
-          LOGGER.warn("Timeout of {} sec reached. Aborting", timeout);
-          throw new TimeoutException("Timeout of " + timeout + " sec reached. Aborting");
+          LOGGER.info("Timeout of {} ms reached. Aborting", timeout);
+          timeoutReached = true;
+          break;
         }
 
         JsonNode response = driver.postQuery(query, headers);
@@ -319,9 +324,8 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         long currentTime = System.currentTimeMillis();
         if (currentTime - reportStartTime >= reportIntervalMs) {
           long timePassed = currentTime - startTime;
-          LOGGER.info("Time Passed: {}ms, Queries Executed: {}, Exceptions: {}, Average QPS: {}, "
-                  + "Average Broker Time: {}ms, Average Client Time: {}ms.", timePassed, numQueriesExecuted,
-              numExceptions,
+          LOGGER.info("Time Passed: {}ms, Queries Executed: {}, Exceptions: {}, Average QPS: {}, " + "Average "
+                  + "Broker Time: {}ms, Average Client Time: {}ms.", timePassed, numQueriesExecuted, numExceptions,
               numQueriesExecuted / ((double) timePassed / MILLIS_PER_SECOND),
               totalBrokerTime / (double) numQueriesExecuted, totalClientTime / (double) numQueriesExecuted);
           reportStartTime = currentTime;
@@ -416,16 +420,18 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     long reportStartTime = startTime;
     int numReportIntervals = 0;
     int numTimesExecuted = 0;
-    while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
+    boolean timeoutReached = false;
+    while (!timeoutReached && (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries)) {
       if (executorService.isTerminated()) {
         throw new IllegalThreadStateException("All threads got exception and already dead.");
       }
 
-      if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
-        throw new TimeoutException("Timeout of " + timeout + " sec reached. Aborting");
-      }
-
       for (String query : queries) {
+        if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+          LOGGER.info("Timeout of {} ms reached. Aborting", timeout);
+          timeoutReached = true;
+          break;
+        }
         while (!queryQueue.offer(query)) {
           Thread.sleep(1);
         }
@@ -434,9 +440,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         if (currentTime - reportStartTime >= reportIntervalMs) {
           long timePassed = currentTime - startTime;
           int numQueriesExecutedInt = numQueriesExecuted.get();
-          LOGGER.info("Time Passed: {}ms, Queries Executed: {}, Exceptions: {}, Average QPS: {}, "
-                  + "Average Broker Time: {}ms, Average Client Time: {}ms.", timePassed, numQueriesExecutedInt,
-              numExceptions.get(), numQueriesExecutedInt / ((double) timePassed / MILLIS_PER_SECOND),
+          LOGGER.info("Time Passed: {}ms, Queries Executed: {}, Exceptions: {}, Average QPS: {}, " + "Average "
+                  + "Broker Time: {}ms, Average Client Time: {}ms.", timePassed, numQueriesExecutedInt,
+              numExceptions.get(),
+              numQueriesExecutedInt / ((double) timePassed / MILLIS_PER_SECOND),
               totalBrokerTime.get() / (double) numQueriesExecutedInt,
               totalClientTime.get() / (double) numQueriesExecutedInt);
           reportStartTime = currentTime;
@@ -535,17 +542,19 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     long reportStartTime = startTime;
     int numReportIntervals = 0;
     int numTimesExecuted = 0;
-    while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
+    boolean timeoutReached = false;
+    while (!timeoutReached && (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries)) {
       if (executorService.isTerminated()) {
         throw new IllegalThreadStateException("All threads got exception and already dead.");
       }
 
-      if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
-        throw new TimeoutException("Timeout of " + timeout + " sec reached. Aborting");
-      }
-
       long nextQueryNanos = System.nanoTime();
       for (String query : queries) {
+        if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+          LOGGER.info("Timeout of {} ms reached. Aborting", timeout);
+          timeoutReached = true;
+          break;
+        }
         long nanoTime = System.nanoTime();
         while (nextQueryNanos > nanoTime - NANO_DELTA) {
           Thread.sleep(Math.max((int) ((nextQueryNanos - nanoTime) / 1E6), 1));
@@ -671,17 +680,19 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     int numTimesExecuted = 0;
     double currentQPS = startQPS;
     long queryIntervalNanos = (long) (1E9 / currentQPS);
-    while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
+    boolean timeoutReached = false;
+    while (!timeoutReached && (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries)) {
       if (executorService.isTerminated()) {
         throw new IllegalThreadStateException("All threads got exception and already dead.");
       }
 
-      if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
-        throw new TimeoutException("Timeout of " + timeout + " sec reached. Aborting");
-      }
-
       long nextQueryNanos = System.nanoTime();
       for (String query : queries) {
+        if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+          LOGGER.info("Timeout of {} ms reached. Aborting", timeout);
+          timeoutReached = true;
+          break;
+        }
         long nanoTime = System.nanoTime();
         while (nextQueryNanos > nanoTime - NANO_DELTA) {
           Thread.sleep(Math.max((int) ((nextQueryNanos - nanoTime) / 1E6), 1));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org