You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original HTML page) is not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/12/14 17:01:24 UTC
[pinot] branch master updated: [multistage][test] add multi-server, multi-segment test (#9943)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f54e3e73ff [multistage][test] add multi-server, multi-segment test (#9943)
f54e3e73ff is described below
commit f54e3e73ffca1f919488e1b6413334b535acb194
Author: Rong Rong <ro...@apache.org>
AuthorDate: Wed Dec 14 09:01:17 2022 -0800
[multistage][test] add multi-server, multi-segment test (#9943)
* randomized test
* [stash] use line breaker and partitionColumn
* adding line breaker and example test
Co-authored-by: Rong Rong <ro...@startree.ai>
---
.../pinot/query/runtime/QueryRunnerTestBase.java | 26 +++++++++----
.../runtime/queries/ResourceBasedQueriesTest.java | 45 ++++++++++++++++++----
.../src/test/resources/queries/BasicQuery.json | 8 +++-
3 files changed, 64 insertions(+), 15 deletions(-)
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
index a0a5cedf9d..65fe73e29d 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTestBase.java
@@ -62,7 +62,9 @@ import org.testng.Assert;
public abstract class QueryRunnerTestBase extends QueryTestSet {
protected static final double DOUBLE_CMP_EPSILON = 0.0001d;
-
+ protected static final String SEGMENT_BREAKER_KEY = "__SEGMENT_BREAKER_KEY__";
+ protected static final String SEGMENT_BREAKER_STR = "------";
+ protected static final GenericRow SEGMENT_BREAKER_ROW = new GenericRow();
protected static final Random RANDOM_REQUEST_ID_GEN = new Random();
protected QueryEnvironment _queryEnvironment;
protected String _reducerHostname;
@@ -70,6 +72,10 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
protected Map<ServerInstance, QueryServerEnclosure> _servers = new HashMap<>();
protected GrpcMailboxService _mailboxService;
+ static {
+ SEGMENT_BREAKER_ROW.putValue(SEGMENT_BREAKER_KEY, SEGMENT_BREAKER_STR);
+ }
+
// --------------------------------------------------------------------------
// QUERY UTILS
// --------------------------------------------------------------------------
@@ -210,13 +216,17 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
for (int rowId = 0; rowId < value.size(); rowId++) {
GenericRow row = new GenericRow();
List<Object> rawRow = value.get(rowId);
- int colId = 0;
- for (QueryTestCase.ColumnAndType columnAndType : columnAndTypes) {
- row.putValue(columnAndType._name, rawRow.get(colId++));
+ if (rawRow.size() == 1 && SEGMENT_BREAKER_STR.equals(rawRow.get(0))) {
+ result.add(SEGMENT_BREAKER_ROW);
+ } else {
+ int colId = 0;
+ for (QueryTestCase.ColumnAndType columnAndType : columnAndTypes) {
+ row.putValue(columnAndType._name, rawRow.get(colId++));
+ }
+ // TODO: ts is built-in, but we should allow user overwrite
+ row.putValue("ts", System.currentTimeMillis());
+ result.add(row);
}
- // TODO: ts is built-in, but we should allow user overwrite
- row.putValue("ts", System.currentTimeMillis());
- result.add(row);
}
return result;
}
@@ -327,6 +337,8 @@ public abstract class QueryRunnerTestBase extends QueryTestSet {
public List<ColumnAndType> _schema;
@JsonProperty("inputs")
public List<List<Object>> _inputs;
+ @JsonProperty("partitionColumns")
+ public List<String> _partitionColumns;
}
@JsonIgnoreProperties(ignoreUnknown = true)
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
index 5910416ac4..78553b5e1b 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/ResourceBasedQueriesTest.java
@@ -32,6 +32,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -47,6 +48,7 @@ import org.apache.pinot.query.testutils.MockInstanceDataManagerFactory;
import org.apache.pinot.query.testutils.QueryTestUtils;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.testng.Assert;
@@ -60,20 +62,20 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Pattern TABLE_NAME_REPLACE_PATTERN = Pattern.compile("\\{([\\w\\d]+)\\}");
private static final String QUERY_TEST_RESOURCE_FOLDER = "queries";
+ private static final Random RANDOM = new Random(42);
@BeforeClass
public void setUp()
throws Exception {
DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4);
+
// Setting up mock server factories.
+ // All test data are loaded upfront b/c the mock server and brokers needs to be in sync.
MockInstanceDataManagerFactory factory1 = new MockInstanceDataManagerFactory("server1");
MockInstanceDataManagerFactory factory2 = new MockInstanceDataManagerFactory("server2");
// Setting up H2 for validation
setH2Connection();
- // TODO: all test data are loaded upfront b/c the mock server and brokers needs to be in sync.
- // doing it dynamically should be our next step.
-
// Scan through all the test cases.
for (Map.Entry<String, QueryTestCase> testCaseEntry : getTestCases().entrySet()) {
String testCaseName = testCaseEntry.getKey();
@@ -86,16 +88,45 @@ public class ResourceBasedQueriesTest extends QueryRunnerTestBase {
Map<String, Schema> schemaMap = new HashMap<>();
for (Map.Entry<String, QueryTestCase.Table> tableEntry : testCase._tables.entrySet()) {
String tableName = testCaseName + "_" + tableEntry.getKey();
- // TODO: able to choose table type, now default to OFFLINE
+ // Testing only OFFLINE table b/c Hybrid table test is a special case to test separately.
String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(tableName);
org.apache.pinot.spi.data.Schema pinotSchema = constructSchema(tableName, tableEntry.getValue()._schema);
schemaMap.put(tableName, pinotSchema);
factory1.registerTable(pinotSchema, tableNameWithType);
factory2.registerTable(pinotSchema, tableNameWithType);
List<QueryTestCase.ColumnAndType> columnAndTypes = tableEntry.getValue()._schema;
- // TODO: able to select add rows to server1 or server2 (now default server1)
- // TODO: able to select add rows to existing segment or create new one (now default create one segment)
- factory1.addSegment(tableNameWithType, toRow(columnAndTypes, tableEntry.getValue()._inputs));
+ List<GenericRow> genericRows = toRow(columnAndTypes, tableEntry.getValue()._inputs);
+
+ // generate segments and dump into server1 and server2
+ List<String> partitionColumns = tableEntry.getValue()._partitionColumns;
+
+ List<GenericRow> rows1 = new ArrayList<>();
+ List<GenericRow> rows2 = new ArrayList<>();
+
+ for (GenericRow row : genericRows) {
+ if (row == SEGMENT_BREAKER_ROW) {
+ factory1.addSegment(tableNameWithType, rows1);
+ factory2.addSegment(tableNameWithType, rows2);
+ rows1 = new ArrayList<>();
+ rows2 = new ArrayList<>();
+ } else {
+ long partition = 0;
+ if (partitionColumns == null) {
+ partition = RANDOM.nextInt(2);
+ } else {
+ for (String field : partitionColumns) {
+ partition = (partition + ((GenericRow) row).getValue(field).hashCode()) % 42;
+ }
+ }
+ if (partition % 2 == 0) {
+ rows1.add(row);
+ } else {
+ rows2.add(row);
+ }
+ }
+ }
+ factory1.addSegment(tableNameWithType, rows1);
+ factory2.addSegment(tableNameWithType, rows2);
}
boolean anyHaveOutput = testCase._queries.stream().anyMatch(q -> q._outputs != null && !q._outputs.isEmpty());
diff --git a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
index b664982c40..41fb23aa62 100644
--- a/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
+++ b/pinot-query-runtime/src/test/resources/queries/BasicQuery.json
@@ -39,7 +39,13 @@
],
"inputs": [
["foo", 1, 3.1416],
- ["bar", 2, 2.7183]
+ ["foo", 3, 3.1416],
+ ["bar", 2, 2.7183],
+ ["------"],
+ ["bar", 4, 2.7183]
+ ],
+ "partitionColumns": [
+ "col1", "col2"
]
}
},
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org