You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2022/04/18 22:19:17 UTC
[pinot] branch master updated: Add brokerURL and better handle TimeoutException in QueryRunner (#8539)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 399cb97f30 Add brokerURL and better handle TimeoutException in QueryRunner (#8539)
399cb97f30 is described below
commit 399cb97f30223fbd40b41e72ea19f13bc3f92421
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon Apr 18 15:19:10 2022 -0700
Add brokerURL and better handle TimeoutException in QueryRunner (#8539)
* Add brokerURL and better handle TimeoutException in QueryRunner
* Boolean
---
.../pinot/tools/perf/PerfBenchmarkDriver.java | 4 +-
.../pinot/tools/perf/PerfBenchmarkDriverConf.java | 9 ++
.../org/apache/pinot/tools/perf/QueryRunner.java | 115 +++++++++++----------
3 files changed, 75 insertions(+), 53 deletions(-)
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index b3af753127..36bb6d4838 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -144,7 +145,8 @@ public class PerfBenchmarkDriver {
}
// Init broker.
- _brokerBaseApiUrl = "http://" + _conf.getBrokerHost() + ":" + _conf.getBrokerPort();
+ _brokerBaseApiUrl = StringUtils.isNotBlank(_conf.getBrokerURL()) ? _conf.getBrokerURL()
+ : "http://" + _conf.getBrokerHost() + ":" + _conf.getBrokerPort();
// Init server.
String serverInstanceName = _conf.getServerInstanceName();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java
index 530883117b..3ce806f56f 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriverConf.java
@@ -53,6 +53,7 @@ public class PerfBenchmarkDriverConf {
//broker configuration
int _brokerPort = CommonConstants.Helix.DEFAULT_BROKER_QUERY_PORT;
String _brokerHost = "localhost";
+ String _brokerURL;
boolean _startBroker = true;
//resource configuration
@@ -146,6 +147,14 @@ public class PerfBenchmarkDriverConf {
_brokerHost = brokerHost;
}
+ public String getBrokerURL() {
+ return _brokerURL;
+ }
+
+ public void setBrokerURL(String brokerURL) {
+ _brokerURL = brokerURL;
+ }
+
public boolean shouldStartBroker() {
return _startBroker;
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
index d0a2535433..2396a08090 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
@@ -32,7 +32,6 @@ import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
@@ -53,56 +52,58 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
private static final long NANO_DELTA = (long) 5E5;
private static final String CLIENT_TIME_STATISTICS = "CLIENT TIME STATISTICS";
- @CommandLine.Option(names = {"-mode"}, required = true,
- description = "Mode of query runner (singleThread|multiThreads|targetQPS|increasingQPS).")
+ @CommandLine.Option(names = {"-mode"}, required = true, description = "Mode of query runner "
+ + "(singleThread|multiThreads|targetQPS|increasingQPS).")
private String _mode;
@CommandLine.Option(names = {"-queryFile"}, required = true, description = "Path to query file.")
private String _queryFile;
- @CommandLine.Option(names = {"-queryMode"}, required = false,
- description = "Mode of query generator (full|resample).")
+ @CommandLine.Option(names = {"-queryMode"}, required = false, description = "Mode of query generator "
+ + "(full|resample).")
private String _queryMode = QueryMode.FULL.toString();
- @CommandLine.Option(names = {"-queryCount"}, required = false,
- description = "Number of queries to run (default 0 = all).")
+ @CommandLine.Option(names = {"-queryCount"}, required = false, description = "Number of queries to run (default 0 ="
+ + " all).")
private int _queryCount = 0;
- @CommandLine.Option(names = {"-numTimesToRunQueries"}, required = false,
- description = "Number of times to run all queries in the query file, 0 means infinite times (default 1).")
+ @CommandLine.Option(names = {"-numTimesToRunQueries"}, required = false, description = "Number of times to run all "
+ + "queries in the query file, 0 means infinite times (default 1).")
private int _numTimesToRunQueries = 1;
- @CommandLine.Option(names = {"-reportIntervalMs"}, required = false,
- description = "Interval in milliseconds to report simple statistics (default 3000).")
+ @CommandLine.Option(names = {"-reportIntervalMs"}, required = false, description = "Interval in milliseconds to "
+ + "report simple statistics (default 3000).")
private int _reportIntervalMs = 3000;
- @CommandLine.Option(names = {"-numIntervalsToReportAndClearStatistics"}, required = false,
- description = "Number of report intervals to report detailed statistics and clear them,"
- + " 0 means never (default 10).")
+ @CommandLine.Option(names = {"-numIntervalsToReportAndClearStatistics"}, required = false, description =
+ "Number of report intervals to report detailed statistics and clear them," + " 0 means never (default 10).")
private int _numIntervalsToReportAndClearStatistics = 10;
- @CommandLine.Option(names = {"-numThreads"}, required = false,
- description = "Number of threads sending queries for multiThreads, targetQPS and increasingQPS mode (default 5). "
+ @CommandLine.Option(names = {"-numThreads"}, required = false, description =
+ "Number of threads sending queries for multiThreads, targetQPS and increasingQPS mode (default 5). "
+ "This can be used to simulate multiple clients sending queries concurrently.")
private int _numThreads = 5;
- @CommandLine.Option(names = {"-startQPS"}, required = false,
- description = "Start QPS for targetQPS and increasingQPS mode")
+ @CommandLine.Option(names = {"-startQPS"}, required = false, description = "Start QPS for targetQPS and "
+ + "increasingQPS mode")
private double _startQPS;
@CommandLine.Option(names = {"-deltaQPS"}, required = false, description = "Delta QPS for increasingQPS mode.")
private double _deltaQPS;
- @CommandLine.Option(names = {"-numIntervalsToIncreaseQPS"}, required = false,
- description = "Number of report intervals to increase QPS for increasingQPS mode (default 10).")
+ @CommandLine.Option(names = {"-numIntervalsToIncreaseQPS"}, required = false, description = "Number of report "
+ + "intervals to increase QPS for increasingQPS mode (default 10).")
private int _numIntervalsToIncreaseQPS = 10;
@CommandLine.Option(names = {"-brokerHost"}, required = false, description = "Broker host name (default localhost).")
private String _brokerHost = "localhost";
@CommandLine.Option(names = {"-brokerPort"}, required = false, description = "Broker port number (default 8099).")
private int _brokerPort = 8099;
- @CommandLine.Option(names = {"-queueDepth"}, required = false,
- description = "Queue size limit for multi-threaded execution (default 64).")
+ @CommandLine.Option(names = {"-brokerURL"}, required = false, description = "Broker URL (no default, uses "
+ + "brokerHost:brokerPort by default.")
+ private String _brokerURL;
+ @CommandLine.Option(names = {"-queueDepth"}, required = false, description = "Queue size limit for multi-threaded "
+ + "execution (default 64).")
private int _queueDepth = 64;
- @CommandLine.Option(names = {"-timeout"}, required = false,
- description = "Timeout in milliseconds for completing all queries (default: unlimited).")
+ @CommandLine.Option(names = {"-timeout"}, required = false, description = "Timeout in milliseconds for completing "
+ + "all queries (default: unlimited).")
private long _timeout = 0;
- @CommandLine.Option(names = {"-verbose"}, required = false,
- description = "Enable verbose query logging (default: false).")
+ @CommandLine.Option(names = {"-verbose"}, required = false, description = "Enable verbose query logging (default: "
+ + "false).")
private boolean _verbose = false;
@CommandLine.Option(names = {"-dialect"}, required = false, description = "Query dialect to use (pql|sql).")
private String _dialect = "pql";
- @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true,
- description = "Print this message.")
+ @CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, required = false, help = true, description = "Print "
+ + "this message.")
private boolean _help;
private enum QueryMode {
@@ -168,6 +169,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
PerfBenchmarkDriverConf conf = new PerfBenchmarkDriverConf();
conf.setBrokerHost(_brokerHost);
conf.setBrokerPort(_brokerPort);
+ conf.setBrokerURL(_brokerURL);
conf.setRunQueries(true);
conf.setStartZookeeper(false);
conf.setStartController(false);
@@ -295,15 +297,18 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
List<Statistics> statisticsList = Collections.singletonList(new Statistics(CLIENT_TIME_STATISTICS));
final long startTimeAbsolute = System.currentTimeMillis();
+ boolean timeoutReached = false;
long startTime = System.currentTimeMillis();
long reportStartTime = startTime;
int numReportIntervals = 0;
int numTimesExecuted = 0;
- while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
+
+ while (!timeoutReached && (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries)) {
for (String query : queries) {
if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
- LOGGER.warn("Timeout of {} sec reached. Aborting", timeout);
- throw new TimeoutException("Timeout of " + timeout + " sec reached. Aborting");
+ LOGGER.info("Timeout of {} sec reached. Aborting", timeout);
+ timeoutReached = true;
+ break;
}
JsonNode response = driver.postQuery(query, headers);
@@ -319,9 +324,8 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
long currentTime = System.currentTimeMillis();
if (currentTime - reportStartTime >= reportIntervalMs) {
long timePassed = currentTime - startTime;
- LOGGER.info("Time Passed: {}ms, Queries Executed: {}, Exceptions: {}, Average QPS: {}, "
- + "Average Broker Time: {}ms, Average Client Time: {}ms.", timePassed, numQueriesExecuted,
- numExceptions,
+ LOGGER.info("Time Passed: {}ms, Queries Executed: {}, Exceptions: {}, Average QPS: {}, " + "Average "
+ + "Broker Time: {}ms, Average Client Time: {}ms.", timePassed, numQueriesExecuted, numExceptions,
numQueriesExecuted / ((double) timePassed / MILLIS_PER_SECOND),
totalBrokerTime / (double) numQueriesExecuted, totalClientTime / (double) numQueriesExecuted);
reportStartTime = currentTime;
@@ -416,16 +420,18 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
long reportStartTime = startTime;
int numReportIntervals = 0;
int numTimesExecuted = 0;
- while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
+ boolean timeoutReached = false;
+ while (!timeoutReached && (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries)) {
if (executorService.isTerminated()) {
throw new IllegalThreadStateException("All threads got exception and already dead.");
}
- if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
- throw new TimeoutException("Timeout of " + timeout + " sec reached. Aborting");
- }
-
for (String query : queries) {
+ if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+ LOGGER.info("Timeout of {} sec reached. Aborting", timeout);
+ timeoutReached = true;
+ break;
+ }
while (!queryQueue.offer(query)) {
Thread.sleep(1);
}
@@ -434,9 +440,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
if (currentTime - reportStartTime >= reportIntervalMs) {
long timePassed = currentTime - startTime;
int numQueriesExecutedInt = numQueriesExecuted.get();
- LOGGER.info("Time Passed: {}ms, Queries Executed: {}, Exceptions: {}, Average QPS: {}, "
- + "Average Broker Time: {}ms, Average Client Time: {}ms.", timePassed, numQueriesExecutedInt,
- numExceptions.get(), numQueriesExecutedInt / ((double) timePassed / MILLIS_PER_SECOND),
+ LOGGER.info("Time Passed: {}ms, Queries Executed: {}, Exceptions: {}, Average QPS: {}, " + "Average "
+ + "Broker Time: {}ms, Average Client Time: {}ms.", timePassed, numQueriesExecutedInt,
+ numExceptions.get(),
+ numQueriesExecutedInt / ((double) timePassed / MILLIS_PER_SECOND),
totalBrokerTime.get() / (double) numQueriesExecutedInt,
totalClientTime.get() / (double) numQueriesExecutedInt);
reportStartTime = currentTime;
@@ -535,17 +542,19 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
long reportStartTime = startTime;
int numReportIntervals = 0;
int numTimesExecuted = 0;
- while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
+ boolean timeoutReached = false;
+ while (!timeoutReached && (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries)) {
if (executorService.isTerminated()) {
throw new IllegalThreadStateException("All threads got exception and already dead.");
}
- if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
- throw new TimeoutException("Timeout of " + timeout + " sec reached. Aborting");
- }
-
long nextQueryNanos = System.nanoTime();
for (String query : queries) {
+ if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+ LOGGER.info("Timeout of {} sec reached. Aborting", timeout);
+ timeoutReached = true;
+ break;
+ }
long nanoTime = System.nanoTime();
while (nextQueryNanos > nanoTime - NANO_DELTA) {
Thread.sleep(Math.max((int) ((nextQueryNanos - nanoTime) / 1E6), 1));
@@ -671,17 +680,19 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
int numTimesExecuted = 0;
double currentQPS = startQPS;
long queryIntervalNanos = (long) (1E9 / currentQPS);
- while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
+ boolean timeoutReached = false;
+ while (!timeoutReached && (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries)) {
if (executorService.isTerminated()) {
throw new IllegalThreadStateException("All threads got exception and already dead.");
}
- if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
- throw new TimeoutException("Timeout of " + timeout + " sec reached. Aborting");
- }
-
long nextQueryNanos = System.nanoTime();
for (String query : queries) {
+ if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+ LOGGER.info("Timeout of {} sec reached. Aborting", timeout);
+ timeoutReached = true;
+ break;
+ }
long nanoTime = System.nanoTime();
while (nextQueryNanos > nanoTime - NANO_DELTA) {
Thread.sleep(Math.max((int) ((nextQueryNanos - nanoTime) / 1E6), 1));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org