You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/06 17:51:07 UTC
(pinot) branch master updated: Fluent test framework (#12215)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new e99684328d Fluent test framework (#12215)
e99684328d is described below
commit e99684328dd163d7372057fb837037393b0a7248
Author: Gonzalo Ortiz Jaureguizar <go...@users.noreply.github.com>
AuthorDate: Tue Feb 6 18:51:01 2024 +0100
Fluent test framework (#12215)
---
.../function/AbstractAggregationFunctionTest.java | 121 ++++++
.../function/CountAggregationFunctionTest.java | 158 ++++++++
.../org/apache/pinot/queries/FluentQueryTest.java | 434 +++++++++++++++++++++
3 files changed, 713 insertions(+)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AbstractAggregationFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AbstractAggregationFunctionTest.java
new file mode 100644
index 0000000000..122374d224
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/AbstractAggregationFunctionTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.config.table.FieldConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+
+/**
+ * Base class for aggregation function tests built on {@link FluentQueryTest}.
+ *
+ * <p>For each supported data type it pre-builds a schema with a single nullable dimension column
+ * named "myField" plus a matching offline table config, and manages a temporary base directory
+ * for generated segments (created before the test class runs, deleted afterwards).
+ */
+public abstract class AbstractAggregationFunctionTest {
+
+  protected File _baseDir;
+
+  // Data types for which a pre-built single-field schema is provided.
+  private static final FieldSpec.DataType[] VALID_DATA_TYPES = new FieldSpec.DataType[] {
+      FieldSpec.DataType.INT,
+      FieldSpec.DataType.LONG,
+      FieldSpec.DataType.FLOAT,
+      FieldSpec.DataType.DOUBLE,
+      FieldSpec.DataType.STRING,
+      FieldSpec.DataType.BYTES,
+      FieldSpec.DataType.BIG_DECIMAL,
+      FieldSpec.DataType.TIMESTAMP,
+      FieldSpec.DataType.BOOLEAN
+  };
+
+  // One schema per data type, each declaring a single nullable dimension column "myField".
+  protected static final Map<FieldSpec.DataType, Schema> SINGLE_FIELD_NULLABLE_SCHEMAS =
+      Arrays.stream(VALID_DATA_TYPES)
+          .collect(Collectors.toMap(dataType -> dataType, AbstractAggregationFunctionTest::singleFieldSchema));
+
+  // Offline table config shared by all single-field tests that need no field customization.
+  protected static final TableConfig SINGLE_FIELD_TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE)
+      .setTableName("testTable")
+      .build();
+
+  // Builds the single-column nullable schema used by SINGLE_FIELD_NULLABLE_SCHEMAS.
+  private static Schema singleFieldSchema(FieldSpec.DataType dataType) {
+    return new Schema.SchemaBuilder()
+        .setSchemaName("testTable")
+        .setEnableColumnBasedNullHandling(true)
+        .addDimensionField("myField", dataType, field -> field.setNullable(true))
+        .build();
+  }
+
+  protected FluentQueryTest.DeclaringTable givenSingleNullableFieldTable(FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled) {
+    return givenSingleNullableFieldTable(dataType, nullHandlingEnabled, null);
+  }
+
+  /**
+   * Declares a table with a single nullable "myField" column of the given data type.
+   *
+   * @param customize optional hook to tweak the {@link FieldConfig} of "myField"; when null the
+   *     shared {@link #SINGLE_FIELD_TABLE_CONFIG} is reused
+   */
+  protected FluentQueryTest.DeclaringTable givenSingleNullableFieldTable(FieldSpec.DataType dataType,
+      boolean nullHandlingEnabled, @Nullable Consumer<FieldConfig.Builder> customize) {
+    TableConfig tableConfig = customize == null ? SINGLE_FIELD_TABLE_CONFIG : customizedTableConfig(customize);
+    return FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(nullHandlingEnabled)
+        .givenTable(SINGLE_FIELD_NULLABLE_SCHEMAS.get(dataType), tableConfig);
+  }
+
+  // Builds a fresh table config whose "myField" field config was customized by the caller.
+  private static TableConfig customizedTableConfig(Consumer<FieldConfig.Builder> customize) {
+    TableConfigBuilder builder = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable");
+    FieldConfig.Builder fieldConfigBuilder = new FieldConfig.Builder("myField");
+    customize.accept(fieldConfigBuilder);
+    builder.setFieldConfigList(Collections.singletonList(fieldConfigBuilder.build()));
+    return builder.build();
+  }
+
+  protected FluentQueryTest.DeclaringTable givenSingleNullableIntFieldTable(boolean nullHandling) {
+    return givenSingleNullableFieldTable(FieldSpec.DataType.INT, nullHandling, null);
+  }
+
+  protected FluentQueryTest.DeclaringTable givenSingleNullableIntFieldTable(boolean nullHandling,
+      @Nullable Consumer<FieldConfig.Builder> customize) {
+    return givenSingleNullableFieldTable(FieldSpec.DataType.INT, nullHandling, customize);
+  }
+
+  // Creates the per-class temp directory that segment data is written under.
+  @BeforeClass
+  void createBaseDir() {
+    try {
+      _baseDir = Files.createTempDirectory(getClass().getSimpleName()).toFile();
+    } catch (IOException ex) {
+      throw new UncheckedIOException(ex);
+    }
+  }
+
+  // Recursively removes the temp directory once all tests in the class have run.
+  @AfterClass
+  void destroyBaseDir()
+      throws IOException {
+    if (_baseDir != null) {
+      FileUtils.deleteDirectory(_baseDir);
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunctionTest.java
new file mode 100644
index 0000000000..5f60e756a1
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/function/CountAggregationFunctionTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.query.aggregation.function;
+
+import org.apache.pinot.queries.FluentQueryTest;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for COUNT aggregation (and plain selection of the counted column) with null handling
+ * enabled and disabled, on a single nullable INT column spread across two server instances.
+ *
+ * <p>With null handling disabled, ingested nulls surface as the INT default null placeholder
+ * (Integer.MIN_VALUE = -2147483648); with it enabled they surface as real SQL NULLs.
+ */
+public class CountAggregationFunctionTest extends AbstractAggregationFunctionTest {
+
+  @Test
+  public void list() {
+    // Null handling disabled: the null row shows up as the INT default placeholder.
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(SINGLE_FIELD_NULLABLE_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[] {1}
+        )
+        .andOnSecondInstance(
+            new Object[] {2},
+            new Object[] {null}
+        )
+        .whenQuery("select myField from testTable order by myField")
+        .thenResultIs("INTEGER",
+            "-2147483648",
+            "1",
+            "2"
+        );
+  }
+
+  @Test
+  public void listNullHandlingEnabled() {
+    // Null handling enabled: the null row is returned as SQL NULL, ordered last.
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(true)
+        .givenTable(SINGLE_FIELD_NULLABLE_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            new Object[] {1}
+        )
+        .andOnSecondInstance(
+            new Object[] {2},
+            new Object[] {null}
+        )
+        .whenQuery("select myField from testTable order by myField")
+        .thenResultIs("INTEGER",
+            "1",
+            "2",
+            "null"
+        );
+  }
+
+  @Test
+  public void countNullWhenHandlingDisabled() {
+    // COUNT(col) with null handling disabled counts the placeholder row like any other value.
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(SINGLE_FIELD_NULLABLE_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            "myField",
+            "1"
+        )
+        .andOnSecondInstance(
+            "myField",
+            "2",
+            "null"
+        )
+        .whenQuery("select myField, COUNT(myField) from testTable group by myField order by myField")
+        .thenResultIs("INTEGER | LONG",
+            "-2147483648 | 1",
+            "1 | 1",
+            "2 | 1"
+        );
+  }
+
+  @Test
+  public void countNullWhenHandlingEnabled() {
+    // COUNT(col) with null handling enabled does NOT count the null row (count 0 for the NULL group).
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(true)
+        .givenTable(SINGLE_FIELD_NULLABLE_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            "myField",
+            "1"
+        )
+        .andOnSecondInstance(
+            "myField",
+            "2",
+            "null"
+        )
+        .whenQuery("select myField, COUNT(myField) from testTable group by myField order by myField")
+        .thenResultIs(
+            "INTEGER | LONG",
+            "1 | 1",
+            "2 | 1",
+            "null | 0"
+        );
+  }
+
+  @Test
+  public void countStarNullWhenHandlingDisabled() {
+    // COUNT(*) counts every row regardless of nulls; placeholder group still appears.
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(false)
+        .givenTable(SINGLE_FIELD_NULLABLE_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            "myField",
+            "1"
+        )
+        .andOnSecondInstance(
+            "myField",
+            "2",
+            "null"
+        )
+        .whenQuery("select myField, COUNT(*) from testTable group by myField order by myField")
+        .thenResultIs("INTEGER | LONG",
+            "-2147483648 | 1",
+            "1 | 1",
+            "2 | 1"
+        );
+  }
+
+  @Test
+  public void countStarNullWhenHandlingEnabled() {
+    // COUNT(*) counts the NULL group's row even with null handling enabled (unlike COUNT(col)).
+    FluentQueryTest.withBaseDir(_baseDir)
+        .withNullHandling(true)
+        .givenTable(SINGLE_FIELD_NULLABLE_SCHEMAS.get(FieldSpec.DataType.INT), SINGLE_FIELD_TABLE_CONFIG)
+        .onFirstInstance(
+            "myField",
+            "1"
+        )
+        .andOnSecondInstance(
+            "myField",
+            "2",
+            "null"
+        )
+        .whenQuery("select myField, COUNT(*) from testTable group by myField order by myField")
+        .thenResultIs("INTEGER | LONG",
+            "1 | 1",
+            "2 | 1",
+            "null | 1"
+        );
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/FluentQueryTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/FluentQueryTest.java
new file mode 100644
index 0000000000..ba6d22c429
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FluentQueryTest.java
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.queries;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.testng.Assert;
+
+
+public class FluentQueryTest {
+
+  private final FluentBaseQueriesTest _baseQueriesTest;
+  private final File _baseDir;
+  // Query options applied to every query issued through this fluent chain.
+  private final Map<String, String> _extraQueryOptions = new HashMap<>();
+
+  private FluentQueryTest(FluentBaseQueriesTest baseQueriesTest, File baseDir) {
+    _baseQueriesTest = baseQueriesTest;
+    _baseDir = baseDir;
+  }
+
+  /** Entry point: starts a fluent query test whose segments are materialized under {@code baseDir}. */
+  public static FluentQueryTest withBaseDir(File baseDir) {
+    return new FluentQueryTest(new FluentBaseQueriesTest(), baseDir);
+  }
+
+  /** Replaces any previously registered query options with the given ones. */
+  public FluentQueryTest withExtraQueryOptions(Map<String, String> extraQueryOptions) {
+    _extraQueryOptions.clear();
+    _extraQueryOptions.putAll(extraQueryOptions);
+    return this;
+  }
+
+  /** Enables or disables null handling for the queries that will be executed. */
+  public FluentQueryTest withNullHandling(boolean enabled) {
+    _extraQueryOptions.put("enableNullHandling", String.valueOf(enabled));
+    return this;
+  }
+
+  /** Declares the table that subsequent segments and queries will run against. */
+  public DeclaringTable givenTable(Schema schema, TableConfig tableConfig) {
+    return new DeclaringTable(_baseQueriesTest, tableConfig, schema, _baseDir, _extraQueryOptions);
+  }
+
+  /** Fluent stage reached after a table has been declared; next step is loading segments. */
+  public static class DeclaringTable {
+    private final FluentBaseQueriesTest _baseQueriesTest;
+    private final TableConfig _tableConfig;
+    private final Schema _schema;
+    private final File _baseDir;
+    private final Map<String, String> _extraQueryOptions;
+
+    DeclaringTable(FluentBaseQueriesTest baseQueriesTest, TableConfig tableConfig, Schema schema, File baseDir,
+        Map<String, String> extraQueryOptions) {
+      _baseQueriesTest = baseQueriesTest;
+      _tableConfig = tableConfig;
+      _schema = schema;
+      _baseDir = baseDir;
+      _extraQueryOptions = extraQueryOptions;
+    }
+
+    /** Adds a first segment, described as a textual table (header row plus data rows). */
+    public OnFirstInstance onFirstInstance(String... content) {
+      return newFirstInstance().andSegment(content);
+    }
+
+    /** Adds a first segment, described as raw rows of already-typed values. */
+    public OnFirstInstance onFirstInstance(Object[]... content) {
+      return newFirstInstance().andSegment(content);
+    }
+
+    // Both overloads target the first simulated instance, hence onSecondInstance = false.
+    private OnFirstInstance newFirstInstance() {
+      return new OnFirstInstance(_tableConfig, _schema, _baseDir, false, _baseQueriesTest, _extraQueryOptions);
+    }
+  }
+
+ /**
+ * Fluent stage that accumulates segment contents for one table and, on demand, materializes
+ * them into real immutable segments assigned to one of the two simulated server instances.
+ */
+ static class TableWithSegments {
+ protected final TableConfig _tableConfig;
+ protected final Schema _schema;
+ // Per-stage temp directory (under the test's base dir) where CSV input and segments are written.
+ protected final File _indexDir;
+ // Selects which simulated instance receives the segments built by processSegments().
+ protected final boolean _onSecondInstance;
+ protected final FluentBaseQueriesTest _baseQueriesTest;
+ // Segment contents added via andSegment(...) but not yet materialized.
+ protected final List<FakeSegmentContent> _segmentContents = new ArrayList<>();
+ protected final Map<String, String> _extraQueryOptions;
+
+ TableWithSegments(TableConfig tableConfig, Schema schema, File baseDir, boolean onSecondInstance,
+ FluentBaseQueriesTest baseQueriesTest, Map<String, String> extraQueryOptions) {
+ _extraQueryOptions = extraQueryOptions;
+ try {
+ _tableConfig = tableConfig;
+ _schema = schema;
+ _indexDir = Files.createTempDirectory(baseDir.toPath(), schema.getSchemaName()).toFile();
+ _onSecondInstance = onSecondInstance;
+ _baseQueriesTest = baseQueriesTest;
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ }
+
+ // Queues a segment described as a textual table (first line is the header).
+ TableWithSegments andSegment(String... tableText) {
+ _segmentContents.add(new FakeSegmentContent(_schema, tableText));
+ return this;
+ }
+
+ // Queues a segment described as raw rows of already-typed values.
+ public TableWithSegments andSegment(Object[]... content) {
+ _segmentContents.add(new FakeSegmentContent(content));
+ return this;
+ }
+
+ /**
+ * Writes each queued segment content to a temp CSV file, builds an immutable segment from it
+ * via the normal segment creation driver, loads it mmap'ed, and registers all built segments
+ * on the first or second simulated instance. Clears the queue afterwards.
+ */
+ protected void processSegments() {
+ List<ImmutableSegment> indexSegments = new ArrayList<>(_segmentContents.size());
+
+ try {
+ for (int i = 0; i < _segmentContents.size(); i++) {
+ FakeSegmentContent segmentContent = _segmentContents.get(i);
+ File inputFile = Files.createTempFile(_indexDir.toPath(), "data", ".csv").toFile();
+ try (CSVPrinter csvPrinter = new CSVPrinter(new FileWriter(inputFile), CSVFormat.DEFAULT)) {
+ for (List<Object> row : segmentContent) {
+ // Null cells are written as the literal text "null", which the CSV reader below is
+ // configured (setNullStringValue) to read back as null.
+ if (row.stream().anyMatch(Objects::isNull)) {
+ List<Object> newRow = row.stream().map(o -> o == null ? "null" : o).collect(Collectors.toList());
+ csvPrinter.printRecord(newRow);
+ } else {
+ csvPrinter.printRecord(row);
+ }
+ }
+ } catch (IOException ex) {
+ throw new UncheckedIOException(ex);
+ }
+ String tableName = _schema.getSchemaName();
+ SegmentGeneratorConfig config =
+ SegmentTestUtils.getSegmentGeneratorConfig(inputFile, FileFormat.CSV, _indexDir, tableName, _tableConfig,
+ _schema);
+ // The CSV files carry no header row, so declare the column order explicitly.
+ CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
+ String header = String.join(",", _schema.getPhysicalColumnNames());
+ csvRecordReaderConfig.setHeader(header);
+ csvRecordReaderConfig.setSkipHeader(false);
+ csvRecordReaderConfig.setNullStringValue("null");
+ config.setReaderConfig(csvRecordReaderConfig);
+ // Postfix keeps segment names unique across the segments built in this loop.
+ config.setSegmentNamePostfix(Integer.toString(i));
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ driver.init(config);
+ driver.build();
+
+ indexSegments.add(ImmutableSegmentLoader.load(new File(_indexDir, driver.getSegmentName()), ReadMode.mmap));
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ if (_onSecondInstance) {
+ _baseQueriesTest._segments2.addAll(indexSegments);
+ } else {
+ _baseQueriesTest._segments1.addAll(indexSegments);
+ }
+ _segmentContents.clear();
+ }
+
+ // Materializes any pending segments, then executes the query with the accumulated options.
+ public QueryExecuted whenQuery(String query) {
+ processSegments();
+ BrokerResponseNative brokerResponse = _baseQueriesTest.getBrokerResponse(query, _extraQueryOptions);
+ return new QueryExecuted(_baseQueriesTest, brokerResponse, _extraQueryOptions);
+ }
+
+ // Starts declaring another table under the same base directory (this stage's parent dir).
+ public DeclaringTable givenTable(Schema schema, TableConfig tableConfig) {
+ return new DeclaringTable(_baseQueriesTest, tableConfig, schema, _indexDir.getParentFile(), _extraQueryOptions);
+ }
+ }
+
+ /** Stage collecting segments for the first simulated instance. */
+ public static class OnFirstInstance extends TableWithSegments {
+ OnFirstInstance(TableConfig tableConfig, Schema schema, File baseDir, boolean onSecondInstance,
+ FluentBaseQueriesTest baseQueriesTest, Map<String, String> extraQueryOptions) {
+ super(tableConfig, schema, baseDir, onSecondInstance, baseQueriesTest, extraQueryOptions);
+ }
+
+ // Covariant override so chained calls keep the OnFirstInstance type.
+ public OnFirstInstance andSegment(Object[]... content) {
+ _segmentContents.add(new FakeSegmentContent(content));
+ return this;
+ }
+
+ // Covariant override so chained calls keep the OnFirstInstance type.
+ public OnFirstInstance andSegment(String... tableText) {
+ super.andSegment(tableText);
+ return this;
+ }
+
+ // Flushes this instance's segments, then switches to the second instance. Note the negated
+ // flag: DeclaringTable always creates this stage with onSecondInstance=false, so
+ // !_onSecondInstance routes the following segments to the second instance.
+ public OnSecondInstance andOnSecondInstance(Object[]... content) {
+ processSegments();
+ return new OnSecondInstance(
+ _tableConfig, _schema, _indexDir.getParentFile(), !_onSecondInstance, _baseQueriesTest, _extraQueryOptions)
+ .andSegment(content);
+ }
+
+ // Same as above, taking the segment content as a textual table.
+ public OnSecondInstance andOnSecondInstance(String... content) {
+ processSegments();
+ return new OnSecondInstance(
+ _tableConfig, _schema, _indexDir.getParentFile(), !_onSecondInstance, _baseQueriesTest, _extraQueryOptions)
+ .andSegment(content);
+ }
+ }
+
+  /** Stage collecting segments for the second simulated instance. */
+  public static class OnSecondInstance extends TableWithSegments {
+    OnSecondInstance(TableConfig tableConfig, Schema schema, File baseDir, boolean onSecondInstance,
+        FluentBaseQueriesTest baseQueriesTest, Map<String, String> extraQueryOptions) {
+      super(tableConfig, schema, baseDir, onSecondInstance, baseQueriesTest, extraQueryOptions);
+    }
+
+    /** Covariant override so chained calls keep the OnSecondInstance type. */
+    @Override
+    public OnSecondInstance andSegment(Object[]... content) {
+      super.andSegment(content);
+      return this;
+    }
+
+    /** Covariant override so chained calls keep the OnSecondInstance type. */
+    @Override
+    public OnSecondInstance andSegment(String... tableText) {
+      super.andSegment(tableText);
+      return this;
+    }
+  }
+
+  /** Fluent stage reached after a query has been executed; offers result assertions. */
+  public static class QueryExecuted {
+    private final FluentBaseQueriesTest _baseQueriesTest;
+    private final BrokerResponse _brokerResponse;
+    private final Map<String, String> _extraQueryOptions;
+
+    public QueryExecuted(FluentBaseQueriesTest baseQueriesTest, BrokerResponse brokerResponse,
+        Map<String, String> extraQueryOptions) {
+      _baseQueriesTest = baseQueriesTest;
+      _brokerResponse = brokerResponse;
+      _extraQueryOptions = extraQueryOptions;
+    }
+
+    /**
+     * Asserts the result table matches the given textual table. The first line is a header of
+     * {@link PinotDataType} names separated by '|'; each following line is one expected row whose
+     * cells are converted to that type ("null" means SQL NULL, "\"null\"" the 4-char string).
+     */
+    public QueryExecuted thenResultIs(String... tableText) {
+      Object[][] rows = tableAsRows(
+          headerCells -> Arrays.stream(headerCells)
+              .map(String::trim)
+              .map(txt -> txt.toUpperCase(Locale.US))
+              .map(PinotDataType::valueOf)
+              .collect(Collectors.toList()),
+          tableText
+      );
+      thenResultIs(rows);
+
+      return this;
+    }
+
+    /** Asserts the result table equals the expected rows, comparing cell class and value. */
+    public QueryExecuted thenResultIs(Object[]... expectedResult) {
+      // BUGFIX: was 'getExceptionsSize() < 0', which can never be true, so query errors were
+      // silently ignored and only surfaced later as confusing row/cell mismatches.
+      if (_brokerResponse.getExceptionsSize() > 0) {
+        Assert.fail("Query failed with " + _brokerResponse.getProcessingExceptions());
+      }
+
+      List<Object[]> actualRows = _brokerResponse.getResultTable().getRows();
+      // Compare the overlapping rows first so failures point at the first mismatching cell...
+      int rowsToAnalyze = Math.min(actualRows.size(), expectedResult.length);
+      for (int i = 0; i < rowsToAnalyze; i++) {
+        Object[] actualRow = actualRows.get(i);
+        Object[] expectedRow = expectedResult[i];
+        for (int j = 0; j < actualRow.length; j++) {
+          Object actualCell = actualRow[j];
+          Object expectedCell = expectedRow[j];
+          if (actualCell != null && expectedCell != null) {
+            Assert.assertEquals(actualCell.getClass(), expectedCell.getClass(), "On row " + i + " and column " + j);
+          }
+          if (expectedCell == null) {
+            Assert.assertNull(actualCell, "On row " + i + " and column " + j + ". "
+                + "Actual value is '" + actualCell + "', which is not null");
+          } else if (actualCell == null) {
+            Assert.fail("On row " + i + " and column " + j + ". Actual value is null when expecting not null "
+                + "value '" + expectedCell + "'");
+          } else {
+            Assert.assertEquals(actualCell, expectedCell, "On row " + i + " and column " + j);
+          }
+        }
+      }
+      // ...and only then verify the row counts match.
+      Assert.assertEquals(actualRows.size(), expectedResult.length, "Unexpected number of rows");
+      return this;
+    }
+
+    /** Replaces any previously registered query options with the given ones. */
+    public QueryExecuted withExtraQueryOptions(Map<String, String> extraQueryOptions) {
+      _extraQueryOptions.clear();
+      _extraQueryOptions.putAll(extraQueryOptions);
+      return this;
+    }
+
+    /** Enables or disables null handling for subsequent queries. */
+    public QueryExecuted withNullHandling(boolean enabled) {
+      _extraQueryOptions.put("enableNullHandling", Boolean.toString(enabled));
+      return this;
+    }
+
+    /** Runs another query against the same segments. */
+    public QueryExecuted whenQuery(String query) {
+      // BUGFIX: forward the accumulated query options; they were previously dropped here, so e.g.
+      // withNullHandling(...) had no effect on follow-up queries issued from this stage
+      // (inconsistent with TableWithSegments.whenQuery, which does pass them).
+      BrokerResponseNative brokerResponse = _baseQueriesTest.getBrokerResponse(query, _extraQueryOptions);
+      return new QueryExecuted(_baseQueriesTest, brokerResponse, _extraQueryOptions);
+    }
+  }
+
+  /**
+   * Parses a textual table into typed rows. The first line is the header (cells separated by '|'),
+   * from which {@code extractDataTypes} derives the column types; every following line is one data
+   * row. A bare "null" cell becomes SQL NULL, a quoted "\"null\"" cell the literal string "null".
+   * Rows shorter than the header leave their trailing cells as null.
+   */
+  public static Object[][] tableAsRows(Function<String[], List<PinotDataType>> extractDataTypes, String... tableText) {
+    List<PinotDataType> dataTypes = extractDataTypes.apply(tableText[0].split("\\|"));
+
+    // Reject multi-valued columns up front.
+    for (int col = 0; col < dataTypes.size(); col++) {
+      PinotDataType dataType = dataTypes.get(col);
+      if (!dataType.isSingleValue()) {
+        throw new IllegalArgumentException(
+            "Multi value columns are not supported and the " + col + "th column is of type " + dataType
+                + " which is multivalued");
+      }
+    }
+
+    Object[][] rows = new Object[tableText.length - 1][];
+    for (int rowIdx = 1; rowIdx < tableText.length; rowIdx++) {
+      String[] rawCells = tableText[rowIdx].split("\\|");
+      Object[] convertedRow = new Object[dataTypes.size()];
+      for (int col = 0; col < rawCells.length; col++) {
+        convertedRow[col] = convertCell(rawCells[col].trim(), dataTypes.get(col));
+      }
+      rows[rowIdx - 1] = convertedRow;
+    }
+    return rows;
+  }
+
+  // Converts a single trimmed cell to the target type, honoring the null/"null" conventions.
+  private static Object convertCell(String rawCell, PinotDataType dataType) {
+    if (rawCell.equalsIgnoreCase("null")) {
+      return null;
+    }
+    if (rawCell.equalsIgnoreCase("\"null\"")) {
+      return dataType.convert("null", PinotDataType.STRING);
+    }
+    return dataType.convert(rawCell, PinotDataType.STRING);
+  }
+
+ /**
+ * In-memory content of one segment: a list of rows, each row a list of cell values (null allowed).
+ */
+ public static class FakeSegmentContent extends ArrayList<List<Object>> {
+
+ // Parses a textual table (header line + data lines) using the schema to type each column.
+ // NOTE(review): assumes tableText contains at least the header row — with an empty array,
+ // 'tableText.length - 1' would be a negative ArrayList capacity; confirm callers always pass one.
+ public FakeSegmentContent(Schema schema, String... tableText) {
+ super(tableText.length - 1);
+
+ Object[][] rows = FluentQueryTest.tableAsRows(
+ headerCells -> {
+ // Map each header cell to the ingestion PinotDataType of the matching schema column,
+ // rejecting virtual and multi-valued columns.
+ List<PinotDataType> dataTypes = new ArrayList<>();
+ for (String headerCell : headerCells) {
+ String columnName = headerCell.trim();
+ FieldSpec fieldSpec = schema.getFieldSpecFor(columnName);
+ if (fieldSpec.isVirtualColumn()) {
+ throw new IllegalArgumentException("Virtual columns like " + columnName + " cannot be set here");
+ }
+ if (!fieldSpec.isSingleValueField()) {
+ throw new IllegalArgumentException(
+ "Multi valued columns like " + columnName + " cannot be set as text");
+ }
+ dataTypes.add(PinotDataType.getPinotDataTypeForIngestion(fieldSpec));
+ }
+ return dataTypes;
+ },
+ tableText);
+
+ for (Object[] row : rows) {
+ add(Arrays.asList(row));
+ }
+ }
+
+ // Wraps already-typed rows directly, without any schema-based conversion.
+ public FakeSegmentContent(Object[]... rows) {
+ super(rows.length);
+ for (Object[] row : rows) {
+ add(Arrays.asList(row));
+ }
+ }
+ }
+
+  /**
+   * BaseQueriesTest backed by two segment lists that simulate two server instances; the second
+   * list may stay empty, in which case a single-instance setup is used.
+   */
+  protected static class FluentBaseQueriesTest extends BaseQueriesTest {
+    List<IndexSegment> _segments1 = new ArrayList<>();
+    List<IndexSegment> _segments2 = new ArrayList<>();
+
+    @Override
+    protected String getFilter() {
+      // No extra filter predicate is appended to the queries.
+      return "";
+    }
+
+    @Override
+    protected IndexSegment getIndexSegment() {
+      return _segments1.get(0);
+    }
+
+    @Override
+    protected List<IndexSegment> getIndexSegments() {
+      if (_segments2.isEmpty()) {
+        return _segments1;
+      }
+      // Both instances populated: expose the concatenation of their segments.
+      List<IndexSegment> allSegments = new ArrayList<>(_segments1);
+      allSegments.addAll(_segments2);
+      return allSegments;
+    }
+
+    @Override
+    protected List<List<IndexSegment>> getDistinctInstances() {
+      // With an empty second list, fall back to the default single-instance behavior.
+      return _segments2.isEmpty() ? super.getDistinctInstances() : Lists.newArrayList(_segments1, _segments2);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org