You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2020/10/22 20:14:13 UTC
[incubator-pinot] branch master updated: add execution timeout and
fix exception stats (#6177)
This is an automated email from the ASF dual-hosted git repository.
apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 669bd59 add execution timeout and fix exception stats (#6177)
669bd59 is described below
commit 669bd595baa671f2420476da94524635ab1d14db
Author: Alexander Pucher <ap...@apache.org>
AuthorDate: Thu Oct 22 13:13:54 2020 -0700
add execution timeout and fix exception stats (#6177)
We add a "-timeout" param to the query runner to support time-limited execution, e.g. for performance benchmarks.
This PR also fixes a display issue with query exception statistics not being reset on every interval.
---
.../org/apache/pinot/tools/perf/QueryRunner.java | 82 +++++++++++++++-------
1 file changed, 57 insertions(+), 25 deletions(-)
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
index 2ced758..d9d6d6a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
@@ -77,6 +77,8 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
private int _brokerPort = 8099;
@Option(name = "-queueDepth", required = false, metaVar = "<int>", usage = "Queue size limit for multi-threaded execution (default 64).")
private int _queueDepth = 64;
+ @Option(name = "-timeout", required = false, metaVar = "<long>", usage = "Timeout in milliseconds for completing all queries (default: unlimited).")
+ private long _timeout = 0;
@Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
private boolean _help;
@@ -151,10 +153,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
switch (_mode) {
case "singleThread":
LOGGER.info("MODE singleThread with queryFile: {}, numTimesToRunQueries: {}, reportIntervalMs: {}, "
- + "numIntervalsToReportAndClearStatistics: {}", _queryFile, _numTimesToRunQueries, _reportIntervalMs,
- _numIntervalsToReportAndClearStatistics);
+ + "numIntervalsToReportAndClearStatistics: {}, timeout: {}", _queryFile, _numTimesToRunQueries,
+ _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout);
singleThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _reportIntervalMs,
- _numIntervalsToReportAndClearStatistics);
+ _numIntervalsToReportAndClearStatistics, _timeout);
break;
case "multiThreads":
if (_numThreads <= 0) {
@@ -163,11 +165,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
break;
}
LOGGER.info("MODE multiThreads with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, "
- + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}", _queryFile,
- _numTimesToRunQueries, _numThreads, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
- _queueDepth);
+ + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}, timeout: {}",
+ _queryFile, _numTimesToRunQueries, _numThreads, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
+ _queueDepth, _timeout);
multiThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _reportIntervalMs,
- _numIntervalsToReportAndClearStatistics);
+ _numIntervalsToReportAndClearStatistics, _timeout);
break;
case "targetQPS":
if (_numThreads <= 0) {
@@ -181,11 +183,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
break;
}
LOGGER.info("MODE targetQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, "
- + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}", _queryFile,
- _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
- _queueDepth);
+ + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, queueDepth: {}, timeout: {}",
+ _queryFile, _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs,
+ _numIntervalsToReportAndClearStatistics, _queueDepth, _timeout);
targetQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS,
- _reportIntervalMs, _numIntervalsToReportAndClearStatistics);
+ _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _timeout);
break;
case "increasingQPS":
if (_numThreads <= 0) {
@@ -211,11 +213,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
}
LOGGER.info("MODE increasingQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, "
+ "deltaQPS: {}, reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, "
- + "numIntervalsToIncreaseQPS: {}, queueDepth: {}", _queryFile, _numTimesToRunQueries, _numThreads,
- _startQPS, _deltaQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
- _numIntervalsToIncreaseQPS, _queueDepth);
+ + "numIntervalsToIncreaseQPS: {}, queueDepth: {}, timeout: {}", _queryFile, _numTimesToRunQueries,
+ _numThreads, _startQPS, _deltaQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics,
+ _numIntervalsToIncreaseQPS, _queueDepth, _timeout);
increasingQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _queueDepth, _startQPS, _deltaQPS,
- _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS);
+ _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS, _timeout);
break;
default:
LOGGER.error("Invalid mode: {}", _mode);
@@ -237,10 +239,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
* @param reportIntervalMs report interval in milliseconds.
* @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear
* them, 0 means never.
+ * @param timeout timeout in milliseconds for completing all queries.
* @throws Exception
*/
public static void singleThreadedQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
- int reportIntervalMs, int numIntervalsToReportAndClearStatistics)
+ int reportIntervalMs, int numIntervalsToReportAndClearStatistics, long timeout)
throws Exception {
PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
int numQueriesExecuted = 0;
@@ -249,6 +252,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
long totalClientTime = 0L;
List<Statistics> statisticsList = Collections.singletonList(new Statistics(CLIENT_TIME_STATISTICS));
+ final long startTimeAbsolute = System.currentTimeMillis();
long startTime = System.currentTimeMillis();
long reportStartTime = startTime;
int numReportIntervals = 0;
@@ -256,6 +260,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
Iterator<String> itQuery = queries.iterator();
while (itQuery.hasNext()) {
+ if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+ LOGGER.warn("Timeout of {} sec reached. Aborting", timeout);
+ return;
+ }
+
String query = itQuery.next();
JsonNode response = driver.postQuery(query);
@@ -284,6 +293,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
numReportIntervals = 0;
startTime = currentTime;
numQueriesExecuted = 0;
+ numExceptions = 0;
totalBrokerTime = 0L;
totalClientTime = 0L;
for (Statistics statistics : statisticsList) {
@@ -325,10 +335,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
* @param reportIntervalMs report interval in milliseconds.
* @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear
* them, 0 means never.
+ * @param timeout timeout in milliseconds for completing all queries
* @throws Exception
*/
public static void multiThreadedQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
- int numThreads, int queueDepth, int reportIntervalMs, int numIntervalsToReportAndClearStatistics)
+ int numThreads, int queueDepth, int reportIntervalMs, int numIntervalsToReportAndClearStatistics, long timeout)
throws Exception {
PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
Queue<String> queryQueue = new LinkedBlockingDeque<>(queueDepth);
@@ -346,6 +357,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
}
executorService.shutdown();
+ final long startTimeAbsolute = System.currentTimeMillis();
long startTime = System.currentTimeMillis();
long reportStartTime = startTime;
int numReportIntervals = 0;
@@ -356,6 +368,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
return;
}
+ if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+ LOGGER.warn("Timeout of {} sec reached. Aborting", timeout);
+ return;
+ }
+
Iterator<String> itQuery = queries.iterator();
while (itQuery.hasNext()) {
String query = itQuery.next();
@@ -381,7 +398,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
== numIntervalsToReportAndClearStatistics)) {
numReportIntervals = 0;
startTime = currentTime;
- reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList);
+ reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList);
}
}
}
@@ -428,11 +445,12 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
* @param reportIntervalMs report interval in milliseconds.
* @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear
* them, 0 means never.
+ * @param timeout timeout in milliseconds for completing all queries
* @throws Exception
*/
public static void targetQPSQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
int numThreads, int queueDepth, double startQPS, int reportIntervalMs,
- int numIntervalsToReportAndClearStatistics)
+ int numIntervalsToReportAndClearStatistics, long timeout)
throws Exception {
PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
Queue<String> queryQueue = new LinkedBlockingDeque<>(queueDepth);
@@ -450,6 +468,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
}
executorService.shutdown();
+ final long startTimeAbsolute = System.currentTimeMillis();
final int queryIntervalNanos = (int) (1E9 / startQPS);
long startTime = System.currentTimeMillis();
long reportStartTime = startTime;
@@ -461,6 +480,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
return;
}
+ if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+ LOGGER.warn("Timeout of {} sec reached. Aborting", timeout);
+ return;
+ }
+
long nextQueryNanos = System.nanoTime();
Iterator<String> itQuery = queries.iterator();
while (itQuery.hasNext()) {
@@ -495,7 +519,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
== numIntervalsToReportAndClearStatistics)) {
numReportIntervals = 0;
startTime = currentTime;
- reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList);
+ reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList);
}
}
}
@@ -544,13 +568,14 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
* @param reportIntervalMs report interval in milliseconds.
* @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear
* them, 0 means never.
+ * @param timeout timeout in milliseconds for completing all queries.
* @param numIntervalsToIncreaseQPS number of intervals to increase QPS.
* @throws Exception
*/
public static void increasingQPSQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
int numThreads, int queueDepth, double startQPS, double deltaQPS, int reportIntervalMs,
- int numIntervalsToReportAndClearStatistics, int numIntervalsToIncreaseQPS)
+ int numIntervalsToReportAndClearStatistics, int numIntervalsToIncreaseQPS, long timeout)
throws Exception {
PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
Queue<String> queryQueue = new LinkedBlockingDeque<>(queueDepth);
@@ -568,6 +593,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
}
executorService.shutdown();
+ final long startTimeAbsolute = System.currentTimeMillis();
long startTime = System.currentTimeMillis();
long reportStartTime = startTime;
int numReportIntervals = 0;
@@ -580,6 +606,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
return;
}
+ if (timeout > 0 && System.currentTimeMillis() - startTimeAbsolute > timeout) {
+ LOGGER.warn("Timeout of {} sec reached. Aborting", timeout);
+ return;
+ }
+
long nextQueryNanos = System.nanoTime();
Iterator<String> itQuery = queries.iterator();
while (itQuery.hasNext()) {
@@ -616,7 +647,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
totalClientTime.get() / (double) numQueriesExecutedInt, queryQueue.size());
numReportIntervals = 0;
startTime = currentTime;
- reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList);
+ reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList);
currentQPS += deltaQPS;
queryIntervalNanos = (long) (1E9 / currentQPS);
@@ -633,7 +664,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
if ((numIntervalsToReportAndClearStatistics != 0) && (
numReportIntervals % numIntervalsToReportAndClearStatistics == 0)) {
startTime = currentTime;
- reportAndClearStatistics(numQueriesExecuted, totalBrokerTime, totalClientTime, statisticsList);
+ reportAndClearStatistics(numQueriesExecuted, numExceptions, totalBrokerTime, totalClientTime, statisticsList);
}
}
}
@@ -679,9 +710,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
}
}
- private static void reportAndClearStatistics(AtomicInteger numQueriesExecuted, AtomicLong totalBrokerTime,
- AtomicLong totalClientTime, List<Statistics> statisticsList) {
+ private static void reportAndClearStatistics(AtomicInteger numQueriesExecuted, AtomicInteger numExceptions,
+ AtomicLong totalBrokerTime, AtomicLong totalClientTime, List<Statistics> statisticsList) {
numQueriesExecuted.set(0);
+ numExceptions.set(0);
totalBrokerTime.set(0L);
totalClientTime.set(0L);
for (Statistics statistics : statisticsList) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org