Posted to commits@pinot.apache.org by ja...@apache.org on 2020/03/31 21:41:06 UTC
[incubator-pinot] branch master updated: Refactor DistinctQueriesTest to cover more scenarios (#5168)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 189f3b8 Refactor DistinctQueriesTest to cover more scenarios (#5168)
189f3b8 is described below
commit 189f3b821f6cc5894331baff779ddeec090a1368
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Mar 31 14:40:56 2020 -0700
Refactor DistinctQueriesTest to cover more scenarios (#5168)
- Test all data types (including BYTES)
- Test empty responses
- Test merge of non-empty response and empty response for both server side and broker side
TODO:
- ORDER BY bytesColumn is not properly supported right now
- DistinctTable limit is not properly set
---
.../core/query/aggregation/DistinctTable.java | 3 +-
.../query/reduce/DistinctDataTableReducer.java | 2 +-
.../org/apache/pinot/queries/BaseQueriesTest.java | 8 +-
.../apache/pinot/queries/DistinctQueriesTest.java | 1281 ++++++++------------
4 files changed, 497 insertions(+), 797 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
index 4bf4628..6accdde 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Set;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
@@ -36,6 +35,7 @@ import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.data.table.BaseTable;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.spi.utils.BytesUtils;
/**
* This serves the following purposes:
@@ -188,6 +188,7 @@ public class DistinctTable extends BaseTable {
break;
case BYTES:
columnValues[colIndex] = dataTable.getString(rowIndex, colIndex);
+ break;
default:
throw new IllegalStateException(
"Unexpected column data type " + columnDataType + " while deserializing data table for DISTINCT query");
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 85a1450..e45366a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -108,7 +108,7 @@ public class DistinctDataTableReducer implements DataTableReducer {
if (mergedIntermediateResult == null) {
mergedIntermediateResult = intermediateResultToMerge;
} else {
- _aggregationFunction.merge(mergedIntermediateResult, intermediateResultToMerge);
+ mergedIntermediateResult = _aggregationFunction.merge(mergedIntermediateResult, intermediateResultToMerge);
}
}
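The reassignment matters when merge returns an object other than its first argument, for instance when an empty intermediate result is merged with a non-empty one, which is exactly the empty-response scenario the commit message calls out; dropping the return value can silently lose the merged rows. A hand-rolled sketch with hypothetical types (not the Pinot API) of why the fix is needed:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class MergeReassignSketch {
  // Stand-in for a merge that may return a different object instead of mutating its first argument
  static Set<Integer> merge(Set<Integer> left, Set<Integer> right) {
    if (left.isEmpty()) {
      return right;  // short-circuit: the non-empty side is returned as-is
    }
    left.addAll(right);
    return left;
  }

  public static void main(String[] args) {
    Set<Integer> merged = new HashSet<>();                          // empty intermediate result
    Set<Integer> nonEmpty = new HashSet<>(Arrays.asList(1, 2, 3));  // non-empty intermediate result

    merge(merged, nonEmpty);             // before the fix: return value dropped
    System.out.println(merged.size());   // 0, the merged rows are lost

    merged = merge(merged, nonEmpty);    // after the fix: the result is reassigned
    System.out.println(merged.size());   // 3
  }
}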
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index a325a97..5d1dc8f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -44,10 +44,10 @@ import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
* Base class for queries tests.
*/
public abstract class BaseQueriesTest {
- private static final Pql2Compiler PQL_COMPILER = new Pql2Compiler();
- private static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler();
- private static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2();
- private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);
+ protected static final Pql2Compiler PQL_COMPILER = new Pql2Compiler();
+ protected static final CalciteSqlCompiler SQL_COMPILER = new CalciteSqlCompiler();
+ protected static final PlanMaker PLAN_MAKER = new InstancePlanMakerImplV2();
+ protected static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);
protected abstract String getFilter();
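Widening these members from private to protected lets subclasses compile broker requests directly, which the refactored DistinctQueriesTest relies on below when it routes different segments to two mocked servers. A minimal sketch, assuming the Pinot test classpath, of that usage (hypothetical helper class; the query strings are placeholders):

import java.util.Collections;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;

class CompilerUsageSketch {
  static BrokerRequest compilePql(String pql) {
    // e.g. "SELECT DISTINCT(intColumn) FROM testTable LIMIT 10"
    return new Pql2Compiler().compileToBrokerRequest(pql);
  }

  static BrokerRequest compileSql(String sql) {
    // e.g. "SELECT DISTINCT intColumn FROM testTable LIMIT 10"
    BrokerRequest request = new CalciteSqlCompiler().compileToBrokerRequest(sql);
    // request the SQL response format, as the inter-server test below does
    request.setQueryOptions(Collections.singletonMap("responseFormat", "sql"));
    return request;
  }
}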
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 2196b4a..647b68b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -23,16 +23,23 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.response.broker.SelectionResults;
import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
@@ -41,70 +48,69 @@ import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
import org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.query.aggregation.DistinctTable;
+import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.pql.parsers.Pql2Compiler;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.Assert;
+import org.apache.pinot.spi.utils.BytesUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import org.testng.collections.Lists;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
/**
- * Class to test DISTINCT queries.
- * Generates custom data set with explicitly generating
- * duplicate rows/keys
+ * Queries test for DISTINCT queries.
+ * TODO: Add a test for 'ORDER BY bytesColumn' when it is supported
*/
public class DistinctQueriesTest extends BaseQueriesTest {
- private static final int NUM_ROWS = 1_000_000;
-
- private List<GenericRow> _rows = new ArrayList<>();
-
- private static String D1 = "STRING_COL1";
- private static String D2 = "STRING_COL2";
- private static String M1 = "INT_COL";
- private static String M2 = "LONG_COL";
-
- // in the custom data set, each row is repeated after 20 rows
- private static final int TUPLE_REPEAT_INTERVAL = 20;
- // in the custom data set, each row is repeated 5 times, total 200k unique rows in dataset
- private static final int PER_TUPLE_REPEAT_FREQUENCY = 5;
- private static final int NUM_UNIQUE_TUPLES = NUM_ROWS / PER_TUPLE_REPEAT_FREQUENCY;
-
- private static final int INT_BASE_VALUE = 10000;
- private static final int INT_INCREMENT = 500;
- private static final long LONG_BASE_VALUE = 100000000;
- private static final long LONG_INCREMENT = 5500;
-
- private static final String TABLE_NAME = "DistinctTestTable";
- private static final int NUM_SEGMENTS = 2;
- private static final String SEGMENT_NAME_1 = TABLE_NAME + "_100000000_200000000";
- private static final String SEGMENT_NAME_2 = TABLE_NAME + "_300000000_400000000";
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "DistinctQueryTest");
-
- private List<IndexSegment> _indexSegments = new ArrayList<>(NUM_SEGMENTS);
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME_PREFIX = "testSegment_";
+
+ private static final int NUM_RECORDS_PER_SEGMENT = 10000;
+ private static final int NUM_UNIQUE_RECORDS_PER_SEGMENT = 100;
+
+ private static final String INT_COLUMN = "intColumn";
+ private static final String LONG_COLUMN = "longColumn";
+ private static final String FLOAT_COLUMN = "floatColumn";
+ private static final String DOUBLE_COLUMN = "doubleColumn";
+ private static final String STRING_COLUMN = "stringColumn";
+ private static final String BYTES_COLUMN = "bytesColumn";
+ private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
+ .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
+ .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addSingleValueDimension(STRING_COLUMN, DataType.STRING)
+ .addSingleValueDimension(BYTES_COLUMN, DataType.BYTES).build();
+
+ private IndexSegment _indexSegment;
private List<SegmentDataManager> _segmentDataManagers;
- private final Set<Record> _expectedAddTransformResults = new HashSet<>();
- private final Set<Record> _expectedSubTransformResults = new HashSet<>();
- private final Set<Record> _expectedAddSubTransformResults = new HashSet<>();
- private final Set<Record> _expectedResults = new HashSet<>();
- private final FieldSpec.DataType[] _dataTypes =
- new FieldSpec.DataType[]{FieldSpec.DataType.STRING, FieldSpec.DataType.STRING, FieldSpec.DataType.INT, FieldSpec.DataType.LONG};
- private Schema _schema;
+ @Override
+ protected String getFilter() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _indexSegment;
+ }
+
+ @Override
+ protected List<SegmentDataManager> getSegmentDataManagers() {
+ return _segmentDataManagers;
+ }
@BeforeClass
public void setUp() {
- Pql2Compiler.ENABLE_DISTINCT = true;
- createPinotTableSchema();
- createTestData();
+ FileUtils.deleteQuietly(INDEX_DIR);
}
@AfterClass
@@ -112,797 +118,490 @@ public class DistinctQueriesTest extends BaseQueriesTest {
FileUtils.deleteQuietly(INDEX_DIR);
}
- private void createPinotTableSchema() {
- _schema =
- new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, FieldSpec.DataType.STRING)
- .addSingleValueDimension(D2, FieldSpec.DataType.STRING).addMetric(M1, FieldSpec.DataType.INT)
- .addMetric(M2, FieldSpec.DataType.LONG).build();
- }
-
/**
- * Custom data generator that explicitly generates duplicate
- * rows in dataset for purpose of testing DISTINCT functionality
+ * Helper method to generate records based on the given base value.
+ *
+ * All columns will have the same value but different data types (BYTES values are encoded STRING values).
+ * For the {i}th unique record, the value will be {baseValue + i}.
*/
- private void createTestData() {
- int pos = 0;
- Object[] columnValues = new Object[_schema.size()];
- for (int rowIndex = 0; rowIndex < NUM_ROWS; rowIndex++) {
- GenericRow row = new GenericRow();
- double addition;
- double subtraction;
- int col = 0;
- boolean duplicate = false;
- for (FieldSpec.DataType dataType : _dataTypes) {
- // generate each column for the row
- Object value = null;
- if (rowIndex == 0) {
- switch (dataType) {
- case INT:
- value = INT_BASE_VALUE;
- row.putField(M1, value);
- break;
- case LONG:
- value = LONG_BASE_VALUE;
- row.putField(M2, value);
- break;
- case STRING:
- value = RandomStringUtils.randomAlphabetic(10);
- if (col == 0) {
- row.putField(D1, value);
- } else {
- row.putField(D2, value);
- }
- break;
- }
- } else {
- if (rowIndex == pos + (TUPLE_REPEAT_INTERVAL * PER_TUPLE_REPEAT_FREQUENCY)) {
- pos = rowIndex;
- }
- if (rowIndex < pos + TUPLE_REPEAT_INTERVAL) {
- // generate unique row
- switch (dataType) {
- case INT:
- value = (Integer) _rows.get(rowIndex - 1).getValue(M1) + INT_INCREMENT;
- row.putField(M1, value);
- break;
- case LONG:
- value = (Long) _rows.get(rowIndex - 1).getValue(M2) + LONG_INCREMENT;
- row.putField(M2, value);
- break;
- case STRING:
- value = RandomStringUtils.randomAlphabetic(10);
- if (col == 0) {
- row.putField(D1, value);
- } else {
- row.putField(D2, value);
- }
- break;
- }
- } else {
- // generate duplicate row
- duplicate = true;
- switch (dataType) {
- case INT:
- value = _rows.get(rowIndex - TUPLE_REPEAT_INTERVAL).getValue(M1);
- row.putField(M1, value);
- break;
- case LONG:
- value = _rows.get(rowIndex - TUPLE_REPEAT_INTERVAL).getValue(M2);
- row.putField(M2, value);
- break;
- case STRING:
- if (col == 0) {
- row.putField(D1, _rows.get(rowIndex - TUPLE_REPEAT_INTERVAL).getValue(D1));
- } else {
- row.putField(D2, _rows.get(rowIndex - TUPLE_REPEAT_INTERVAL).getValue(D2));
- }
- break;
- }
- }
- }
-
- columnValues[col++] = value;
- }
-
- // add the generated row
- _rows.add(row);
-
- // compute expected result for add and sub transform function
- addition = ((Integer) columnValues[2]) + ((Long) columnValues[3]);
- subtraction = ((Long) columnValues[3]) - ((Integer) columnValues[2]);
-
- // compute expected result for multi column distinct
- if (!duplicate) {
- Record record = new Record(new Object[]{columnValues[0], columnValues[1], columnValues[2], columnValues[3]});
- _expectedResults.add(record);
- }
-
- _expectedAddTransformResults.add(new Record(new Object[]{addition}));
- _expectedSubTransformResults.add(new Record(new Object[]{subtraction}));
- _expectedAddSubTransformResults.add(new Record(new Object[]{addition, subtraction}));
+ private List<GenericRow> generateRecords(int baseValue) {
+ List<GenericRow> uniqueRecords = new ArrayList<>(NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+ int value = baseValue + i;
+ GenericRow record = new GenericRow();
+ record.putValue(INT_COLUMN, value);
+ record.putValue(LONG_COLUMN, (long) value);
+ record.putValue(FLOAT_COLUMN, (float) value);
+ record.putValue(DOUBLE_COLUMN, (double) value);
+ String stringValue = Integer.toString(value);
+ record.putValue(STRING_COLUMN, stringValue);
+ byte[] bytesValue = StringUtil.encodeUtf8(stringValue);
+ record.putValue(BYTES_COLUMN, bytesValue);
+ uniqueRecords.add(record);
}
- }
-
- @Override
- protected String getFilter() {
- return "";
- }
-
- @Override
- protected IndexSegment getIndexSegment() {
- return _indexSegments.get(0);
- }
- @Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ List<GenericRow> records = new ArrayList<>(NUM_RECORDS_PER_SEGMENT);
+ for (int i = 0; i < NUM_RECORDS_PER_SEGMENT; i += NUM_UNIQUE_RECORDS_PER_SEGMENT) {
+ records.addAll(uniqueRecords);
+ }
+ return records;
}
- private void createSegment(Schema schema, RecordReader recordReader, String segmentName, String tableName)
+ private ImmutableSegment createSegment(int index, List<GenericRow> records)
throws Exception {
- SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(schema);
- segmentGeneratorConfig.setTableName(tableName);
- segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+ String segmentName = SEGMENT_NAME_PREFIX + index;
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(SCHEMA);
+ segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
segmentGeneratorConfig.setSegmentName(segmentName);
+ segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig, recordReader);
+ driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records, SCHEMA));
driver.build();
- File segmentIndexDir = new File(INDEX_DIR.getAbsolutePath(), segmentName);
- if (!segmentIndexDir.exists()) {
- throw new IllegalStateException("Segment generation failed");
- }
- }
-
- private ImmutableSegment loadSegment(String segmentName)
- throws Exception {
- return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.heap);
+ return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap);
}
/**
- * Test DISTINCT query with multiple columns on generated data set.
- * All the generated dataset is put into a single segment
- * and we directly run the {@link AggregationOperator} to
- * get segment level execution results.
- * The results are then compared to the expected result table
- * that was build during data generation
- * @throws Exception
+ * Test DISTINCT query within a single segment.
+ * <p>The following query types are tested:
+ * <ul>
+ * <li>Selecting all columns</li>
+ * <li>Selecting some columns with filter</li>
+ * <li>Selecting some columns with transform, filter, order-by and limit</li>
+ * <li>Selecting some columns with filter that does not match any record</li>
+ * </ul>
*/
@Test
public void testDistinctInnerSegment()
throws Exception {
+ _indexSegment = createSegment(0, generateRecords(0));
try {
- // put all the generated dataset in a single segment
- try (RecordReader recordReader = new GenericRowRecordReader(_rows, _schema)) {
- createSegment(_schema, recordReader, SEGMENT_NAME_1, TABLE_NAME);
- final ImmutableSegment immutableSegment = loadSegment(SEGMENT_NAME_1);
- _indexSegments.add(immutableSegment);
-
- // All 200k unique rows should be returned
+ {
+ // Test selecting all columns
String query =
- "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 1000000";
- innerSegmentTestHelper(query, NUM_UNIQUE_TUPLES);
-
- // All 200k unique rows should be returned
- query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 200000";
- innerSegmentTestHelper(query, NUM_UNIQUE_TUPLES);
-
- // 100k rows should be returned
- query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 100000";
- innerSegmentTestHelper(query, 100000);
-
- // default: 10 unique rows should be returned
- query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable";
- innerSegmentTestHelper(query, 10);
-
- // default: 10 unique rows should be returned
- query = "SELECT DISTINCT(add(INT_COL,LONG_COL)) FROM DistinctTestTable";
- innerSegmentTransformQueryTestHelper(query, 10, 1, new String[]{"add(INT_COL,LONG_COL)"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE});
-
- // default: 10 unique rows should be returned
- query = "SELECT DISTINCT(sub(LONG_COL,INT_COL)) FROM DistinctTestTable";
- innerSegmentTransformQueryTestHelper(query, 10, 2, new String[]{"sub(LONG_COL,INT_COL)"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE});
-
- // 100k unique rows should be returned
- query = "SELECT DISTINCT(add(INT_COL,LONG_COL),sub(LONG_COL,INT_COL)) FROM DistinctTestTable LIMIT 100000 ";
- innerSegmentTransformQueryTestHelper(query, 100000, 3,
- new String[]{"add(INT_COL,LONG_COL)", "sub(LONG_COL,INT_COL)"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
+ "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000";
+
+ // Check data schema
+ DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+ DataSchema dataSchema = distinctTable.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(),
+ new String[]{"intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.BYTES});
+
+ // Check values, where all 100 unique values should be returned
+ assertEquals(distinctTable.size(), NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ Set<Integer> expectedValues = new HashSet<>();
+ for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+ expectedValues.add(i);
+ }
+ Set<Integer> actualValues = new HashSet<>();
+ Iterator<Record> iterator = distinctTable.iterator();
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ Object[] values = record.getValues();
+ int intValue = (int) values[0];
+ assertEquals(((Long) values[1]).intValue(), intValue);
+ assertEquals(((Float) values[2]).intValue(), intValue);
+ assertEquals(((Double) values[3]).intValue(), intValue);
+ assertEquals(Integer.parseInt((String) values[4]), intValue);
+ assertEquals(StringUtil.decodeUtf8((byte[]) values[5]), values[4]);
+ actualValues.add(intValue);
+ }
+ assertEquals(actualValues, expectedValues);
}
- } finally {
- destroySegments();
- }
- }
-
- /**
- * Helper for inner segment query tests
- * @param query query to run
- * @param expectedSize expected result size
- */
- private void innerSegmentTestHelper(final String query, final int expectedSize) {
- // compile to broker request and directly run the operator
- AggregationOperator aggregationOperator = getOperatorForQuery(query);
- IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
- List<Object> operatorResult = resultsBlock.getAggregationResult();
-
- // verify resultset
- Assert.assertNotNull(operatorResult);
- Assert.assertEquals(operatorResult.size(), 1);
- Assert.assertTrue(operatorResult.get(0) instanceof DistinctTable);
-
- DistinctTable distinctTable = (DistinctTable) operatorResult.get(0);
- Assert.assertEquals(_expectedResults.size(), NUM_UNIQUE_TUPLES);
- Assert.assertEquals(distinctTable.size(), expectedSize);
-
- DataSchema dataSchema = distinctTable.getDataSchema();
- Assert.assertEquals(dataSchema.getColumnNames(), new String[]{D1, D2, M1, M2});
- Assert.assertEquals(dataSchema.getColumnDataTypes(),
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG});
-
- Iterator<Record> iterator = distinctTable.iterator();
- while (iterator.hasNext()) {
- Record record = iterator.next();
- Assert.assertEquals(record.getValues().length, 4);
- Assert.assertTrue(_expectedResults.contains(record));
- }
- }
-
- /**
- * Helper for inner segment transform query tests
- * @param query query to run
- * @param expectedSize expected result size
- */
- private void innerSegmentTransformQueryTestHelper(final String query, final int expectedSize, final int op,
- final String[] columnNames, final DataSchema.ColumnDataType[] columnTypes) {
- // compile to broker request and directly run the operator
- AggregationOperator aggregationOperator = getOperatorForQuery(query);
- IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
- final List<Object> operatorResult = resultsBlock.getAggregationResult();
-
- // verify resultset
- Assert.assertNotNull(operatorResult);
- Assert.assertEquals(operatorResult.size(), 1);
- Assert.assertTrue(operatorResult.get(0) instanceof DistinctTable);
-
- final DistinctTable distinctTable = (DistinctTable) operatorResult.get(0);
- Assert.assertEquals(distinctTable.size(), expectedSize);
-
- DataSchema dataSchema = distinctTable.getDataSchema();
- Assert.assertEquals(dataSchema.getColumnNames(), columnNames);
- Assert.assertEquals(dataSchema.getColumnDataTypes(), columnTypes);
-
- Iterator<Record> iterator = distinctTable.iterator();
- while (iterator.hasNext()) {
- Record record = iterator.next();
- Assert.assertEquals(record.getValues().length, columnNames.length);
- if (op == 1) {
- Assert.assertTrue(_expectedAddTransformResults.contains(record));
- } else if (op == 2) {
- Assert.assertTrue(_expectedSubTransformResults.contains(record));
- } else {
- Assert.assertTrue(_expectedAddSubTransformResults.contains(record));
+ {
+ // Test selecting some columns with filter
+ String query =
+ "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000";
+
+ // Check data schema
+ DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+ DataSchema dataSchema = distinctTable.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"stringColumn", "bytesColumn", "floatColumn"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.FLOAT});
+
+ // Check values, where 40 matched values should be returned
+ assertEquals(distinctTable.size(), NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
+ Set<Integer> expectedValues = new HashSet<>();
+ for (int i = 60; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+ expectedValues.add(i);
+ }
+ Set<Integer> actualValues = new HashSet<>();
+ Iterator<Record> iterator = distinctTable.iterator();
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ Object[] values = record.getValues();
+ int intValue = Integer.parseInt((String) values[0]);
+ assertEquals(StringUtil.decodeUtf8((byte[]) values[1]), values[0]);
+ assertEquals(((Float) values[2]).intValue(), intValue);
+ actualValues.add(intValue);
+ }
+ assertEquals(actualValues, expectedValues);
}
- }
- }
-
- /**
- * Test DISTINCT query with multiple columns on generated data set.
- * The generated dataset is divided into two segments.
- * We exercise the entire execution from broker ->
- * server -> segment. The server combines the results
- * from segments and sends the data table to broker.
- *
- * Currently the base class mimics the broker level
- * execution by duplicating the data table to mimic
- * two servers and then doing the merge
- *
- * The results are then compared to the expected result table
- * that was build during data generation
- * @throws Exception
- */
- @Test(dependsOnMethods = {"testDistinctInnerSegment"})
- public void testDistinctInterSegmentInterServer()
- throws Exception {
- try {
- // divide the generated dataset into 2 parts and create 2 segments
- final List<GenericRow> randomRows = new ArrayList<>();
- final List<GenericRow> copiedRows = new ArrayList<>(_rows);
- final int size = copiedRows.size();
- for (int row = size - 1; row >= size / 2; row--) {
- randomRows.add(copiedRows.remove(row));
+ {
+ // Test selecting some columns with transform, filter, order-by and limit
+ String query =
+ "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10";
+
+ // Check data schema
+ DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+ DataSchema dataSchema = distinctTable.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"add(intColumn,floatColumn)", "stringColumn"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.STRING});
+
+ // Check values, where 60 matched values should be returned (limit won't take effect on server side)
+ assertEquals(distinctTable.size(), 60);
+ Set<Integer> expectedValues = new HashSet<>();
+ for (int i = 0; i < 60; i++) {
+ expectedValues.add(i);
+ }
+ Set<Integer> actualValues = new HashSet<>();
+ Iterator<Record> iterator = distinctTable.iterator();
+ while (iterator.hasNext()) {
+ Record record = iterator.next();
+ Object[] values = record.getValues();
+ int intValue = ((Double) values[0]).intValue() / 2;
+ assertEquals(Integer.parseInt((String) values[1]), intValue);
+ actualValues.add(intValue);
+ }
+ assertEquals(actualValues, expectedValues);
}
-
- try (RecordReader recordReader1 = new GenericRowRecordReader(copiedRows, _schema);
- RecordReader recordReader2 = new GenericRowRecordReader(randomRows, _schema)) {
- createSegment(_schema, recordReader1, SEGMENT_NAME_1, TABLE_NAME);
- createSegment(_schema, recordReader2, SEGMENT_NAME_2, TABLE_NAME);
- final ImmutableSegment segment1 = loadSegment(SEGMENT_NAME_1);
- final ImmutableSegment segment2 = loadSegment(SEGMENT_NAME_2);
-
- _indexSegments.add(segment1);
- _indexSegments.add(segment2);
- _segmentDataManagers =
- Arrays.asList(new ImmutableSegmentDataManager(segment1), new ImmutableSegmentDataManager(segment2));
-
- // All 200k unique rows should be returned
+ {
+ // Test selecting some columns with filter that does not match any record
String query =
- "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 1000000";
- interSegmentInterServerTestHelper(query, NUM_UNIQUE_TUPLES);
-
- // All 200k unique unique 1 million rows should be returned
- query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 200000";
- interSegmentInterServerTestHelper(query, NUM_UNIQUE_TUPLES);
+ "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10";
- // 100k unique rows should be returned
- query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable LIMIT 100000";
- interSegmentInterServerTestHelper(query, 100000);
+ // Check data schema, where data type should be STRING for all columns
+ DistinctTable distinctTable = getDistinctTableInnerSegment(query);
+ DataSchema dataSchema = distinctTable.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"floatColumn", "longColumn"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
- // Default: 10 unique rows should be returned
- query = "SELECT DISTINCT(STRING_COL1, STRING_COL2, INT_COL, LONG_COL) FROM DistinctTestTable";
- interSegmentInterServerTestHelper(query, 10);
+ // Check values, where no record should be returned
+ assertEquals(distinctTable.size(), 0);
}
} finally {
- destroySegments();
+ _indexSegment.destroy();
}
}
/**
- * Helper for inter segment, inter server query tests
- * @param query query to run
- * @param expectedSize expected result size
+ * Helper method to get the DistinctTable result for one single segment for the given query.
*/
- private void interSegmentInterServerTestHelper(String query, int expectedSize) {
- BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
- final SelectionResults selectionResults = brokerResponse.getSelectionResults();
-
- Assert.assertEquals(selectionResults.getColumns().size(), 4);
-
- Assert.assertEquals(selectionResults.getColumns().get(0), D1);
- Assert.assertEquals(selectionResults.getColumns().get(1), D2);
- Assert.assertEquals(selectionResults.getColumns().get(2), M1);
- Assert.assertEquals(selectionResults.getColumns().get(3), M2);
-
- Assert.assertEquals(_expectedResults.size(), NUM_UNIQUE_TUPLES);
- Assert.assertEquals(selectionResults.getRows().size(), expectedSize);
-
- for (Serializable[] row : selectionResults.getRows()) {
- Assert.assertEquals(row.length, 4);
- Record record = new Record(row);
- Assert.assertTrue(_expectedResults.contains(record));
- }
+ private DistinctTable getDistinctTableInnerSegment(String query) {
+ AggregationOperator aggregationOperator = getOperatorForQuery(query);
+ List<Object> aggregationResult = aggregationOperator.nextBlock().getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 1);
+ assertTrue(aggregationResult.get(0) instanceof DistinctTable);
+ return (DistinctTable) aggregationResult.get(0);
}
/**
- * Test DISTINCT queries on multiple columns with FILTER.
- * A simple hand-written data set of 10 rows in a single segment
- * is used for FILTER based queries as opposed to generated data set.
- * The results are compared to expected table.
- *
- * Runs 4 different queries with predicates.
- * @throws Exception
+ * Test DISTINCT query across multiple segments and servers (2 servers, each with 2 segments).
+ * <p>Both PQL and SQL format are tested.
+ * <p>The following query types are tested:
+ * <ul>
+ * <li>Selecting all columns</li>
+ * <li>Selecting some columns with filter</li>
+ * <li>Selecting some columns with transform, filter, order-by and limit</li>
+ * <li>Selecting some columns with filter that does not match any record</li>
+ * <li>
+ * Selecting some columns with filter that does not match any record in one segment but matches some records in
+ * the other segment
+ * </li>
+ * <li>
+ * Selecting some columns with filter that does not match any record in one server but matches some records in the
+ * other server
+ * </li>
+ * </ul>
*/
- @Test(dependsOnMethods = {"testDistinctInterSegmentInterServer"})
- public void testDistinctWithFilter()
+ @Test
+ public void testDistinctInterSegment()
throws Exception {
+ ImmutableSegment segment0 = createSegment(0, generateRecords(0));
+ ImmutableSegment segment1 = createSegment(1, generateRecords(1000));
+ _segmentDataManagers =
+ Arrays.asList(new ImmutableSegmentDataManager(segment0), new ImmutableSegmentDataManager(segment1));
try {
- String tableName = TABLE_NAME + "WithFilter";
-
- Schema schema = new Schema.SchemaBuilder().setSchemaName(tableName)
- .addSingleValueDimension("State", FieldSpec.DataType.STRING)
- .addSingleValueDimension("City", FieldSpec.DataType.STRING).addMetric("SaleAmount", FieldSpec.DataType.INT)
- .build();
-
- String query1 = "SELECT DISTINCT(State, City) FROM " + tableName + " WHERE SaleAmount >= 200000";
- String query2 = "SELECT DISTINCT(State, City) FROM " + tableName + " WHERE SaleAmount >= 400000";
- String query3 = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName + " WHERE SaleAmount >= 200000";
- String query4 = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName + " WHERE SaleAmount >= 400000";
-
- Set<Record> q1ExpectedResults = new HashSet<>();
- Set<Record> q2ExpectedResults = new HashSet<>();
- Set<Record> q3ExpectedResults = new HashSet<>();
- Set<Record> q4ExpectedResults = new HashSet<>();
-
- List<GenericRow> rows =
- createSimpleTable(q1ExpectedResults, q2ExpectedResults, q3ExpectedResults, q4ExpectedResults);
-
- try (RecordReader recordReader = new GenericRowRecordReader(rows, schema)) {
- createSegment(schema, recordReader, SEGMENT_NAME_1, tableName);
- final ImmutableSegment segment = loadSegment(SEGMENT_NAME_1);
- _indexSegments.add(segment);
- _segmentDataManagers =
- Arrays.asList(new ImmutableSegmentDataManager(segment), new ImmutableSegmentDataManager(segment));
-
- // without ORDER BY
- runFilterQueryInnerSegment(q1ExpectedResults, query1, new String[]{"State", "City"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING});
- runFilterQueryInnerSegment(q2ExpectedResults, query2, new String[]{"State", "City"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING});
- runFilterQueryInnerSegment(q3ExpectedResults, query3, new String[]{"State", "City", "SaleAmount"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT});
- runFilterQueryInnerSegment(q4ExpectedResults, query4, new String[]{"State", "City", "SaleAmount"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT});
-
- // with ORDER BY ASC/DESC
- String orderByQuery = query1 + " ORDER BY State, City LIMIT 100";
- List<Record> sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
- sortedResults.add(new Record(new Object[]{"California", "Sunnyvale"}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
- orderByQuery = query1 + " ORDER BY State, City LIMIT 2";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
- orderByQuery = query1 + " ORDER BY State, City DESC LIMIT 100";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Sunnyvale"}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
- sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
- orderByQuery = query1 + " ORDER BY State, City DESC LIMIT 2";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Sunnyvale"}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
- orderByQuery = query2 + " ORDER BY State, City LIMIT 100";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
- orderByQuery = query2 + " ORDER BY State, City LIMIT 1";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
- orderByQuery = query2 + " ORDER BY State, City DESC LIMIT 100";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
- sortedResults.add(new Record(new Object[]{"California", "Mountain View"}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
- orderByQuery = query2 + " ORDER BY State, City DESC LIMIT 1";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "San Mateo"}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City"});
-
- orderByQuery = query3 + " ORDER BY State, City, SaleAmount LIMIT 100";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
- sortedResults.add(new Record(new Object[]{"California", "Sunnyvale", 300000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = query3 + " ORDER BY State, City, SaleAmount LIMIT 3";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = query3 + " ORDER BY State, City, SaleAmount DESC LIMIT 100";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
- sortedResults.add(new Record(new Object[]{"California", "Sunnyvale", 300000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = query3 + " ORDER BY State, City, SaleAmount DESC LIMIT 3";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = query4 + " ORDER BY State, City, SaleAmount LIMIT 100";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = query4 + " ORDER BY State, City, SaleAmount LIMIT 2";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = query4 + " ORDER BY State, City, SaleAmount DESC LIMIT 100";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = query4 + " ORDER BY State, City, SaleAmount DESC LIMIT 2";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName
- + " ORDER BY State, City, SaleAmount LIMIT 100";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
- sortedResults.add(new Record(new Object[]{"California", "Sunnyvale", 300000}));
- sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 50000}));
- sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 100000}));
- sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 100000}));
- sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 150000}));
- sortedResults.add(new Record(new Object[]{"Washington", "Seattle", 100000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery =
- "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName + " ORDER BY State, City, SaleAmount LIMIT 10";
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery =
- "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName + " ORDER BY State, City, SaleAmount LIMIT 5";
- sortedResults = sortedResults.subList(0, 5);
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName
- + " ORDER BY State, City, SaleAmount DESC LIMIT 100";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
- sortedResults.add(new Record(new Object[]{"California", "Sunnyvale", 300000}));
- sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 100000}));
- sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 50000}));
- sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 150000}));
- sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 100000}));
- sortedResults.add(new Record(new Object[]{"Washington", "Seattle", 100000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName
- + " ORDER BY State, City, SaleAmount DESC LIMIT 10";
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName
- + " ORDER BY State, City, SaleAmount DESC LIMIT 5";
- sortedResults = sortedResults.subList(0, 5);
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- orderByQuery = "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName
- + " ORDER BY State DESC, City, SaleAmount DESC LIMIT 100";
- sortedResults = new ArrayList<>();
- sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 150000}));
- sortedResults.add(new Record(new Object[]{"Washington", "Bellevue", 100000}));
- sortedResults.add(new Record(new Object[]{"Washington", "Seattle", 100000}));
- sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 100000}));
- sortedResults.add(new Record(new Object[]{"Oregon", "Portland", 50000}));
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 700000}));
- sortedResults.add(new Record(new Object[]{"California", "Mountain View", 200000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 500000}));
- sortedResults.add(new Record(new Object[]{"California", "San Mateo", 400000}));
- sortedResults.add(new Record(new Object[]{"California", "Sunnyvale", 300000}));
- runQueryInterSegmentWithOrderBy(orderByQuery, sortedResults, new String[]{"State", "City", "SaleAmount"});
-
- String emptyResultQuery =
- "SELECT DISTINCT(State, City, SaleAmount) FROM " + tableName + " WHERE SaleAmount = 0";
- // All column data types should be STRING for empty result
- runFilterQueryInnerSegment(Collections.emptySet(), emptyResultQuery,
- new String[]{"State", "City", "SaleAmount"},
- new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING});
- runQueryInterSegmentWithOrderBy(emptyResultQuery, Collections.emptyList(),
- new String[]{"State", "City", "SaleAmount"});
- emptyResultQuery += " ORDER BY State, City LIMIT 100";
- runQueryInterSegmentWithOrderBy(emptyResultQuery, Collections.emptyList(),
- new String[]{"State", "City", "SaleAmount"});
+ {
+ // Test selecting all columns
+ String pqlQuery =
+ "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000";
+ String sqlQuery =
+ "SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn FROM testTable LIMIT 10000";
+
+ // Check data schema
+ BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+ SelectionResults selectionResults = pqlResponse.getSelectionResults();
+ assertNotNull(selectionResults);
+ assertEquals(selectionResults.getColumns(),
+ Arrays.asList("intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"));
+ BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+ ResultTable resultTable = sqlResponse.getResultTable();
+ assertNotNull(resultTable);
+ DataSchema dataSchema = resultTable.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(),
+ new String[]{"intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.BYTES});
+
+ // Check values, where all 200 unique values should be returned
+ List<Serializable[]> pqlRows = selectionResults.getRows();
+ assertEquals(pqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ List<Object[]> sqlRows = resultTable.getRows();
+ assertEquals(sqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ Set<Integer> expectedValues = new HashSet<>();
+ for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+ expectedValues.add(i);
+ expectedValues.add(1000 + i);
+ }
+ Set<Integer> pqlValues = new HashSet<>();
+ for (Serializable[] row : pqlRows) {
+ int intValue = (int) row[0];
+ assertEquals(((Long) row[1]).intValue(), intValue);
+ assertEquals(((Float) row[2]).intValue(), intValue);
+ assertEquals(((Double) row[3]).intValue(), intValue);
+ assertEquals(Integer.parseInt((String) row[4]), intValue);
+ assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[5])), row[4]);
+ pqlValues.add(intValue);
+ }
+ assertEquals(pqlValues, expectedValues);
+ Set<Integer> sqlValues = new HashSet<>();
+ for (Object[] row : sqlRows) {
+ int intValue = (int) row[0];
+ assertEquals(((Long) row[1]).intValue(), intValue);
+ assertEquals(((Float) row[2]).intValue(), intValue);
+ assertEquals(((Double) row[3]).intValue(), intValue);
+ assertEquals(Integer.parseInt((String) row[4]), intValue);
+ assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[5])), row[4]);
+ sqlValues.add(intValue);
+ }
+ assertEquals(sqlValues, expectedValues);
+ }
+ {
+ // Test selecting some columns with filter
+ String pqlQuery =
+ "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000";
+ String sqlQuery =
+ "SELECT DISTINCT stringColumn, bytesColumn, floatColumn FROM testTable WHERE intColumn >= 60 LIMIT 10000";
+
+ // Check data schema
+ BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+ SelectionResults selectionResults = pqlResponse.getSelectionResults();
+ assertNotNull(selectionResults);
+ assertEquals(selectionResults.getColumns(), Arrays.asList("stringColumn", "bytesColumn", "floatColumn"));
+ BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+ ResultTable resultTable = sqlResponse.getResultTable();
+ assertNotNull(resultTable);
+ DataSchema dataSchema = resultTable.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"stringColumn", "bytesColumn", "floatColumn"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.FLOAT});
+
+ // Check values, where 140 matched values should be returned
+ List<Serializable[]> pqlRows = selectionResults.getRows();
+ assertEquals(pqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
+ List<Object[]> sqlRows = resultTable.getRows();
+ assertEquals(sqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
+ Set<Integer> expectedValues = new HashSet<>();
+ for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+ if (i >= 60) {
+ expectedValues.add(i);
+ }
+ expectedValues.add(1000 + i);
+ }
+ Set<Integer> pqlValues = new HashSet<>();
+ for (Serializable[] row : pqlRows) {
+ int intValue = Integer.parseInt((String) row[0]);
+ assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1])), row[0]);
+ assertEquals(((Float) row[2]).intValue(), intValue);
+ pqlValues.add(intValue);
+ }
+ assertEquals(pqlValues, expectedValues);
+ Set<Integer> sqlValues = new HashSet<>();
+ for (Object[] row : sqlRows) {
+ int intValue = Integer.parseInt((String) row[0]);
+ assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1])), row[0]);
+ assertEquals(((Float) row[2]).intValue(), intValue);
+ sqlValues.add(intValue);
+ }
+ assertEquals(sqlValues, expectedValues);
+ }
+ {
+ // Test selecting some columns with transform, filter, order-by and limit
+ String pqlQuery =
+ "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10";
+ String sqlQuery =
+ "SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10";
+
+ // Check data schema
+ BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+ SelectionResults selectionResults = pqlResponse.getSelectionResults();
+ assertNotNull(selectionResults);
+ assertEquals(selectionResults.getColumns(), Arrays.asList("add(intColumn,floatColumn)", "stringColumn"));
+ BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+ ResultTable resultTable = sqlResponse.getResultTable();
+ assertNotNull(resultTable);
+ DataSchema dataSchema = resultTable.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"add(intColumn,floatColumn)", "stringColumn"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.STRING});
+
+ // Check values, where only 10 top values sorted in string format descending order should be returned
+ List<Serializable[]> pqlRows = selectionResults.getRows();
+ assertEquals(pqlRows.size(), 10);
+ List<Object[]> sqlRows = resultTable.getRows();
+ assertEquals(sqlRows.size(), 10);
+ int[] expectedValues = new int[]{9, 8, 7, 6, 59, 58, 57, 56, 55, 54};
+ for (int i = 0; i < 10; i++) {
+ Serializable[] row = pqlRows.get(i);
+ int intValue = ((Double) row[0]).intValue() / 2;
+ assertEquals(intValue, expectedValues[i]);
+ assertEquals(Integer.parseInt((String) row[1]), intValue);
+ }
+ for (int i = 0; i < 10; i++) {
+ Object[] row = sqlRows.get(i);
+ int intValue = ((Double) row[0]).intValue() / 2;
+ assertEquals(intValue, expectedValues[i]);
+ assertEquals(Integer.parseInt((String) row[1]), intValue);
+ }
+ }
+ {
+ // Test selecting some columns with filter that does not match any record
+ String pqlQuery =
+ "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10";
+ String sqlQuery =
+ "SELECT DISTINCT floatColumn, longColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10";
+
+ // Check data schema, where data type should be STRING for all columns
+ BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+ SelectionResults selectionResults = pqlResponse.getSelectionResults();
+ assertNotNull(selectionResults);
+ assertEquals(selectionResults.getColumns(), Arrays.asList("floatColumn", "longColumn"));
+ BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+ ResultTable resultTable = sqlResponse.getResultTable();
+ assertNotNull(resultTable);
+ DataSchema dataSchema = resultTable.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"floatColumn", "longColumn"});
+ assertEquals(dataSchema.getColumnDataTypes(),
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+
+ // Check values, where no record should be returned
+ List<Serializable[]> pqlRows = selectionResults.getRows();
+ assertTrue(pqlRows.isEmpty());
+ List<Object[]> sqlRows = resultTable.getRows();
+ assertTrue(sqlRows.isEmpty());
+ }
+ {
+ // Test selecting some columns with filter that does not match any record in one segment but matches some
+ // records in the other segment
+ String pqlQuery =
+ "SELECT DISTINCT(intColumn) FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5";
+ String sqlQuery =
+ "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5";
+
+ // Check data schema
+ BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+ SelectionResults selectionResults = pqlResponse.getSelectionResults();
+ assertNotNull(selectionResults);
+ assertEquals(selectionResults.getColumns(), Collections.singletonList("intColumn"));
+ BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+ ResultTable resultTable = sqlResponse.getResultTable();
+ assertNotNull(resultTable);
+ DataSchema dataSchema = resultTable.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn"});
+ assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.INT});
+
+ // Check values, where only the top 5 values, sorted as ints in ascending order, should be returned
+ List<Serializable[]> pqlRows = selectionResults.getRows();
+ assertEquals(pqlRows.size(), 5);
+ List<Object[]> sqlRows = resultTable.getRows();
+ assertEquals(sqlRows.size(), 5);
+ int[] expectedValues = new int[]{1000, 1001, 1002, 1003, 1004};
+ for (int i = 0; i < 5; i++) {
+ Serializable[] row = pqlRows.get(i);
+ assertEquals((int) row[0], expectedValues[i]);
+ }
+ for (int i = 0; i < 5; i++) {
+ Object[] row = sqlRows.get(i);
+ assertEquals((int) row[0], expectedValues[i]);
+ }
+ }
+ {
+ // Test selecting some columns with a filter that does not match any record on one server but matches some
+ // records on the other server
+ String pqlQuery =
+ "SELECT DISTINCT(longColumn) FROM testTable WHERE doubleColumn < 200 ORDER BY longColumn DESC LIMIT 5";
+ String sqlQuery =
+ "SELECT DISTINCT longColumn FROM testTable WHERE doubleColumn < 200 ORDER BY longColumn DESC LIMIT 5";
+
+ BrokerRequest pqlBrokerRequest = PQL_COMPILER.compileToBrokerRequest(pqlQuery);
+ BrokerResponseNative pqlResponse = queryServersWithDifferentSegments(pqlBrokerRequest, segment0, segment1);
+ BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(sqlQuery);
+ sqlBrokerRequest.setQueryOptions(Collections.singletonMap("responseFormat", "sql"));
+ BrokerResponseNative sqlResponse = queryServersWithDifferentSegments(sqlBrokerRequest, segment0, segment1);
+
+ // Check data schema
+ SelectionResults selectionResults = pqlResponse.getSelectionResults();
+ assertNotNull(selectionResults);
+ assertEquals(selectionResults.getColumns(), Collections.singletonList("longColumn"));
+ ResultTable resultTable = sqlResponse.getResultTable();
+ assertNotNull(resultTable);
+ DataSchema dataSchema = resultTable.getDataSchema();
+ assertEquals(dataSchema.getColumnNames(), new String[]{"longColumn"});
+ assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.LONG});
+
+ // Check values, where only the top 5 values, sorted as longs in descending order, should be returned
+ List<Serializable[]> pqlRows = selectionResults.getRows();
+ assertEquals(pqlRows.size(), 5);
+ List<Object[]> sqlRows = resultTable.getRows();
+ assertEquals(sqlRows.size(), 5);
+ int[] expectedValues = new int[]{99, 98, 97, 96, 95};
+ for (int i = 0; i < 5; i++) {
+ Serializable[] row = pqlRows.get(i);
+ assertEquals(((Long) row[0]).intValue(), expectedValues[i]);
+ }
+ for (int i = 0; i < 5; i++) {
+ Object[] row = sqlRows.get(i);
+ assertEquals(((Long) row[0]).intValue(), expectedValues[i]);
+ }
}
} finally {
- destroySegments();
- }
- }
-
- /**
- * Helper for testing filter queries
- * @param expectedTable expected result set
- * @param query query to run
- * @param columnNames name of columns
- * @param types data types
- */
- private void runFilterQueryInnerSegment(final Set<Record> expectedTable, final String query, String[] columnNames,
- DataSchema.ColumnDataType[] types) {
- AggregationOperator aggregationOperator = getOperatorForQuery(query);
- IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
- List<Object> operatorResult = resultsBlock.getAggregationResult();
-
- Assert.assertNotNull(operatorResult);
- Assert.assertEquals(operatorResult.size(), 1);
- Assert.assertTrue(operatorResult.get(0) instanceof DistinctTable);
-
- DistinctTable distinctTable = (DistinctTable) operatorResult.get(0);
- Assert.assertEquals(distinctTable.size(), expectedTable.size());
-
- DataSchema dataSchema = distinctTable.getDataSchema();
- Assert.assertEquals(dataSchema.getColumnNames(), columnNames);
- Assert.assertEquals(dataSchema.getColumnDataTypes(), types);
-
- Iterator<Record> iterator = distinctTable.iterator();
- while (iterator.hasNext()) {
- Record record = iterator.next();
- Assert.assertEquals(record.getValues().length, columnNames.length);
- Assert.assertTrue(expectedTable.contains(record));
- }
- }
-
- private void runQueryInterSegmentWithOrderBy(String query, List<Record> orderedResults, String[] columnNames) {
- BrokerResponseNative brokerResponseNative = getBrokerResponseForPqlQuery(query);
- final SelectionResults selectionResults = brokerResponseNative.getSelectionResults();
- Assert.assertEquals(selectionResults.getColumns(), Lists.newArrayList(columnNames));
- List<Serializable[]> rows = selectionResults.getRows();
- Assert.assertEquals(rows.size(), orderedResults.size());
- int counter = 0;
- for (Serializable[] row : rows) {
- Assert.assertEquals(row.length, columnNames.length);
- Record actualRecord = new Record(row);
- Assert.assertEquals(actualRecord, orderedResults.get(counter++));
+ for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
+ segmentDataManager.destroy();
+ }
}
}
/**
- * Create a segment with simple table of (State, City, SaleAmount, Time)
- * @param q1ExpectedResults expected results of filter query 1
- * @param q2ExpectedResults expected results of filter query 1
- * @param q3ExpectedResults expected results of filter query 1
- * @param q4ExpectedResults expected results of filter query 1
- * @return list of generic rows
+ * Helper method to query 2 servers with different segments. Server0 will have 2 copies of segment0; Server1 will have
+ * 2 copies of segment1.
*/
- private List<GenericRow> createSimpleTable(final Set<Record> q1ExpectedResults, final Set<Record> q2ExpectedResults,
- final Set<Record> q3ExpectedResults, final Set<Record> q4ExpectedResults) {
- int numRows = 10;
- List<GenericRow> rows = new ArrayList<>(numRows);
- Object[] columns;
-
- // ROW 1
- GenericRow row = new GenericRow();
- columns = new Object[]{"California", "San Mateo", 500000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- Record record = new Record(new Object[]{columns[0], columns[1]});
- q1ExpectedResults.add(record);
- q2ExpectedResults.add(record);
- record = new Record(new Object[]{columns[0], columns[1], columns[2]});
- q3ExpectedResults.add(record);
- q4ExpectedResults.add(record);
-
- // ROW 2
- row = new GenericRow();
- columns = new Object[]{"California", "San Mateo", 400000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- record = new Record(new Object[]{columns[0], columns[1], columns[2]});
- q3ExpectedResults.add(record);
- q4ExpectedResults.add(record);
-
- // ROW 3
- row = new GenericRow();
- columns = new Object[]{"California", "Sunnyvale", 300000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- record = new Record(new Object[]{columns[0], columns[1]});
- q1ExpectedResults.add(record);
- record = new Record(new Object[]{columns[0], columns[1], columns[2]});
- q3ExpectedResults.add(record);
-
- // ROW 4
- row = new GenericRow();
- columns = new Object[]{"California", "Sunnyvale", 300000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- // ROW 5
- row = new GenericRow();
- columns = new Object[]{"California", "Mountain View", 700000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- record = new Record(new Object[]{columns[0], columns[1]});
- q1ExpectedResults.add(record);
- q2ExpectedResults.add(record);
- record = new Record(new Object[]{columns[0], columns[1], columns[2]});
- q3ExpectedResults.add(record);
- q4ExpectedResults.add(record);
-
- // ROW 6
- row = new GenericRow();
- columns = new Object[]{"California", "Mountain View", 700000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- // ROW 7
- row = new GenericRow();
- columns = new Object[]{"California", "Mountain View", 200000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- record = new Record(new Object[]{columns[0], columns[1], columns[2]});
- q3ExpectedResults.add(record);
-
- // ROW 8
- row = new GenericRow();
- columns = new Object[]{"Washington", "Seattle", 100000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- // ROW 9
- row = new GenericRow();
- columns = new Object[]{"Washington", "Bellevue", 100000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- // ROW 10
- row = new GenericRow();
- columns = new Object[]{"Oregon", "Portland", 50000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- // ROW 11
- row = new GenericRow();
- columns = new Object[]{"Washington", "Bellevue", 150000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- // ROW 12
- row = new GenericRow();
- columns = new Object[]{"Oregon", "Portland", 100000};
- row.putField("State", columns[0]);
- row.putField("City", columns[1]);
- row.putField("SaleAmount", columns[2]);
- rows.add(row);
-
- return rows;
- }
-
- private void destroySegments() {
- for (IndexSegment indexSegment : _indexSegments) {
- if (indexSegment != null) {
- indexSegment.destroy();
- }
- }
- _indexSegments.clear();
+ private BrokerResponseNative queryServersWithDifferentSegments(BrokerRequest brokerRequest, ImmutableSegment segment0,
+ ImmutableSegment segment1) {
+ List<SegmentDataManager> segmentDataManagers0 =
+ Arrays.asList(new ImmutableSegmentDataManager(segment0), new ImmutableSegmentDataManager(segment0));
+ List<SegmentDataManager> segmentDataManagers1 =
+ Arrays.asList(new ImmutableSegmentDataManager(segment1), new ImmutableSegmentDataManager(segment1));
+
+ // Server side
+ DataTable instanceResponse0 = PLAN_MAKER.makeInterSegmentPlan(segmentDataManagers0, brokerRequest, EXECUTOR_SERVICE,
+ CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+ DataTable instanceResponse1 = PLAN_MAKER.makeInterSegmentPlan(segmentDataManagers1, brokerRequest, EXECUTOR_SERVICE,
+ CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+
+ // Broker side
+ BrokerReduceService brokerReduceService = new BrokerReduceService();
+ Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+ dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.OFFLINE), instanceResponse0);
+ dataTableMap.put(new ServerRoutingInstance("localhost", 1234, TableType.REALTIME), instanceResponse1);
+ return brokerReduceService.reduceOnDataTable(brokerRequest, dataTableMap, null);
}
}
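For context on how the new two-server helper is meant to be driven, below is a minimal usage sketch (not part of the commit itself) for the SQL response format. It assumes it runs inside the refactored test class, reusing the segment0/segment1 ImmutableSegments and the SQL_COMPILER constant that the diff above already references.

    // Sketch only: exercise the broker-side merge across two "servers", where the
    // OFFLINE instance holds two copies of segment0 and the REALTIME instance holds
    // two copies of segment1 (mirroring queryServersWithDifferentSegments above).
    BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(
        "SELECT DISTINCT longColumn FROM testTable WHERE doubleColumn < 200 ORDER BY longColumn DESC LIMIT 5");
    // Request the SQL response format so the broker reduce returns a ResultTable.
    sqlBrokerRequest.setQueryOptions(Collections.singletonMap("responseFormat", "sql"));
    BrokerResponseNative sqlResponse = queryServersWithDifferentSegments(sqlBrokerRequest, segment0, segment1);
    // Only the top 5 distinct longColumn values should survive the merge.
    assertEquals(sqlResponse.getResultTable().getRows().size(), 5);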