You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/12/14 17:01:24 UTC

[pinot] branch master updated: [multistage][test] add multi-server, multi-segment test (#9943)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f54e3e73ff [multistage][test] add multi-server, multi-segment test (#9943)
f54e3e73ff is described below

commit f54e3e73ffca1f919488e1b6413334b535acb194
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed Dec 14 09:01:17 2022 -0800

    [multistage][test] add multi-server, multi-segment test (#9943)
    
    * randomized test
    
    * [stash] use line breaker and partitionColumn
    
    * adding line breaker and example test
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../pinot/query/runtime/QueryRunnerTestBase.java   | 26 +++++++++----
 .../runtime/queries/ResourceBasedQueriesTest.java  | 45 ++++++++++++++++++----
 .../src/test/resources/queries/BasicQuery.json     |  8 +++-
 3 files changed, 64 insertions(+), 15 deletions(-)

diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index a0a5cedf9d..65fe73e29d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -62,7 +62,9 @@ import org.testng.Assert;
 
 public abstract class QueryRunnerTestBase extends QueryTestSet {
   protected static final double DOUBLE_CMP_EPSILON = 0.0001d;
-
+  protected static final String SEGMENT_BREAKER_KEY = "__SEGMENT_BREAKER_KEY__";
+  protected static final String SEGMENT_BREAKER_STR = "------";
+  protected static final GenericRow SEGMENT_BREAKER_ROW = new GenericRow();
   protected static final Random RANDOM_REQUEST_ID_GEN = new Random();
   protected QueryEnvironment _queryEnvironment;
   protected String _reducerHostname;
@@ -70,6 +72,10 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
   protected Map<ServerInstance, QueryServerEnclosure> _servers = new HashMap<>();
   protected GrpcMailboxService _mailboxService;
 
+  static {
+    SEGMENT_BREAKER_ROW.putValue(SEGMENT_BREAKER_KEY, SEGMENT_BREAKER_STR);
+  }
+
   // --------------------------------------------------------------------------
   // QUERY UTILS
   // --------------------------------------------------------------------------
@@ -210,13 +216,17 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
     for (int rowId = 0; rowId < value.size(); rowId++) {
       GenericRow row = new GenericRow();
       List<Object> rawRow = value.get(rowId);
-      int colId = 0;
-      for (QueryTestCase.ColumnAndType columnAndType : columnAndTypes) {
-        row.putValue(columnAndType._name, rawRow.get(colId++));
+      if (rawRow.size() == 1 && SEGMENT_BREAKER_STR.equals(rawRow.get(0))) {
+        result.add(SEGMENT_BREAKER_ROW);
+      } else {
+        int colId = 0;
+        for (QueryTestCase.ColumnAndType columnAndType : columnAndTypes) {
+          row.putValue(columnAndType._name, rawRow.get(colId++));
+        }
+        // TODO: ts is built-in, but we should allow user overwrite
+        row.putValue("ts", System.currentTimeMillis());
+        result.add(row);
       }
-      // TODO: ts is built-in, but we should allow user overwrite
-      row.putValue("ts", System.currentTimeMillis());
-      result.add(row);
     }
     return result;
   }
@@ -327,6 +337,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
       public List<ColumnAndType> _schema;
       @JsonProperty("inputs")
       public List<List<Object>> _inputs;
+      @JsonProperty("partitionColumns")
+      public List<String> _partitionColumns;
     }
 
     @JsonIgnoreProperties(ignoreUnknown = true)
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 5910416ac4..78553b5e1b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -47,6 +48,7 @@ import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
 import org.apache.pinot.query.testutils.QueryTestUtils;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.testng.Assert;
@@ -60,20 +62,20 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
   private static final ObjectMapper MAPPER = new ObjectMapper();
   private static final Pattern TABLE_NAME_REPLACE_PATTERN = Pattern.compile("\\{([\\w\\d]+)\\}");
   private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
+  private static final Random RANDOM = new Random(42);
 
   @BeforeClass
   public void setUp()
       throws Exception {
     DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+
     // Setting up mock server factories.
+    // All test data are loaded upfront b/c the mock server and brokers need to be in sync.
     MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1");
     MockInstanceDataManagerFactory factory2 = new MockInstanceDataManagerFactory("server2");
     // Setting up H2 for validation
     setH2Connection();
 
-    // TODO: all test data are loaded upfront b/c the mock server and brokers needs to be in sync.
-    // doing it dynamically should be our next step.
-
     // Scan through all the test cases.
     for (Map.Entry<String, QueryTestCase> testCaseEntry : getTestCases().entrySet()) {
       String testCaseName = testCaseEntry.getKey();
@@ -86,16 +88,45 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
       Map<String, Schema> schemaMap = new HashMap<>();
       for (Map.Entry<String, QueryTestCase.Table> tableEntry : testCase._tables.entrySet()) {
         String tableName = testCaseName + "_" + tableEntry.getKey();
-        // TODO: able to choose table type, now default to OFFLINE
+        // Testing only OFFLINE table b/c Hybrid table test is a special case to test separately.
         String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
         org.apache.pinot.spi.data.Schema pinotSchema = constructSchema(tableName, tableEntry.getValue()._schema);
         schemaMap.put(tableName, pinotSchema);
         factory1.registerTable(pinotSchema, tableNameWithType);
         factory2.registerTable(pinotSchema, tableNameWithType);
         List<QueryTestCase.ColumnAndType> columnAndTypes = tableEntry.getValue()._schema;
-        // TODO: able to select add rows to server1 or server2 (now default server1)
-        // TODO: able to select add rows to existing segment or create new one (now default create one segment)
-        factory1.addSegment(tableNameWithType, toRow(columnAndTypes, tableEntry.getValue()._inputs));
+        List<GenericRow> genericRows = toRow(columnAndTypes, tableEntry.getValue()._inputs);
+
+        // generate segments and dump into server1 and server2
+        List<String> partitionColumns = tableEntry.getValue()._partitionColumns;
+
+        List<GenericRow> rows1 = new ArrayList<>();
+        List<GenericRow> rows2 = new ArrayList<>();
+
+        for (GenericRow row : genericRows) {
+          if (row == SEGMENT_BREAKER_ROW) {
+            factory1.addSegment(tableNameWithType, rows1);
+            factory2.addSegment(tableNameWithType, rows2);
+            rows1 = new ArrayList<>();
+            rows2 = new ArrayList<>();
+          } else {
+            long partition = 0;
+            if (partitionColumns == null) {
+              partition = RANDOM.nextInt(2);
+            } else {
+              for (String field : partitionColumns) {
+                partition = (partition + ((GenericRow) row).getValue(field).hashCode()) % 42;
+              }
+            }
+            if (partition % 2 == 0) {
+              rows1.add(row);
+            } else {
+              rows2.add(row);
+            }
+          }
+        }
+        factory1.addSegment(tableNameWithType, rows1);
+        factory2.addSegment(tableNameWithType, rows2);
       }
 
       boolean anyHaveOutput = testCase._queries.stream().anyMatch(q -> q._outputs != null && !q._outputs.isEmpty());
diff --git a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
index b664982c40..41fb23aa62 100644
--- a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
+++ b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
@@ -39,7 +39,13 @@
         ],
         "inputs": [
           ["foo", 1, 3.1416],
-          ["bar", 2, 2.7183]
+          ["foo", 3, 3.1416],
+          ["bar", 2, 2.7183],
+          ["------"],
+          ["bar", 4, 2.7183]
+        ],
+        "partitionColumns": [
+          "col1", "col2"
         ]
       }
     },


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org