You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ap...@apache.org on 2020/10/13 18:51:28 UTC

[incubator-pinot] branch query-runner-sampling-mode created (now b635bac)

This is an automated email from the ASF dual-hosted git repository.

apucher pushed a change to branch query-runner-sampling-mode
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at b635bac  add query runner support for query file resampling

This branch includes the following new commits:

     new b635bac  add query runner support for query file resampling

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: add query runner support for query file resampling

Posted by ap...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

apucher pushed a commit to branch query-runner-sampling-mode
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit b635bac6a63d6229d33d0f9ef45761794fb8b09b
Author: Alexander Pucher <al...@alexpucher.com>
AuthorDate: Tue Oct 13 11:51:03 2020 -0700

    add query runner support for query file resampling
---
 .../org/apache/pinot/tools/perf/QueryRunner.java   | 98 ++++++++++++++--------
 1 file changed, 62 insertions(+), 36 deletions(-)

diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
index d87996d..fb02183 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/QueryRunner.java
@@ -22,12 +22,16 @@ import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
 import java.io.FileInputStream;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
@@ -49,6 +53,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
   private String _mode;
   @Option(name = "-queryFile", required = true, metaVar = "<String>", usage = "Path to query file.")
   private String _queryFile;
+  @Option(name = "-queryMode", required = false, metaVar = "<String>", usage = "Mode of query generator (list|sample).")
+  private String _queryMode = QueryMode.LIST.toString();
+  @Option(name = "-queryCount", required = false, metaVar = "<int>", usage = "Number of queries to run (default 0 = all).")
+  private int _queryCount = 0;
   @Option(name = "-numTimesToRunQueries", required = false, metaVar = "<int>", usage = "Number of times to run all queries in the query file, 0 means infinite times (default 1).")
   private int _numTimesToRunQueries = 1;
   @Option(name = "-reportIntervalMs", required = false, metaVar = "<int>", usage = "Interval in milliseconds to report simple statistics (default 3000).")
@@ -72,6 +80,11 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
   @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
   private boolean _help;
 
+  private enum QueryMode { // how makeQueries turns the query file's lines into the query stream
+    LIST, // emit the lines in their original order, capped at the query count
+    SAMPLE // emit lines picked at random indices (seeded from the list's hashCode), repeats possible
+  }
+
   @Override
   public boolean getHelp() {
     return _help;
@@ -125,12 +138,17 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     conf.setStartBroker(false);
     conf.setStartServer(false);
 
+    Stream<String> queries = makeQueries(
+            IOUtils.readLines(new FileInputStream(_queryFile)),
+            QueryMode.valueOf(_queryMode.toUpperCase()),
+            _queryCount);
+
     switch (_mode) {
       case "singleThread":
         LOGGER.info("MODE singleThread with queryFile: {}, numTimesToRunQueries: {}, reportIntervalMs: {}, "
                 + "numIntervalsToReportAndClearStatistics: {}", _queryFile, _numTimesToRunQueries, _reportIntervalMs,
             _numIntervalsToReportAndClearStatistics);
-        singleThreadedQueryRunner(conf, _queryFile, _numTimesToRunQueries, _reportIntervalMs,
+        singleThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _reportIntervalMs,
             _numIntervalsToReportAndClearStatistics);
         break;
       case "multiThreads":
@@ -142,7 +160,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         LOGGER.info("MODE multiThreads with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, "
                 + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}", _queryFile, _numTimesToRunQueries,
             _numThreads, _reportIntervalMs, _numIntervalsToReportAndClearStatistics);
-        multiThreadedQueryRunner(conf, _queryFile, _numTimesToRunQueries, _numThreads, _reportIntervalMs,
+        multiThreadedQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _reportIntervalMs,
             _numIntervalsToReportAndClearStatistics);
         break;
       case "targetQPS":
@@ -159,7 +177,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         LOGGER.info("MODE targetQPS with queryFile: {}, numTimesToRunQueries: {}, numThreads: {}, startQPS: {}, "
                 + "reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}", _queryFile, _numTimesToRunQueries,
             _numThreads, _startQPS, _reportIntervalMs, _numIntervalsToReportAndClearStatistics);
-        targetQPSQueryRunner(conf, _queryFile, _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs,
+        targetQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _startQPS, _reportIntervalMs,
             _numIntervalsToReportAndClearStatistics);
         break;
       case "increasingQPS":
@@ -188,7 +206,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
                 + "deltaQPS: {}, reportIntervalMs: {}, numIntervalsToReportAndClearStatistics: {}, "
                 + "numIntervalsToIncreaseQPS: {}", _queryFile, _numTimesToRunQueries, _numThreads, _startQPS, _deltaQPS,
             _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS);
-        increasingQPSQueryRunner(conf, _queryFile, _numTimesToRunQueries, _numThreads, _startQPS, _deltaQPS,
+        increasingQPSQueryRunner(conf, queries, _numTimesToRunQueries, _numThreads, _startQPS, _deltaQPS,
             _reportIntervalMs, _numIntervalsToReportAndClearStatistics, _numIntervalsToIncreaseQPS);
         break;
       default:
@@ -206,21 +224,16 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    * <p>Query runner will stop when all queries in the query file has been executed number of times configured.
    *
    * @param conf perf benchmark driver config.
-   * @param queryFile query file.
+   * @param queries query stream.
    * @param numTimesToRunQueries number of times to run all queries in the query file, 0 means infinite times.
    * @param reportIntervalMs report interval in milliseconds.
    * @param numIntervalsToReportAndClearStatistics number of report intervals to report detailed statistics and clear
    *                                               them, 0 means never.
    * @throws Exception
    */
-  public static void singleThreadedQueryRunner(PerfBenchmarkDriverConf conf, String queryFile, int numTimesToRunQueries,
+  public static void singleThreadedQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
       int reportIntervalMs, int numIntervalsToReportAndClearStatistics)
       throws Exception {
-    List<String> queries;
-    try (FileInputStream input = new FileInputStream(new File(queryFile))) {
-      queries = IOUtils.readLines(input);
-    }
-
     PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
     int numQueriesExecuted = 0;
     long totalBrokerTime = 0L;
@@ -232,7 +245,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     int numReportIntervals = 0;
     int numTimesExecuted = 0;
     while (numTimesToRunQueries == 0 || numTimesExecuted < numTimesToRunQueries) {
-      for (String query : queries) {
+      Iterator<String> itQuery = queries.iterator();
+      while (itQuery.hasNext()) {
+        String query = itQuery.next();
+
         JsonNode response = driver.postQuery(query);
         numQueriesExecuted++;
         long brokerTime = response.get("timeUsedMs").asLong();
@@ -290,7 +306,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    * <p>Query runner will stop when all queries in the query file has been executed number of times configured.
    *
    * @param conf perf benchmark driver config.
-   * @param queryFile query file.
+   * @param queries query stream.
    * @param numTimesToRunQueries number of times to run all queries in the query file, 0 means infinite times.
    * @param numThreads number of threads sending queries.
    * @param reportIntervalMs report interval in milliseconds.
@@ -298,14 +314,9 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    *                                               them, 0 means never.
    * @throws Exception
    */
-  public static void multiThreadedQueryRunner(PerfBenchmarkDriverConf conf, String queryFile, int numTimesToRunQueries,
+  public static void multiThreadedQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
       int numThreads, int reportIntervalMs, int numIntervalsToReportAndClearStatistics)
       throws Exception {
-    List<String> queries;
-    try (FileInputStream input = new FileInputStream(new File(queryFile))) {
-      queries = IOUtils.readLines(input);
-    }
-
     PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
     ConcurrentLinkedQueue<String> queryQueue = new ConcurrentLinkedQueue<>();
     AtomicInteger numQueriesExecuted = new AtomicInteger(0);
@@ -330,7 +341,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         return;
       }
 
-      for (String query : queries) {
+      Iterator<String> itQuery = queries.iterator();
+      while (itQuery.hasNext()) {
+        String query = itQuery.next();
+
         queryQueue.add(query);
 
         // Keep 20 queries inside the query queue.
@@ -393,7 +407,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    * <p>Query runner will stop when all queries in the query file has been executed number of times configured.
    *
    * @param conf perf benchmark driver config.
-   * @param queryFile query file.
+   * @param queries query stream.
    * @param numTimesToRunQueries number of times to run all queries in the query file, 0 means infinite times.
    * @param numThreads number of threads sending queries.
    * @param startQPS start QPS (target QPS).
@@ -402,14 +416,9 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    *                                               them, 0 means never.
    * @throws Exception
    */
-  public static void targetQPSQueryRunner(PerfBenchmarkDriverConf conf, String queryFile, int numTimesToRunQueries,
+  public static void targetQPSQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
       int numThreads, double startQPS, int reportIntervalMs, int numIntervalsToReportAndClearStatistics)
       throws Exception {
-    List<String> queries;
-    try (FileInputStream input = new FileInputStream(new File(queryFile))) {
-      queries = IOUtils.readLines(input);
-    }
-
     PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
     ConcurrentLinkedQueue<String> queryQueue = new ConcurrentLinkedQueue<>();
     AtomicInteger numQueriesExecuted = new AtomicInteger(0);
@@ -435,7 +444,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         return;
       }
 
-      for (String query : queries) {
+      Iterator<String> itQuery = queries.iterator();
+      while (itQuery.hasNext()) {
+        String query = itQuery.next();
+
         queryQueue.add(query);
         Thread.sleep(queryIntervalMs);
 
@@ -495,7 +507,7 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    * <p>Query runner will stop when all queries in the query file has been executed number of times configured.
    *
    * @param conf perf benchmark driver config.
-   * @param queryFile query file.
+   * @param queries query stream.
    * @param numTimesToRunQueries number of times to run all queries in the query file, 0 means infinite times.
    * @param numThreads number of threads sending queries.
    * @param startQPS start QPS.
@@ -507,15 +519,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
    * @throws Exception
    */
 
-  public static void increasingQPSQueryRunner(PerfBenchmarkDriverConf conf, String queryFile, int numTimesToRunQueries,
+  public static void increasingQPSQueryRunner(PerfBenchmarkDriverConf conf, Stream<String> queries, int numTimesToRunQueries,
       int numThreads, double startQPS, double deltaQPS, int reportIntervalMs,
       int numIntervalsToReportAndClearStatistics, int numIntervalsToIncreaseQPS)
       throws Exception {
-    List<String> queries;
-    try (FileInputStream input = new FileInputStream(new File(queryFile))) {
-      queries = IOUtils.readLines(input);
-    }
-
     PerfBenchmarkDriver driver = new PerfBenchmarkDriver(conf);
     ConcurrentLinkedQueue<String> queryQueue = new ConcurrentLinkedQueue<>();
     AtomicInteger numQueriesExecuted = new AtomicInteger(0);
@@ -542,7 +549,10 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
         return;
       }
 
-      for (String query : queries) {
+      Iterator<String> itQuery = queries.iterator();
+      while (itQuery.hasNext()) {
+        String query = itQuery.next();
+
         queryQueue.add(query);
         Thread.sleep(queryIntervalMs);
 
@@ -622,6 +632,22 @@ public class QueryRunner extends AbstractBaseCommand implements Command {
     }
   }
 
+  /** Builds the one-shot query stream: LIST replays inputs in file order, SAMPLE draws seeded random picks. */
+  private static Stream<String> makeQueries(List<String> inputs, QueryMode queryMode, int queryCount) {
+    // A non-positive queryCount means "use as many queries as the input list has entries".
+    queryCount = queryCount > 0 ? queryCount : inputs.size();
+    switch (queryMode) {
+      case LIST:
+        return inputs.stream().limit(queryCount);
+      case SAMPLE:
+        Random r = new Random(inputs.hashCode()); // anything deterministic will do
+        // Nothing to sample from an empty list; Random.ints would also reject the empty [0, 0) range.
+        return inputs.isEmpty() ? Stream.<String>empty() : r.ints(queryCount, 0, inputs.size()).mapToObj(inputs::get);
+      default:
+        throw new IllegalArgumentException(String.format("Unsupported queryMode '%s'", queryMode));
+    }
+  }
+
   private static void reportAndClearStatistics(AtomicInteger numQueriesExecuted, AtomicLong totalBrokerTime,
       AtomicLong totalClientTime, List<Statistics> statisticsList) {
     numQueriesExecuted.set(0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org