You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/06/29 06:11:28 UTC
[pinot] branch master updated: Add multi-value noDict APIs and their implementation for querying immutable and mutable fixed-width segment types (#8953)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4152f60a84 Add multi-value noDict APIs and their implementation for querying immutable and mutable fixed-width segment types (#8953)
4152f60a84 is described below
commit 4152f60a849fe38f57b95600570555a5522c0c4c
Author: Sonam Mandal <so...@linkedin.com>
AuthorDate: Tue Jun 28 23:11:22 2022 -0700
Add multi-value noDict APIs and their implementation for querying immutable and mutable fixed-width segment types (#8953)
* Add multi-value raw APIs and their implementation for querying immutable and mutable fixed-width segment types
* Address review comments
* Update pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
Co-authored-by: Rong Rong <wa...@gmail.com>
* Fix
Co-authored-by: Rong Rong <wa...@gmail.com>
---
.../org/apache/pinot/core/common/DataFetcher.java | 79 +-
.../apache/pinot/queries/DistinctQueriesTest.java | 520 ++++++++++++-
.../pinot/queries/MultiValueRawQueriesTest.java | 810 +++++++++++++++++++++
.../mutable/DefaultMutableIndexProvider.java | 47 +-
.../indexsegment/mutable/IntermediateSegment.java | 2 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 160 +++-
.../forward/FixedByteMVMutableForwardIndex.java | 87 ++-
.../fwd/MultiValueFixedByteRawIndexCreator.java | 5 +-
.../constant/ConstantMVForwardIndexReader.java | 10 +
...eader.java => BaseChunkForwardIndexReader.java} | 31 +-
.../forward/FixedBitMVForwardIndexReader.java | 71 ++
.../FixedByteChunkMVForwardIndexReader.java | 56 +-
.../FixedByteChunkSVForwardIndexReader.java | 4 +-
.../FixedBytePower2ChunkSVForwardIndexReader.java | 4 +-
.../forward/VarByteChunkMVForwardIndexReader.java | 62 +-
.../forward/VarByteChunkSVForwardIndexReader.java | 4 +-
.../startree/v2/store/StarTreeLoaderUtils.java | 4 +-
.../mutable/MutableSegmentImplRawMVTest.java | 266 +++++++
.../impl/dictionary/MultiValueDictionaryTest.java | 169 ++++-
.../MultiValueFixedByteRawIndexCreatorTest.java | 3 +-
.../FixedByteMVMutableForwardIndexTest.java | 49 +-
.../spi/index/mutable/MutableForwardIndex.java | 105 +++
.../spi/index/reader/ForwardIndexReader.java | 389 +++++++++-
23 files changed, 2775 insertions(+), 162 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
index 11cae5496a..0eff70555b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java
@@ -53,6 +53,7 @@ public class DataFetcher {
// ChunkReaderContext should be closed explicitly to release the off-heap buffer
private final Map<String, ColumnValueReader> _columnValueReaderMap;
private final int[] _reusableMVDictIds;
+ private final int _maxNumValuesPerMVEntry;
/**
* Constructor for DataFetcher.
@@ -74,6 +75,7 @@ public class DataFetcher {
}
}
_reusableMVDictIds = new int[maxNumValuesPerMVEntry];
+ _maxNumValuesPerMVEntry = maxNumValuesPerMVEntry;
}
/**
@@ -628,12 +630,15 @@ public class DataFetcher {
void readIntValuesMV(int[] docIds, int length, int[][] valuesBuffer) {
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
- assert _dictionary != null;
- for (int i = 0; i < length; i++) {
- int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
- int[] values = new int[numValues];
- _dictionary.readIntValues(_reusableMVDictIds, numValues, values);
- valuesBuffer[i] = values;
+ if (_dictionary != null) {
+ for (int i = 0; i < length; i++) {
+ int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+ int[] values = new int[numValues];
+ _dictionary.readIntValues(_reusableMVDictIds, numValues, values);
+ valuesBuffer[i] = values;
+ }
+ } else {
+ _reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
}
}
@@ -645,12 +650,15 @@ public class DataFetcher {
void readLongValuesMV(int[] docIds, int length, long[][] valuesBuffer) {
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
- assert _dictionary != null;
- for (int i = 0; i < length; i++) {
- int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
- long[] values = new long[numValues];
- _dictionary.readLongValues(_reusableMVDictIds, numValues, values);
- valuesBuffer[i] = values;
+ if (_dictionary != null) {
+ for (int i = 0; i < length; i++) {
+ int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+ long[] values = new long[numValues];
+ _dictionary.readLongValues(_reusableMVDictIds, numValues, values);
+ valuesBuffer[i] = values;
+ }
+ } else {
+ _reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
}
}
@@ -662,12 +670,15 @@ public class DataFetcher {
void readFloatValuesMV(int[] docIds, int length, float[][] valuesBuffer) {
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
- assert _dictionary != null;
- for (int i = 0; i < length; i++) {
- int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
- float[] values = new float[numValues];
- _dictionary.readFloatValues(_reusableMVDictIds, numValues, values);
- valuesBuffer[i] = values;
+ if (_dictionary != null) {
+ for (int i = 0; i < length; i++) {
+ int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+ float[] values = new float[numValues];
+ _dictionary.readFloatValues(_reusableMVDictIds, numValues, values);
+ valuesBuffer[i] = values;
+ }
+ } else {
+ _reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
}
}
@@ -679,12 +690,15 @@ public class DataFetcher {
void readDoubleValuesMV(int[] docIds, int length, double[][] valuesBuffer) {
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
- assert _dictionary != null;
- for (int i = 0; i < length; i++) {
- int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
- double[] values = new double[numValues];
- _dictionary.readDoubleValues(_reusableMVDictIds, numValues, values);
- valuesBuffer[i] = values;
+ if (_dictionary != null) {
+ for (int i = 0; i < length; i++) {
+ int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+ double[] values = new double[numValues];
+ _dictionary.readDoubleValues(_reusableMVDictIds, numValues, values);
+ valuesBuffer[i] = values;
+ }
+ } else {
+ _reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
}
}
@@ -696,12 +710,15 @@ public class DataFetcher {
void readStringValuesMV(int[] docIds, int length, String[][] valuesBuffer) {
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
- assert _dictionary != null;
- for (int i = 0; i < length; i++) {
- int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
- String[] values = new String[numValues];
- _dictionary.readStringValues(_reusableMVDictIds, numValues, values);
- valuesBuffer[i] = values;
+ if (_dictionary != null) {
+ for (int i = 0; i < length; i++) {
+ int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+ String[] values = new String[numValues];
+ _dictionary.readStringValues(_reusableMVDictIds, numValues, values);
+ valuesBuffer[i] = values;
+ }
+ } else {
+ _reader.readValuesMV(docIds, length, _maxNumValuesPerMVEntry, valuesBuffer, getReaderContext());
}
}
@@ -714,7 +731,7 @@ public class DataFetcher {
public void readNumValuesMV(int[] docIds, int length, int[] numValuesBuffer) {
Tracing.activeRecording().setInputDataType(_dataType, _singleValue);
for (int i = 0; i < length; i++) {
- numValuesBuffer[i] = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext());
+ numValuesBuffer[i] = _reader.getNumValuesMV(docIds[i], getReaderContext());
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 6e34f36a48..9f17069f69 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -92,13 +92,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
private static final String FLOAT_MV_COLUMN = "floatMVColumn";
private static final String DOUBLE_MV_COLUMN = "doubleMVColumn";
private static final String STRING_MV_COLUMN = "stringMVColumn";
-
- // TODO: Fix raw index for MV column then add tests for them
-// private static final String RAW_INT_MV_COLUMN = "rawIntMVColumn";
-// private static final String RAW_LONG_MV_COLUMN = "rawLongMVColumn";
-// private static final String RAW_FLOAT_MV_COLUMN = "rawFloatMVColumn";
-// private static final String RAW_DOUBLE_MV_COLUMN = "rawDoubleMVColumn";
-// private static final String RAW_STRING_MV_COLUMN = "rawStringMVColumn";
+ private static final String RAW_INT_MV_COLUMN = "rawIntMVColumn";
+ private static final String RAW_LONG_MV_COLUMN = "rawLongMVColumn";
+ private static final String RAW_FLOAT_MV_COLUMN = "rawFloatMVColumn";
+ private static final String RAW_DOUBLE_MV_COLUMN = "rawDoubleMVColumn";
+ private static final String RAW_STRING_MV_COLUMN = "rawStringMVColumn";
//@formatter:off
private static final Schema SCHEMA = new Schema.SchemaBuilder()
@@ -121,19 +119,19 @@ public class DistinctQueriesTest extends BaseQueriesTest {
.addMultiValueDimension(FLOAT_MV_COLUMN, DataType.FLOAT)
.addMultiValueDimension(DOUBLE_MV_COLUMN, DataType.DOUBLE)
.addMultiValueDimension(STRING_MV_COLUMN, DataType.STRING)
-// .addMultiValueDimension(RAW_INT_MV_COLUMN, DataType.INT)
-// .addMultiValueDimension(RAW_LONG_MV_COLUMN, DataType.LONG)
-// .addMultiValueDimension(RAW_FLOAT_MV_COLUMN, DataType.FLOAT)
-// .addMultiValueDimension(RAW_DOUBLE_MV_COLUMN, DataType.DOUBLE)
-// .addMultiValueDimension(RAW_STRING_MV_COLUMN, DataType.STRING)
+ .addMultiValueDimension(RAW_INT_MV_COLUMN, DataType.INT)
+ .addMultiValueDimension(RAW_LONG_MV_COLUMN, DataType.LONG)
+ .addMultiValueDimension(RAW_FLOAT_MV_COLUMN, DataType.FLOAT)
+ .addMultiValueDimension(RAW_DOUBLE_MV_COLUMN, DataType.DOUBLE)
+ .addMultiValueDimension(RAW_STRING_MV_COLUMN, DataType.STRING)
.build();
//@formatter:on
private static final TableConfig TABLE = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
.setNoDictionaryColumns(
Arrays.asList(RAW_INT_COLUMN, RAW_LONG_COLUMN, RAW_FLOAT_COLUMN, RAW_DOUBLE_COLUMN, RAW_BIG_DECIMAL_COLUMN,
- RAW_STRING_COLUMN, RAW_BYTES_COLUMN/*, RAW_INT_MV_COLUMN, RAW_LONG_MV_COLUMN, RAW_FLOAT_MV_COLUMN,
- RAW_DOUBLE_MV_COLUMN, RAW_STRING_MV_COLUMN*/)).build();
+ RAW_STRING_COLUMN, RAW_BYTES_COLUMN, RAW_INT_MV_COLUMN, RAW_LONG_MV_COLUMN, RAW_FLOAT_MV_COLUMN,
+ RAW_DOUBLE_MV_COLUMN, RAW_STRING_MV_COLUMN)).build();
private IndexSegment _indexSegment;
private List<IndexSegment> _indexSegments;
@@ -204,11 +202,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
record.putValue(FLOAT_MV_COLUMN, mvValue);
record.putValue(DOUBLE_MV_COLUMN, mvValue);
record.putValue(STRING_MV_COLUMN, mvValue);
-// record.putValue(RAW_INT_MV_COLUMN, mvValue);
-// record.putValue(RAW_LONG_MV_COLUMN, mvValue);
-// record.putValue(RAW_FLOAT_MV_COLUMN, mvValue);
-// record.putValue(RAW_DOUBLE_MV_COLUMN, mvValue);
-// record.putValue(RAW_STRING_MV_COLUMN, mvValue);
+ record.putValue(RAW_INT_MV_COLUMN, mvValue);
+ record.putValue(RAW_LONG_MV_COLUMN, mvValue);
+ record.putValue(RAW_FLOAT_MV_COLUMN, mvValue);
+ record.putValue(RAW_DOUBLE_MV_COLUMN, mvValue);
+ record.putValue(RAW_STRING_MV_COLUMN, mvValue);
uniqueRecords.add(record);
}
@@ -366,6 +364,55 @@ public class DistinctQueriesTest extends BaseQueriesTest {
}
}
}
+ {
+ // Raw MV numeric columns
+ //@formatter:off
+ List<String> queries = Arrays.asList(
+ "SELECT DISTINCT(rawIntMVColumn) FROM testTable",
+ "SELECT DISTINCT(rawLongMVColumn) FROM testTable",
+ "SELECT DISTINCT(rawFloatMVColumn) FROM testTable",
+ "SELECT DISTINCT(rawDoubleMVColumn) FROM testTable"
+ );
+ //@formatter:on
+ // We define a specific result set here since the data read from raw is in the order added
+ Set<Integer> expectedValues = new HashSet<>(Arrays.asList(0, 1, 2, 3, 4, 100, 101, 102, 103, 104));
+ for (String query : queries) {
+ DistinctTable distinctTable1 = getDistinctTableInnerSegment(query);
+ DistinctTable distinctTable2 = DistinctTable.fromByteBuffer(ByteBuffer.wrap(distinctTable1.toBytes()));
+ for (DistinctTable distinctTable : Arrays.asList(distinctTable1, distinctTable2)) {
+ assertEquals(distinctTable.size(), 10);
+ Set<Integer> actualValues = new HashSet<>();
+ for (Record record : distinctTable.getRecords()) {
+ Object[] values = record.getValues();
+ assertEquals(values.length, 1);
+ assertTrue(values[0] instanceof Number);
+ actualValues.add(((Number) values[0]).intValue());
+ }
+ assertEquals(actualValues, expectedValues);
+ }
+ }
+ }
+ {
+ // Raw MV string column
+ //@formatter:off
+ String query = "SELECT DISTINCT(rawStringMVColumn) FROM testTable";
+ //@formatter:on
+ // We define a specific result set here since the data read from raw is in the order added
+ Set<Integer> expectedValues = new HashSet<>(Arrays.asList(0, 1, 2, 3, 4, 100, 101, 102, 103, 104));
+ DistinctTable distinctTable1 = getDistinctTableInnerSegment(query);
+ DistinctTable distinctTable2 = DistinctTable.fromByteBuffer(ByteBuffer.wrap(distinctTable1.toBytes()));
+ for (DistinctTable distinctTable : Arrays.asList(distinctTable1, distinctTable2)) {
+ assertEquals(distinctTable.size(), 10);
+ Set<Integer> actualValues = new HashSet<>();
+ for (Record record : distinctTable.getRecords()) {
+ Object[] values = record.getValues();
+ assertEquals(values.length, 1);
+ assertTrue(values[0] instanceof String);
+ actualValues.add(Integer.parseInt((String) values[0]));
+ }
+ assertEquals(actualValues, expectedValues);
+ }
+ }
}
@Test
@@ -562,6 +609,85 @@ public class DistinctQueriesTest extends BaseQueriesTest {
assertEquals(actualValues, expectedValues);
}
}
+ {
+ // Numeric raw MV columns ASC
+ //@formatter:off
+ List<String> queries = Arrays.asList(
+ "SELECT DISTINCT(rawIntMVColumn) FROM testTable ORDER BY rawIntMVColumn",
+ "SELECT DISTINCT(rawLongMVColumn) FROM testTable ORDER BY rawLongMVColumn",
+ "SELECT DISTINCT(rawFloatMVColumn) FROM testTable ORDER BY rawFloatMVColumn",
+ "SELECT DISTINCT(rawDoubleMVColumn) FROM testTable ORDER BY rawDoubleMVColumn"
+ );
+ //@formatter:on
+ Set<Integer> expectedValues = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ expectedValues.add(i);
+ }
+ for (String query : queries) {
+ DistinctTable distinctTable1 = getDistinctTableInnerSegment(query);
+ DistinctTable distinctTable2 = DistinctTable.fromByteBuffer(ByteBuffer.wrap(distinctTable1.toBytes()));
+ for (DistinctTable distinctTable : Arrays.asList(distinctTable1, distinctTable2)) {
+ assertEquals(distinctTable.size(), 10);
+ Set<Integer> actualValues = new HashSet<>();
+ for (Record record : distinctTable.getRecords()) {
+ Object[] values = record.getValues();
+ assertEquals(values.length, 1);
+ assertTrue(values[0] instanceof Number);
+ actualValues.add(((Number) values[0]).intValue());
+ }
+ assertEquals(actualValues, expectedValues);
+ }
+ }
+ }
+ {
+ // Numeric raw MV columns DESC
+ //@formatter:off
+ List<String> queries = Arrays.asList(
+ "SELECT DISTINCT(rawIntMVColumn) FROM testTable ORDER BY rawIntMVColumn DESC",
+ "SELECT DISTINCT(rawLongMVColumn) FROM testTable ORDER BY rawLongMVColumn DESC",
+ "SELECT DISTINCT(rawFloatMVColumn) FROM testTable ORDER BY rawFloatMVColumn DESC",
+ "SELECT DISTINCT(rawDoubleMVColumn) FROM testTable ORDER BY rawDoubleMVColumn DESC"
+ );
+ //@formatter:on
+ Set<Integer> expectedValues = new HashSet<>();
+ for (int i = 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 10; i < 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+ expectedValues.add(i);
+ }
+ for (String query : queries) {
+ DistinctTable distinctTable1 = getDistinctTableInnerSegment(query);
+ DistinctTable distinctTable2 = DistinctTable.fromByteBuffer(ByteBuffer.wrap(distinctTable1.toBytes()));
+ for (DistinctTable distinctTable : Arrays.asList(distinctTable1, distinctTable2)) {
+ assertEquals(distinctTable.size(), 10);
+ Set<Integer> actualValues = new HashSet<>();
+ for (Record record : distinctTable.getRecords()) {
+ Object[] values = record.getValues();
+ assertEquals(values.length, 1);
+ assertTrue(values[0] instanceof Number);
+ actualValues.add(((Number) values[0]).intValue());
+ }
+ assertEquals(actualValues, expectedValues);
+ }
+ }
+ }
+ {
+ // String raw MV column
+ String query = "SELECT DISTINCT(rawStringMVColumn) FROM testTable ORDER BY rawStringMVColumn";
+ Set<String> expectedValues =
+ new HashSet<>(Arrays.asList("0", "1", "10", "100", "101", "102", "103", "104", "105", "106"));
+ DistinctTable distinctTable1 = getDistinctTableInnerSegment(query);
+ DistinctTable distinctTable2 = DistinctTable.fromByteBuffer(ByteBuffer.wrap(distinctTable1.toBytes()));
+ for (DistinctTable distinctTable : Arrays.asList(distinctTable1, distinctTable2)) {
+ assertEquals(distinctTable.size(), 10);
+ Set<String> actualValues = new HashSet<>();
+ for (Record record : distinctTable.getRecords()) {
+ Object[] values = record.getValues();
+ assertEquals(values.length, 1);
+ assertTrue(values[0] instanceof String);
+ actualValues.add((String) values[0]);
+ }
+ assertEquals(actualValues, expectedValues);
+ }
+ }
}
/**
@@ -576,10 +702,15 @@ public class DistinctQueriesTest extends BaseQueriesTest {
* <li>Selecting some columns order by raw BYTES column</li>
* <li>Selecting some columns transform, filter, order-by and limit</li>
* <li>Selecting some columns with filter that does not match any record</li>
+ * <li>Selecting all raw MV columns</li>
+ * <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+ * <li>Selecting some columns with filter with raw MV</li>
+ * <li>Selecting some columns order by raw MV column</li>
+ * <li>Selecting some columns with filter that does not match any record with raw MV</li>
* </ul>
*/
private void testDistinctInnerSegmentHelper(String[] queries) {
- assertEquals(queries.length, 8);
+ assertEquals(queries.length, 13);
// Selecting all dictionary-encoded SV columns
// SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, bigDecimalColumn, stringColumn, bytesColumn
@@ -808,6 +939,148 @@ public class DistinctQueriesTest extends BaseQueriesTest {
assertEquals(distinctTable.size(), 0);
assertFalse(distinctTable.isMainTable());
}
+
+ // Selecting all raw MV columns
+ // SELECT DISTINCT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn
+ // FROM testTable LIMIT 10000
+ {
+ DistinctTable distinctTable = getDistinctTableInnerSegment(queries[8]);
+
+ // Check data schema
+ DataSchema expectedDataSchema = new DataSchema(new String[]{
+ "rawIntMVColumn", "rawLongMVColumn", "rawFloatMVColumn", "rawDoubleMVColumn", "rawStringMVColumn"
+ }, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING
+ });
+ assertEquals(distinctTable.getDataSchema(), expectedDataSchema);
+
+ // Check values, where all 100 * 2^5 unique combinations should be returned
+ int numUniqueCombinations = NUM_UNIQUE_RECORDS_PER_SEGMENT * (1 << 5);
+ assertEquals(distinctTable.size(), numUniqueCombinations);
+ assertTrue(distinctTable.isMainTable());
+ Set<List<Integer>> actualValues = new HashSet<>();
+ for (Record record : distinctTable.getRecords()) {
+ Object[] values = record.getValues();
+ int intValue = (Integer) values[0];
+ List<Integer> actualValueList =
+ Arrays.asList(intValue, ((Long) values[1]).intValue(), ((Float) values[2]).intValue(),
+ ((Double) values[3]).intValue(), Integer.parseInt((String) values[4]));
+ List<Integer> expectedValues = new ArrayList<>(2);
+ expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ for (Integer actualValue : actualValueList) {
+ assertTrue(expectedValues.contains(actualValue));
+ }
+ actualValues.add(actualValueList);
+ }
+ assertEquals(actualValues.size(), numUniqueCombinations);
+ }
+
+ // Selecting some SV columns (including raw) and some raw MV columns
+ // SELECT DISTINCT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable LIMIT 10000
+ {
+ DistinctTable distinctTable = getDistinctTableInnerSegment(queries[9]);
+
+ // Check data schema
+ DataSchema expectedDataSchema = new DataSchema(new String[]{
+ "longColumn", "rawBigDecimalColumn", "rawFloatMVColumn", "rawStringMVColumn"
+ }, new ColumnDataType[]{
+ ColumnDataType.LONG, ColumnDataType.BIG_DECIMAL, ColumnDataType.FLOAT, ColumnDataType.STRING
+ });
+ assertEquals(distinctTable.getDataSchema(), expectedDataSchema);
+
+ // Check values, where all 100 * 2^2 unique combinations should be returned
+ int numUniqueCombinations = NUM_UNIQUE_RECORDS_PER_SEGMENT * (1 << 2);
+ assertEquals(distinctTable.size(), numUniqueCombinations);
+ assertTrue(distinctTable.isMainTable());
+ Set<List<Integer>> actualValues = new HashSet<>();
+ for (Record record : distinctTable.getRecords()) {
+ Object[] values = record.getValues();
+ int intValue = ((Long) values[0]).intValue();
+ List<Integer> actualValueList =
+ Arrays.asList(intValue, ((BigDecimal) values[1]).intValue(), ((Float) values[2]).intValue(),
+ Integer.parseInt((String) values[3]));
+ assertEquals((int) actualValueList.get(1), intValue);
+ List<Integer> expectedMVValues = new ArrayList<>(2);
+ expectedMVValues.add(intValue);
+ expectedMVValues.add(intValue + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ assertTrue(expectedMVValues.contains(actualValueList.get(2)));
+ assertTrue(expectedMVValues.contains(actualValueList.get(3)));
+ actualValues.add(actualValueList);
+ }
+ assertEquals(actualValues.size(), numUniqueCombinations);
+ }
+
+ // Selecting some columns with filter
+ // SELECT DISTINCT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE rawIntColumn >= 60 LIMIT 10000
+ {
+ DistinctTable distinctTable = getDistinctTableInnerSegment(queries[10]);
+
+ // Check data schema
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"stringColumn", "bytesColumn", "rawIntMVColumn"},
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.INT});
+ assertEquals(distinctTable.getDataSchema(), expectedDataSchema);
+
+ // Check values, where 40 * 2 matched combinations should be returned
+ int numMatchedCombinations = (NUM_UNIQUE_RECORDS_PER_SEGMENT - 60) * 2;
+ assertEquals(distinctTable.size(), numMatchedCombinations);
+ assertTrue(distinctTable.isMainTable());
+ Set<List<Integer>> actualValues = new HashSet<>();
+ for (Record record : distinctTable.getRecords()) {
+ Object[] values = record.getValues();
+ int intValue = Integer.parseInt((String) values[0]);
+ assertTrue(intValue >= 60);
+ List<Integer> actualValueList =
+ Arrays.asList(intValue, Integer.parseInt(new String(((ByteArray) values[1]).getBytes(), UTF_8).trim()),
+ (Integer) values[2]);
+ assertEquals((int) actualValueList.get(1), intValue);
+ assertTrue((Integer) values[2] == intValue || (Integer) values[2] == intValue + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ actualValues.add(actualValueList);
+ }
+ assertEquals(actualValues.size(), numMatchedCombinations);
+ }
+
+ // Selecting some columns order by raw MV column
+ // SELECT DISTINCT floatColumn, rawDoubleMVColumn FROM testTable ORDER BY rawDoubleMVColumn DESC
+ {
+ DistinctTable distinctTable = getDistinctTableInnerSegment(queries[11]);
+
+ // Check data schema
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"floatColumn", "rawDoubleMVColumn"},
+ new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.DOUBLE});
+ assertEquals(distinctTable.getDataSchema(), expectedDataSchema);
+
+ // Check values, where only 10 top values should be returned
+ assertEquals(distinctTable.size(), 10);
+ assertTrue(distinctTable.isMainTable());
+ Set<Integer> expectedValues = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ expectedValues.add(NUM_UNIQUE_RECORDS_PER_SEGMENT * 2 - i - 1);
+ }
+ Set<Integer> actualValues = new HashSet<>();
+ for (Record record : distinctTable.getRecords()) {
+ Object[] values = record.getValues();
+ int actualValue = ((Double) values[1]).intValue();
+ assertEquals(((Float) values[0]).intValue(), actualValue - NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ actualValues.add(actualValue);
+ }
+ assertEquals(actualValues, expectedValues);
+ }
+
+ // Selecting some columns with filter that does not match any record
+ // SELECT DISTINCT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY rawLongMVColumn
+ {
+ DistinctTable distinctTable = getDistinctTableInnerSegment(queries[12]);
+
+ // Check data schema
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"floatColumn", "rawLongMVColumn"},
+ new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.LONG});
+ assertEquals(distinctTable.getDataSchema(), expectedDataSchema);
+
+ // Check values, where no record should be returned
+ assertEquals(distinctTable.size(), 0);
+ assertTrue(distinctTable.isMainTable());
+ }
}
/**
@@ -822,6 +1095,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
* <li>Selecting some columns order by raw BYTES column</li>
* <li>Selecting some columns transform, filter, order-by and limit</li>
* <li>Selecting some columns with filter that does not match any record</li>
+ * <li>Selecting all raw MV columns</li>
+ * <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+ * <li>Selecting some columns with filter with raw MV</li>
+ * <li>Selecting some columns order by raw MV column</li>
+ * <li>Selecting some columns with filter that does not match any record with raw MV</li>
* </ul>
*/
@Test
@@ -838,7 +1116,14 @@ public class DistinctQueriesTest extends BaseQueriesTest {
"SELECT DISTINCT intColumn, rawBytesColumn FROM testTable ORDER BY rawBytesColumn LIMIT 5",
"SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 "
+ "ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
- "SELECT DISTINCT floatColumn, longMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longMVColumn"
+ "SELECT DISTINCT floatColumn, longMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longMVColumn",
+ "SELECT DISTINCT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn "
+ + "FROM testTable LIMIT 10000",
+ "SELECT DISTINCT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable "
+ + "LIMIT 10000",
+ "SELECT DISTINCT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE rawIntColumn >= 60 LIMIT 10000",
+ "SELECT DISTINCT floatColumn, rawDoubleMVColumn FROM testTable ORDER BY rawDoubleMVColumn DESC",
+ "SELECT DISTINCT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY rawLongMVColumn"
});
//@formatter:on
}
@@ -855,6 +1140,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
* <li>Selecting some columns order by raw BYTES column</li>
* <li>Selecting some columns transform, filter, order-by and limit</li>
* <li>Selecting some columns with filter that does not match any record</li>
+ * <li>Selecting all raw MV columns</li>
+ * <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+ * <li>Selecting some columns with filter with raw MV</li>
+ * <li>Selecting some columns order by raw MV column</li>
+ * <li>Selecting some columns with filter that does not match any record with raw MV</li>
* </ul>
*/
@Test
@@ -879,7 +1169,18 @@ public class DistinctQueriesTest extends BaseQueriesTest {
+ "GROUP BY ADD(intColumn, floatColumn), stringColumn "
+ "ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
"SELECT floatColumn, longMVColumn FROM testTable WHERE stringColumn = 'a' "
- + "GROUP BY floatColumn, longMVColumn ORDER BY longMVColumn"
+ + "GROUP BY floatColumn, longMVColumn ORDER BY longMVColumn",
+ "SELECT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn "
+ + "FROM testTable GROUP BY rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, "
+ + "rawStringMVColumn LIMIT 10000",
+ "SELECT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable "
+ + "GROUP BY longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn LIMIT 10000",
+ "SELECT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE rawIntColumn >= 60 "
+ + "GROUP BY stringColumn, bytesColumn, rawIntMVColumn LIMIT 10000",
+ "SELECT floatColumn, rawDoubleMVColumn FROM testTable GROUP BY floatColumn, rawDoubleMVColumn "
+ + "ORDER BY rawDoubleMVColumn DESC",
+ "SELECT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' GROUP BY floatColumn, "
+ + "rawLongMVColumn ORDER BY rawLongMVColumn"
});
//@formatter:on
}
@@ -912,11 +1213,16 @@ public class DistinctQueriesTest extends BaseQueriesTest {
* Selecting some columns with filter that does not match any record in one segment but matches some records in
* the other segment
* </li>
+ * <li>Selecting all raw MV columns</li>
+ * <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+ * <li>Selecting some columns with filter with raw MV</li>
+ * <li>Selecting some columns order by raw MV column</li>
+ * <li>Selecting some columns with filter that does not match any record with raw MV</li>
* TODO: Support alias and add a test for that
* </ul>
*/
private void testDistinctInterSegmentHelper(String[] queries) {
- assertEquals(queries.length, 9);
+ assertEquals(queries.length, 14);
// Selecting all dictionary-encoded SV columns
// SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, bigDecimalColumn, stringColumn, bytesColumn
@@ -1157,6 +1463,142 @@ public class DistinctQueriesTest extends BaseQueriesTest {
assertEquals((int) rows.get(i)[0], expectedValues[i]);
}
}
+
+ // Selecting all raw (no-dictionary) MV columns
+ // SELECT DISTINCT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn
+ // FROM testTable LIMIT 10000
+ {
+ ResultTable resultTable = getBrokerResponse(queries[9]).getResultTable();
+
+ // Check data schema
+ DataSchema expectedDataSchema = new DataSchema(new String[]{
+ "rawIntMVColumn", "rawLongMVColumn", "rawFloatMVColumn", "rawDoubleMVColumn", "rawStringMVColumn"
+ }, new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING
+ });
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+
+ // Check values, where all 200 * 2^5 unique values should be returned
+ int numUniqueCombinations = 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT * (1 << 5);
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), numUniqueCombinations);
+ Set<List<Integer>> actualValues = new HashSet<>();
+ for (Object[] row : rows) {
+ int intValue = (Integer) row[0];
+ List<Integer> actualValueList = Arrays.asList(intValue, ((Long) row[1]).intValue(), ((Float) row[2]).intValue(),
+ ((Double) row[3]).intValue(), Integer.parseInt((String) row[4]));
+ List<Integer> expectedValues = new ArrayList<>(2);
+ if (intValue < 1000) {
+ expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ } else {
+ expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT + 1000);
+ expectedValues.add(intValue % NUM_UNIQUE_RECORDS_PER_SEGMENT + NUM_UNIQUE_RECORDS_PER_SEGMENT + 1000);
+ }
+ for (Integer actualValue : actualValueList) {
+ assertTrue(expectedValues.contains(actualValue));
+ }
+ actualValues.add(actualValueList);
+ }
+ assertEquals(actualValues.size(), numUniqueCombinations);
+ }
+
+ // Selecting some SV columns (including raw) and some raw MV columns
+ // SELECT DISTINCT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable LIMIT 10000
+ {
+ ResultTable resultTable = getBrokerResponse(queries[10]).getResultTable();
+
+ // Check data schema
+ DataSchema expectedDataSchema = new DataSchema(new String[]{
+ "longColumn", "rawBigDecimalColumn", "rawFloatMVColumn", "rawStringMVColumn"
+ }, new ColumnDataType[]{
+ ColumnDataType.LONG, ColumnDataType.BIG_DECIMAL, ColumnDataType.FLOAT, ColumnDataType.STRING
+ });
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+
+ // Check values, where all 200 * 2^2 unique values should be returned
+ int numUniqueCombinations = 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT * (1 << 2);
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), numUniqueCombinations);
+ Set<List<Integer>> actualValues = new HashSet<>();
+ for (Object[] row : rows) {
+ int intValue = ((Long) row[0]).intValue();
+ List<Integer> actualValueList =
+ Arrays.asList(intValue, ((BigDecimal) row[1]).intValue(), ((Float) row[2]).intValue(),
+ Integer.parseInt((String) row[3]));
+ assertEquals((int) actualValueList.get(1), intValue);
+ List<Integer> expectedMVValues = new ArrayList<>(2);
+ expectedMVValues.add(intValue);
+ expectedMVValues.add(intValue + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ assertTrue(expectedMVValues.contains(actualValueList.get(2)));
+ assertTrue(expectedMVValues.contains(actualValueList.get(3)));
+ actualValues.add(actualValueList);
+ }
+ assertEquals(actualValues.size(), numUniqueCombinations);
+ }
+
+ // Selecting some columns with filter
+ // SELECT DISTINCT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE intColumn >= 60 LIMIT 10000
+ {
+ ResultTable resultTable = getBrokerResponse(queries[11]).getResultTable();
+
+ // Check data schema
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"stringColumn", "bytesColumn", "rawIntMVColumn"},
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.INT});
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+
+ // Check values, where 140 * 2 matched values should be returned
+ int numMatchedCombinations = (2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 60) * 2;
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), numMatchedCombinations);
+ Set<List<Integer>> actualValues = new HashSet<>();
+ for (Object[] row : rows) {
+ int intValue = Integer.parseInt((String) row[0]);
+ assertTrue(intValue >= 60);
+ List<Integer> actualValueList =
+ Arrays.asList(intValue, Integer.parseInt(new String(BytesUtils.toBytes((String) row[1]), UTF_8).trim()),
+ (Integer) row[2]);
+ assertEquals((int) actualValueList.get(1), intValue);
+ assertTrue((Integer) row[2] == intValue || (Integer) row[2] == intValue + NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ actualValues.add(actualValueList);
+ }
+ assertEquals(actualValues.size(), numMatchedCombinations);
+ }
+
+ // Selecting some columns order by raw MV column
+ // SELECT DISTINCT floatColumn, rawDoubleMVColumn FROM testTable ORDER BY rawDoubleMVColumn DESC
+ {
+ ResultTable resultTable = getBrokerResponse(queries[12]).getResultTable();
+
+ // Check data schema
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"floatColumn", "rawDoubleMVColumn"},
+ new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.DOUBLE});
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+
+ // Check values, where only 10 top values should be returned
+ List<Object[]> rows = resultTable.getRows();
+ assertEquals(rows.size(), 10);
+ for (int i = 0; i < 10; i++) {
+ int expectedValue = NUM_UNIQUE_RECORDS_PER_SEGMENT * 2 + 1000 - i - 1;
+ Object[] row = rows.get(i);
+ assertEquals(((Float) row[0]).intValue(), expectedValue - NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ assertEquals(((Double) row[1]).intValue(), expectedValue);
+ }
+ }
+
+ // Selecting some columns with filter that does not match any record
+ // SELECT DISTINCT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY rawLongMVColumn
+ {
+ ResultTable resultTable = getBrokerResponse(queries[13]).getResultTable();
+
+ // Check data schema, where data type should be STRING for all columns
+ DataSchema expectedDataSchema = new DataSchema(new String[]{"floatColumn", "rawLongMVColumn"},
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+ assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+
+ // Check values, where no record should be returned
+ assertTrue(resultTable.getRows().isEmpty());
+ }
}
/**
@@ -1175,6 +1617,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
* Selecting some columns with filter that does not match any record in one segment but matches some records in
* the other segment
* </li>
+ * <li>Selecting all raw (no-dictionary) MV columns</li>
+ * <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+ * <li>Selecting some columns with filter with raw MV</li>
+ * <li>Selecting some columns order by raw MV column</li>
+ * <li>Selecting some columns with filter that does not match any record with raw MV</li>
* TODO: Support alias and add a test for that
* </ul>
*/
@@ -1193,7 +1640,14 @@ public class DistinctQueriesTest extends BaseQueriesTest {
"SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 "
+ "ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
"SELECT DISTINCT floatColumn, longMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longMVColumn",
- "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5"
+ "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5",
+ "SELECT DISTINCT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn "
+ + "FROM testTable LIMIT 10000",
+ "SELECT DISTINCT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable "
+ + "LIMIT 10000",
+ "SELECT DISTINCT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE intColumn >= 60 LIMIT 10000",
+ "SELECT DISTINCT floatColumn, rawDoubleMVColumn FROM testTable ORDER BY rawDoubleMVColumn DESC",
+ "SELECT DISTINCT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' ORDER BY rawLongMVColumn"
});
//@formatter:on
}
@@ -1215,6 +1669,11 @@ public class DistinctQueriesTest extends BaseQueriesTest {
* Selecting some columns with filter that does not match any record in one segment but matches some records in
* the other segment
* </li>
+ * <li>Selecting all raw (no-dictionary) MV columns</li>
+ * <li>Selecting some SV columns (including raw) and some raw MV columns</li>
+ * <li>Selecting some columns with filter with raw MV</li>
+ * <li>Selecting some columns order by raw MV column</li>
+ * <li>Selecting some columns with filter that does not match any record with raw MV</li>
* TODO: Support alias and add a test for that
* </ul>
*/
@@ -1241,7 +1700,18 @@ public class DistinctQueriesTest extends BaseQueriesTest {
+ "ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
"SELECT floatColumn, longMVColumn FROM testTable WHERE stringColumn = 'a' "
+ "GROUP BY floatColumn, longMVColumn ORDER BY longMVColumn",
- "SELECT intColumn FROM testTable WHERE floatColumn > 200 GROUP BY intColumn ORDER BY intColumn ASC LIMIT 5"
+ "SELECT intColumn FROM testTable WHERE floatColumn > 200 GROUP BY intColumn ORDER BY intColumn ASC LIMIT 5",
+ "SELECT rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn FROM testTable "
+ + "GROUP BY rawIntMVColumn, rawLongMVColumn, rawFloatMVColumn, rawDoubleMVColumn, rawStringMVColumn "
+ + "LIMIT 10000",
+ "SELECT longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn FROM testTable "
+ + "GROUP BY longColumn, rawBigDecimalColumn, rawFloatMVColumn, rawStringMVColumn LIMIT 10000",
+ "SELECT stringColumn, bytesColumn, rawIntMVColumn FROM testTable WHERE intColumn >= 60 GROUP BY "
+ + "stringColumn, bytesColumn, rawIntMVColumn LIMIT 10000",
+ "SELECT floatColumn, rawDoubleMVColumn FROM testTable GROUP BY floatColumn, rawDoubleMVColumn "
+ + "ORDER BY rawDoubleMVColumn DESC",
+ "SELECT floatColumn, rawLongMVColumn FROM testTable WHERE stringColumn = 'a' GROUP BY floatColumn, "
+ + "rawLongMVColumn ORDER BY rawLongMVColumn"
});
//@formatter:on
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/MultiValueRawQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/MultiValueRawQueriesTest.java
new file mode 100644
index 0000000000..b026717400
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/MultiValueRawQueriesTest.java
@@ -0,0 +1,810 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+// TODO: Add tests for more query patterns when additional fixes for MV raw columns are made
+public class MultiValueRawQueriesTest extends BaseQueriesTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "MultiValueRawQueriesTest");
+
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME_1 = "testSegment1";
+ private static final String SEGMENT_NAME_2 = "testSegment2";
+
+ private static final int NUM_UNIQUE_RECORDS_PER_SEGMENT = 10;
+ private static final int NUM_DUPLICATES_PER_RECORDS = 2;
+ private static final int MV_OFFSET = 100;
+ private static final int BASE_VALUE_1 = 0;
+ private static final int BASE_VALUE_2 = 1000;
+
+ private final static String SV_INT_COL = "svIntCol";
+ private final static String MV_INT_COL = "mvIntCol";
+ private final static String MV_LONG_COL = "mvLongCol";
+ private final static String MV_FLOAT_COL = "mvFloatCol";
+ private final static String MV_DOUBLE_COL = "mvDoubleCol";
+ private final static String MV_STRING_COL = "mvStringCol";
+ private final static String MV_RAW_INT_COL = "mvRawIntCol";
+ private final static String MV_RAW_LONG_COL = "mvRawLongCol";
+ private final static String MV_RAW_FLOAT_COL = "mvRawFloatCol";
+ private final static String MV_RAW_DOUBLE_COL = "mvRawDoubleCol";
+ private final static String MV_RAW_STRING_COL = "mvRawStringCol";
+
+ private static final Schema SCHEMA = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+ .addSingleValueDimension(SV_INT_COL, FieldSpec.DataType.INT)
+ .addMultiValueDimension(MV_INT_COL, FieldSpec.DataType.INT)
+ .addMultiValueDimension(MV_LONG_COL, FieldSpec.DataType.LONG)
+ .addMultiValueDimension(MV_FLOAT_COL, FieldSpec.DataType.FLOAT)
+ .addMultiValueDimension(MV_DOUBLE_COL, FieldSpec.DataType.DOUBLE)
+ .addMultiValueDimension(MV_STRING_COL, FieldSpec.DataType.STRING)
+ .addMultiValueDimension(MV_RAW_INT_COL, FieldSpec.DataType.INT)
+ .addMultiValueDimension(MV_RAW_LONG_COL, FieldSpec.DataType.LONG)
+ .addMultiValueDimension(MV_RAW_FLOAT_COL, FieldSpec.DataType.FLOAT)
+ .addMultiValueDimension(MV_RAW_DOUBLE_COL, FieldSpec.DataType.DOUBLE)
+ .addMultiValueDimension(MV_RAW_STRING_COL, FieldSpec.DataType.STRING)
+ .build();
+
+ private static final DataSchema DATA_SCHEMA = new DataSchema(new String[]{"mvDoubleCol", "mvFloatCol", "mvIntCol",
+ "mvLongCol", "mvRawDoubleCol", "mvRawFloatCol", "mvRawIntCol", "mvRawLongCol", "mvRawStringCol", "mvStringCol",
+ "svIntCol"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE_ARRAY, DataSchema.ColumnDataType.FLOAT_ARRAY,
+ DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.LONG_ARRAY,
+ DataSchema.ColumnDataType.DOUBLE_ARRAY, DataSchema.ColumnDataType.FLOAT_ARRAY,
+ DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.LONG_ARRAY,
+ DataSchema.ColumnDataType.STRING_ARRAY, DataSchema.ColumnDataType.STRING_ARRAY,
+ DataSchema.ColumnDataType.INT});
+
+ private static final TableConfig TABLE = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNoDictionaryColumns(
+ Arrays.asList(MV_RAW_INT_COL, MV_RAW_LONG_COL, MV_RAW_FLOAT_COL, MV_RAW_DOUBLE_COL, MV_RAW_STRING_COL))
+ .build();
+
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
+
+ @Override
+ protected String getFilter() {
+ return "";
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _indexSegment;
+ }
+
+ @Override
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteQuietly(INDEX_DIR);
+
+ ImmutableSegment segment1 = createSegment(generateRecords(BASE_VALUE_1), SEGMENT_NAME_1);
+ ImmutableSegment segment2 = createSegment(generateRecords(BASE_VALUE_2), SEGMENT_NAME_2);
+ _indexSegment = segment1;
+ _indexSegments = Arrays.asList(segment1, segment2);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ for (IndexSegment indexSegment : _indexSegments) {
+ indexSegment.destroy();
+ }
+
+ FileUtils.deleteQuietly(INDEX_DIR);
+ }
+
+ /**
+ * Helper method to generate records based on the given base value.
+ *
+ * All columns are populated from the same integer value, stored under each column's own data type.
+ * For the {i}th unique record, the SV value is {baseValue + i}; MV columns hold {value, value + MV_OFFSET}.
+ */
+ private List<GenericRow> generateRecords(int baseValue) {
+ List<GenericRow> uniqueRecords = new ArrayList<>(NUM_UNIQUE_RECORDS_PER_SEGMENT);
+ for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+ int value = baseValue + i;
+ GenericRow record = new GenericRow();
+ record.putValue(SV_INT_COL, value);
+ Integer[] mvValue = new Integer[]{value, value + MV_OFFSET};
+ record.putValue(MV_INT_COL, mvValue);
+ record.putValue(MV_LONG_COL, mvValue);
+ record.putValue(MV_FLOAT_COL, mvValue);
+ record.putValue(MV_DOUBLE_COL, mvValue);
+ record.putValue(MV_STRING_COL, mvValue);
+ record.putValue(MV_RAW_INT_COL, mvValue);
+ record.putValue(MV_RAW_LONG_COL, mvValue);
+ record.putValue(MV_RAW_FLOAT_COL, mvValue);
+ record.putValue(MV_RAW_DOUBLE_COL, mvValue);
+ record.putValue(MV_RAW_STRING_COL, mvValue);
+ uniqueRecords.add(record);
+ }
+
+ List<GenericRow> records = new ArrayList<>(NUM_UNIQUE_RECORDS_PER_SEGMENT * NUM_DUPLICATES_PER_RECORDS);
+ for (int i = 0; i < NUM_DUPLICATES_PER_RECORDS; i++) {
+ records.addAll(uniqueRecords);
+ }
+ return records;
+ }
+
+ private ImmutableSegment createSegment(List<GenericRow> records, String segmentName)
+ throws Exception {
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE, SCHEMA);
+ segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+ segmentGeneratorConfig.setSegmentName(segmentName);
+ segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
+
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+ driver.build();
+
+ return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap);
+ }
+
+ @Test
+ public void testSelectQueries() {
+ {
+ // Select * query
+ String query = "SELECT * from testTable ORDER BY svIntCol LIMIT 40";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+ assertNotNull(resultTable);
+ assertEquals(resultTable.getDataSchema(), DATA_SCHEMA);
+ List<Object[]> recordRows = resultTable.getRows();
+ assertEquals(recordRows.size(), 40);
+
+ Set<Integer> expectedValuesFirst = new HashSet<>();
+ Set<Integer> expectedValuesSecond = new HashSet<>();
+ for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+ expectedValuesFirst.add(i);
+ expectedValuesSecond.add(i + MV_OFFSET);
+ }
+
+ Set<Integer> actualValuesFirst = new HashSet<>();
+ Set<Integer> actualValuesSecond = new HashSet<>();
+ for (int i = 0; i < 40; i++) {
+ Object[] values = recordRows.get(i);
+ assertEquals(values.length, 11);
+ int svIntValue = (int) values[10];
+ int[] intValues = (int[]) values[2];
+ assertEquals(intValues[1] - intValues[0], MV_OFFSET);
+ assertEquals(svIntValue, intValues[0]);
+
+ int[] intValuesRaw = (int[]) values[6];
+ assertEquals(intValues[0], intValuesRaw[0]);
+ assertEquals(intValues[1], intValuesRaw[1]);
+
+ long[] longValues = (long[]) values[3];
+ long[] longValuesRaw = (long[]) values[7];
+ assertEquals(longValues[0], intValues[0]);
+ assertEquals(longValues[1], intValues[1]);
+ assertEquals(longValues[0], longValuesRaw[0]);
+ assertEquals(longValues[1], longValuesRaw[1]);
+
+ float[] floatValues = (float[]) values[1];
+ float[] floatValuesRaw = (float[]) values[5];
+ assertEquals(floatValues[0], (float) intValues[0]);
+ assertEquals(floatValues[1], (float) intValues[1]);
+ assertEquals(floatValues[0], floatValuesRaw[0]);
+ assertEquals(floatValues[1], floatValuesRaw[1]);
+
+ double[] doubleValues = (double[]) values[0];
+ double[] doubleValuesRaw = (double[]) values[4];
+ assertEquals(doubleValues[0], (double) intValues[0]);
+ assertEquals(doubleValues[1], (double) intValues[1]);
+ assertEquals(doubleValues[0], doubleValuesRaw[0]);
+ assertEquals(doubleValues[1], doubleValuesRaw[1]);
+
+ String[] stringValues = (String[]) values[8];
+ String[] stringValuesRaw = (String[]) values[9];
+ assertEquals(Integer.parseInt(stringValues[0]), intValues[0]);
+ assertEquals(Integer.parseInt(stringValues[1]), intValues[1]);
+ assertEquals(stringValues[0], stringValuesRaw[0]);
+ assertEquals(stringValues[1], stringValuesRaw[1]);
+
+ actualValuesFirst.add(intValues[0]);
+ actualValuesSecond.add(intValues[1]);
+ }
+ assertTrue(actualValuesFirst.containsAll(expectedValuesFirst));
+ assertTrue(actualValuesSecond.containsAll(expectedValuesSecond));
+ }
+ {
+ // Select some dict based MV and some raw MV columns. Validate that the values match for the corresponding rows
+ String query = "SELECT mvIntCol, mvDoubleCol, mvStringCol, mvRawIntCol, mvRawDoubleCol, mvRawStringCol, svIntCol "
+ + "from testTable ORDER BY svIntCol LIMIT 40";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+ assertNotNull(resultTable);
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "mvIntCol", "mvDoubleCol", "mvStringCol", "mvRawIntCol", "mvRawDoubleCol", "mvRawStringCol", "svIntCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.INT_ARRAY, DataSchema.ColumnDataType.DOUBLE_ARRAY,
+ DataSchema.ColumnDataType.STRING_ARRAY, DataSchema.ColumnDataType.INT_ARRAY,
+ DataSchema.ColumnDataType.DOUBLE_ARRAY, DataSchema.ColumnDataType.STRING_ARRAY,
+ DataSchema.ColumnDataType.INT
+ });
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> recordRows = resultTable.getRows();
+ assertEquals(recordRows.size(), 40);
+
+ Set<Integer> expectedValuesFirst = new HashSet<>();
+ Set<Integer> expectedValuesSecond = new HashSet<>();
+ for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+ expectedValuesFirst.add(i);
+ expectedValuesSecond.add(i + MV_OFFSET);
+ }
+
+ Set<Integer> actualValuesFirst = new HashSet<>();
+ Set<Integer> actualValuesSecond = new HashSet<>();
+ for (int i = 0; i < 40; i++) {
+ Object[] values = recordRows.get(i);
+ assertEquals(values.length, 7);
+ int[] intValues = (int[]) values[0];
+ assertEquals(intValues[1] - intValues[0], MV_OFFSET);
+
+ int[] intValuesRaw = (int[]) values[3];
+ assertEquals(intValues[0], intValuesRaw[0]);
+ assertEquals(intValues[1], intValuesRaw[1]);
+
+ double[] doubleValues = (double[]) values[1];
+ double[] doubleValuesRaw = (double[]) values[4];
+ assertEquals(doubleValues[0], (double) intValues[0]);
+ assertEquals(doubleValues[1], (double) intValues[1]);
+ assertEquals(doubleValues[0], doubleValuesRaw[0]);
+ assertEquals(doubleValues[1], doubleValuesRaw[1]);
+
+ String[] stringValues = (String[]) values[2];
+ String[] stringValuesRaw = (String[]) values[5];
+ assertEquals(Integer.parseInt(stringValues[0]), intValues[0]);
+ assertEquals(Integer.parseInt(stringValues[1]), intValues[1]);
+ assertEquals(stringValues[0], stringValuesRaw[0]);
+ assertEquals(stringValues[1], stringValuesRaw[1]);
+
+ assertEquals(intValues[0], (int) values[6]);
+ assertEquals(intValuesRaw[0], (int) values[6]);
+
+ actualValuesFirst.add(intValues[0]);
+ actualValuesSecond.add(intValues[1]);
+ }
+ assertTrue(actualValuesFirst.containsAll(expectedValuesFirst));
+ assertTrue(actualValuesSecond.containsAll(expectedValuesSecond));
+ }
+ {
+ // Test a select with an ARRAYLENGTH transform function
+ String query = "SELECT ARRAYLENGTH(mvRawLongCol), ARRAYLENGTH(mvLongCol) from testTable LIMIT 10";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+ assertNotNull(resultTable);
+ DataSchema dataSchema = new DataSchema(new String[]{"arraylength(mvRawLongCol)", "arraylength(mvLongCol)"},
+ new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> recordRows = resultTable.getRows();
+ assertEquals(recordRows.size(), 10);
+
+ for (int i = 0; i < 10; i++) {
+ Object[] values = recordRows.get(i);
+ assertEquals(values.length, 2);
+ int intRawVal = (int) values[0];
+ int intVal = (int) values[1];
+ assertEquals(intRawVal, 2);
+ assertEquals(intVal, intRawVal);
+ assertEquals(intVal, intRawVal);
+ }
+ }
+ }
+
+ @Test
+ public void testSimpleAggregateQueries() {
+ {
+ // Aggregation on int columns
+ String query = "SELECT COUNTMV(mvIntCol), COUNTMV(mvRawIntCol), SUMMV(mvIntCol), SUMMV(mvRawIntCol), "
+ + "MINMV(mvIntCol), MINMV(mvRawIntCol), MAXMV(mvIntCol), MAXMV(mvRawIntCol), AVGMV(mvIntCol), "
+ + "AVGMV(mvRawIntCol) from testTable";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvIntCol)", "countmv(mvRawIntCol)", "summv(mvIntCol)", "summv(mvRawIntCol)", "minmv(mvIntCol)",
+ "minmv(mvRawIntCol)", "maxmv(mvIntCol)", "maxmv(mvRawIntCol)", "avgmv(mvIntCol)", "avgmv(mvRawIntCol)"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE
+ });
+ validateSimpleAggregateQueryResults(resultTable, dataSchema);
+ }
+ {
+ // Aggregation on long columns
+ String query = "SELECT COUNTMV(mvLongCol), COUNTMV(mvRawLongCol), SUMMV(mvLongCol), SUMMV(mvRawLongCol), "
+ + "MINMV(mvLongCol), MINMV(mvRawLongCol), MAXMV(mvLongCol), MAXMV(mvRawLongCol), AVGMV(mvLongCol), "
+ + "AVGMV(mvRawLongCol) from testTable";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvLongCol)", "countmv(mvRawLongCol)", "summv(mvLongCol)", "summv(mvRawLongCol)", "minmv(mvLongCol)",
+ "minmv(mvRawLongCol)", "maxmv(mvLongCol)", "maxmv(mvRawLongCol)", "avgmv(mvLongCol)", "avgmv(mvRawLongCol)"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE
+ });
+ validateSimpleAggregateQueryResults(resultTable, dataSchema);
+ }
+ {
+ // Aggregation on float columns
+ String query = "SELECT COUNTMV(mvFloatCol), COUNTMV(mvRawFloatCol), SUMMV(mvFloatCol), SUMMV(mvRawFloatCol), "
+ + "MINMV(mvFloatCol), MINMV(mvRawFloatCol), MAXMV(mvFloatCol), MAXMV(mvRawFloatCol), AVGMV(mvFloatCol), "
+ + "AVGMV(mvRawFloatCol) from testTable";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvFloatCol)", "countmv(mvRawFloatCol)", "summv(mvFloatCol)", "summv(mvRawFloatCol)",
+ "minmv(mvFloatCol)", "minmv(mvRawFloatCol)", "maxmv(mvFloatCol)", "maxmv(mvRawFloatCol)",
+ "avgmv(mvFloatCol)", "avgmv(mvRawFloatCol)"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE
+ });
+ validateSimpleAggregateQueryResults(resultTable, dataSchema);
+ }
+ {
+ // Aggregation on double columns
+ String query = "SELECT COUNTMV(mvDoubleCol), COUNTMV(mvRawDoubleCol), SUMMV(mvDoubleCol), SUMMV(mvRawDoubleCol), "
+ + "MINMV(mvDoubleCol), MINMV(mvRawDoubleCol), MAXMV(mvDoubleCol), MAXMV(mvRawDoubleCol), AVGMV(mvDoubleCol), "
+ + "AVGMV(mvRawDoubleCol) from testTable";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvDoubleCol)", "countmv(mvRawDoubleCol)", "summv(mvDoubleCol)", "summv(mvRawDoubleCol)",
+ "minmv(mvDoubleCol)", "minmv(mvRawDoubleCol)", "maxmv(mvDoubleCol)", "maxmv(mvRawDoubleCol)",
+ "avgmv(mvDoubleCol)", "avgmv(mvRawDoubleCol)"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE
+ });
+ validateSimpleAggregateQueryResults(resultTable, dataSchema);
+ }
+ {
+ // Aggregation on string columns
+ String query = "SELECT COUNTMV(mvStringCol), COUNTMV(mvRawStringCol), SUMMV(mvStringCol), SUMMV(mvRawStringCol), "
+ + "MINMV(mvStringCol), MINMV(mvRawStringCol), MAXMV(mvStringCol), MAXMV(mvRawStringCol), AVGMV(mvStringCol), "
+ + "AVGMV(mvRawStringCol) from testTable";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvStringCol)", "countmv(mvRawStringCol)", "summv(mvStringCol)", "summv(mvRawStringCol)",
+ "minmv(mvStringCol)", "minmv(mvRawStringCol)", "maxmv(mvStringCol)", "maxmv(mvRawStringCol)",
+ "avgmv(mvStringCol)", "avgmv(mvRawStringCol)"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE
+ });
+ validateSimpleAggregateQueryResults(resultTable, dataSchema);
+ }
+ }
+
+  /**
+   * Validates the single-row result of an aggregation-only query whose ten result columns are laid out as
+   * (column, raw column) pairs for COUNTMV, SUMMV, MINMV, MAXMV and AVGMV, in that order.
+   *
+   * <p>Each value of a pair must match the expected constant and the two values of a pair must be equal.
+   *
+   * @param resultTable result table returned for the query; must not be null
+   * @param expectedDataSchema schema the result table is expected to carry
+   */
+  private void validateSimpleAggregateQueryResults(ResultTable resultTable, DataSchema expectedDataSchema) {
+    assertNotNull(resultTable);
+    assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 1);
+    Object[] row = rows.get(0);
+
+    // COUNTMV pair (long valued)
+    long count = (long) row[0];
+    long rawCount = (long) row[1];
+    assertEquals(count, 160);
+    assertEquals(count, rawCount);
+
+    // SUMMV, MINMV, MAXMV and AVGMV pairs (double valued), starting at result column 2
+    double[] expectedAggregates = {88720.0, 0.0, 1109.0, 554.5};
+    for (int pair = 0; pair < expectedAggregates.length; pair++) {
+      int column = 2 + 2 * pair;
+      double aggregate = (double) row[column];
+      double rawAggregate = (double) row[column + 1];
+      assertEquals(aggregate, expectedAggregates[pair]);
+      assertEquals(aggregate, rawAggregate);
+    }
+  }
+
+ /**
+ * Runs COUNTMV/SUMMV/MINMV/MAXMV/AVGMV over an MV column and its mvRaw* counterpart with a GROUP BY, ordered by
+ * svIntCol, and checks the result schema and rows.
+ *
+ * Cases, in order:
+ * - five queries (int, long, float, double, string) grouped by svIntCol plus one mvRaw* column, validated by
+ * validateAggregateWithGroupByQueryResults with isThreeColumnGroupBy = false;
+ * - five queries (int, long, float, double, string) grouped by svIntCol plus an MV column and its mvRaw*
+ * counterpart, validated with isThreeColumnGroupBy = true;
+ * - one query grouped by svIntCol, mvRawLongCol and mvRawFloatCol, validated inline.
+ *
+ * In every expected schema the two COUNTMV columns are LONG and the remaining eight aggregation columns are DOUBLE.
+ */
+ @Test
+ public void testAggregateWithGroupByQueries() {
+ {
+ // Aggregation on int columns with group by
+ String query = "SELECT COUNTMV(mvIntCol), COUNTMV(mvRawIntCol), SUMMV(mvIntCol), SUMMV(mvRawIntCol), "
+ + "MINMV(mvIntCol), MINMV(mvRawIntCol), MAXMV(mvIntCol), MAXMV(mvRawIntCol), AVGMV(mvIntCol), "
+ + "AVGMV(mvRawIntCol), svIntCol, mvRawLongCol from testTable GROUP BY svIntCol, mvRawLongCol "
+ + "ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvIntCol)", "countmv(mvRawIntCol)", "summv(mvIntCol)", "summv(mvRawIntCol)", "minmv(mvIntCol)",
+ "minmv(mvRawIntCol)", "maxmv(mvIntCol)", "maxmv(mvRawIntCol)", "avgmv(mvIntCol)", "avgmv(mvRawIntCol)",
+ "svIntCol", "mvRawLongCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG
+ });
+ validateAggregateWithGroupByQueryResults(resultTable, dataSchema, false);
+ }
+ {
+ // Aggregation on long columns with group by
+ String query = "SELECT COUNTMV(mvLongCol), COUNTMV(mvRawLongCol), SUMMV(mvLongCol), SUMMV(mvRawLongCol), "
+ + "MINMV(mvLongCol), MINMV(mvRawLongCol), MAXMV(mvLongCol), MAXMV(mvRawLongCol), AVGMV(mvLongCol), "
+ + "AVGMV(mvRawLongCol), svIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvRawIntCol "
+ + "ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvLongCol)", "countmv(mvRawLongCol)", "summv(mvLongCol)", "summv(mvRawLongCol)", "minmv(mvLongCol)",
+ "minmv(mvRawLongCol)", "maxmv(mvLongCol)", "maxmv(mvRawLongCol)", "avgmv(mvLongCol)", "avgmv(mvRawLongCol)",
+ "svIntCol", "mvRawIntCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+ });
+ validateAggregateWithGroupByQueryResults(resultTable, dataSchema, false);
+ }
+ {
+ // Aggregation on float columns with group by
+ String query = "SELECT COUNTMV(mvFloatCol), COUNTMV(mvRawFloatCol), SUMMV(mvFloatCol), SUMMV(mvRawFloatCol), "
+ + "MINMV(mvFloatCol), MINMV(mvRawFloatCol), MAXMV(mvFloatCol), MAXMV(mvRawFloatCol), AVGMV(mvFloatCol), "
+ + "AVGMV(mvRawFloatCol), svIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvRawIntCol "
+ + "ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvFloatCol)", "countmv(mvRawFloatCol)", "summv(mvFloatCol)", "summv(mvRawFloatCol)",
+ "minmv(mvFloatCol)", "minmv(mvRawFloatCol)", "maxmv(mvFloatCol)", "maxmv(mvRawFloatCol)",
+ "avgmv(mvFloatCol)", "avgmv(mvRawFloatCol)", "svIntCol", "mvRawIntCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+ });
+ validateAggregateWithGroupByQueryResults(resultTable, dataSchema, false);
+ }
+ {
+ // Aggregation on double columns with group by
+ String query = "SELECT COUNTMV(mvDoubleCol), COUNTMV(mvRawDoubleCol), SUMMV(mvDoubleCol), SUMMV(mvRawDoubleCol), "
+ + "MINMV(mvDoubleCol), MINMV(mvRawDoubleCol), MAXMV(mvDoubleCol), MAXMV(mvRawDoubleCol), AVGMV(mvDoubleCol), "
+ + "AVGMV(mvRawDoubleCol), svIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvRawIntCol "
+ + "ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvDoubleCol)", "countmv(mvRawDoubleCol)", "summv(mvDoubleCol)", "summv(mvRawDoubleCol)",
+ "minmv(mvDoubleCol)", "minmv(mvRawDoubleCol)", "maxmv(mvDoubleCol)", "maxmv(mvRawDoubleCol)",
+ "avgmv(mvDoubleCol)", "avgmv(mvRawDoubleCol)", "svIntCol", "mvRawIntCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+ });
+ validateAggregateWithGroupByQueryResults(resultTable, dataSchema, false);
+ }
+ {
+ // Aggregation on string columns with group by
+ String query = "SELECT COUNTMV(mvStringCol), COUNTMV(mvRawStringCol), SUMMV(mvStringCol), SUMMV(mvRawStringCol), "
+ + "MINMV(mvStringCol), MINMV(mvRawStringCol), MAXMV(mvStringCol), MAXMV(mvRawStringCol), AVGMV(mvStringCol), "
+ + "AVGMV(mvRawStringCol), svIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvRawIntCol "
+ + "ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvStringCol)", "countmv(mvRawStringCol)", "summv(mvStringCol)", "summv(mvRawStringCol)",
+ "minmv(mvStringCol)", "minmv(mvRawStringCol)", "maxmv(mvStringCol)", "maxmv(mvRawStringCol)",
+ "avgmv(mvStringCol)", "avgmv(mvRawStringCol)", "svIntCol", "mvRawIntCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT
+ });
+ validateAggregateWithGroupByQueryResults(resultTable, dataSchema, false);
+ }
+ {
+ // Aggregation on int columns with group by on 3 columns
+ String query = "SELECT COUNTMV(mvIntCol), COUNTMV(mvRawIntCol), SUMMV(mvIntCol), SUMMV(mvRawIntCol), "
+ + "MINMV(mvIntCol), MINMV(mvRawIntCol), MAXMV(mvIntCol), MAXMV(mvRawIntCol), AVGMV(mvIntCol), "
+ + "AVGMV(mvRawIntCol), svIntCol, mvLongCol, mvRawLongCol from testTable GROUP BY svIntCol, mvLongCol, "
+ + "mvRawLongCol ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvIntCol)", "countmv(mvRawIntCol)", "summv(mvIntCol)", "summv(mvRawIntCol)", "minmv(mvIntCol)",
+ "minmv(mvRawIntCol)", "maxmv(mvIntCol)", "maxmv(mvRawIntCol)", "avgmv(mvIntCol)", "avgmv(mvRawIntCol)",
+ "svIntCol", "mvLongCol", "mvRawLongCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG,
+ DataSchema.ColumnDataType.LONG
+ });
+ validateAggregateWithGroupByQueryResults(resultTable, dataSchema, true);
+ }
+ {
+ // Aggregation on long columns with group by on 3 columns
+ String query = "SELECT COUNTMV(mvLongCol), COUNTMV(mvRawLongCol), SUMMV(mvLongCol), SUMMV(mvRawLongCol), "
+ + "MINMV(mvLongCol), MINMV(mvRawLongCol), MAXMV(mvLongCol), MAXMV(mvRawLongCol), AVGMV(mvLongCol), "
+ + "AVGMV(mvRawLongCol), svIntCol, mvIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvIntCol, "
+ + "mvRawIntCol ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvLongCol)", "countmv(mvRawLongCol)", "summv(mvLongCol)", "summv(mvRawLongCol)", "minmv(mvLongCol)",
+ "minmv(mvRawLongCol)", "maxmv(mvLongCol)", "maxmv(mvRawLongCol)", "avgmv(mvLongCol)", "avgmv(mvRawLongCol)",
+ "svIntCol", "mvIntCol", "mvRawIntCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.INT
+ });
+ validateAggregateWithGroupByQueryResults(resultTable, dataSchema, true);
+ }
+ {
+ // Aggregation on float columns with group by on 3 columns
+ String query = "SELECT COUNTMV(mvFloatCol), COUNTMV(mvRawFloatCol), SUMMV(mvFloatCol), SUMMV(mvRawFloatCol), "
+ + "MINMV(mvFloatCol), MINMV(mvRawFloatCol), MAXMV(mvFloatCol), MAXMV(mvRawFloatCol), AVGMV(mvFloatCol), "
+ + "AVGMV(mvRawFloatCol), svIntCol, mvIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvIntCol, "
+ + "mvRawIntCol ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvFloatCol)", "countmv(mvRawFloatCol)", "summv(mvFloatCol)", "summv(mvRawFloatCol)",
+ "minmv(mvFloatCol)", "minmv(mvRawFloatCol)", "maxmv(mvFloatCol)", "maxmv(mvRawFloatCol)",
+ "avgmv(mvFloatCol)", "avgmv(mvRawFloatCol)", "svIntCol", "mvIntCol", "mvRawIntCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.INT
+ });
+ validateAggregateWithGroupByQueryResults(resultTable, dataSchema, true);
+ }
+ {
+ // Aggregation on double columns with group by on 3 columns
+ String query = "SELECT COUNTMV(mvDoubleCol), COUNTMV(mvRawDoubleCol), SUMMV(mvDoubleCol), SUMMV(mvRawDoubleCol), "
+ + "MINMV(mvDoubleCol), MINMV(mvRawDoubleCol), MAXMV(mvDoubleCol), MAXMV(mvRawDoubleCol), AVGMV(mvDoubleCol), "
+ + "AVGMV(mvRawDoubleCol), svIntCol, mvIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvIntCol, "
+ + "mvRawIntCol ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvDoubleCol)", "countmv(mvRawDoubleCol)", "summv(mvDoubleCol)", "summv(mvRawDoubleCol)",
+ "minmv(mvDoubleCol)", "minmv(mvRawDoubleCol)", "maxmv(mvDoubleCol)", "maxmv(mvRawDoubleCol)",
+ "avgmv(mvDoubleCol)", "avgmv(mvRawDoubleCol)", "svIntCol", "mvIntCol", "mvRawIntCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.INT
+ });
+ validateAggregateWithGroupByQueryResults(resultTable, dataSchema, true);
+ }
+ {
+ // Aggregation on string columns with group by on 3 columns
+ String query = "SELECT COUNTMV(mvStringCol), COUNTMV(mvRawStringCol), SUMMV(mvStringCol), SUMMV(mvRawStringCol), "
+ + "MINMV(mvStringCol), MINMV(mvRawStringCol), MAXMV(mvStringCol), MAXMV(mvRawStringCol), AVGMV(mvStringCol), "
+ + "AVGMV(mvRawStringCol), svIntCol, mvIntCol, mvRawIntCol from testTable GROUP BY svIntCol, mvIntCol,"
+ + "mvRawIntCol ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvStringCol)", "countmv(mvRawStringCol)", "summv(mvStringCol)", "summv(mvRawStringCol)",
+ "minmv(mvStringCol)", "minmv(mvRawStringCol)", "maxmv(mvStringCol)", "maxmv(mvRawStringCol)",
+ "avgmv(mvStringCol)", "avgmv(mvRawStringCol)", "svIntCol", "mvIntCol", "mvRawIntCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT,
+ DataSchema.ColumnDataType.INT
+ });
+ validateAggregateWithGroupByQueryResults(resultTable, dataSchema, true);
+ }
+ {
+ // Aggregation on int columns with group by on 3 columns, two of them RAW
+ String query = "SELECT COUNTMV(mvIntCol), COUNTMV(mvRawIntCol), SUMMV(mvIntCol), SUMMV(mvRawIntCol), "
+ + "MINMV(mvIntCol), MINMV(mvRawIntCol), MAXMV(mvIntCol), MAXMV(mvRawIntCol), AVGMV(mvIntCol), "
+ + "AVGMV(mvRawIntCol), svIntCol, mvRawLongCol, mvRawFloatCol from testTable GROUP BY svIntCol, mvRawLongCol, "
+ + "mvRawFloatCol ORDER BY svIntCol";
+ ResultTable resultTable = getBrokerResponse(query).getResultTable();
+
+ DataSchema dataSchema = new DataSchema(new String[]{
+ "countmv(mvIntCol)", "countmv(mvRawIntCol)", "summv(mvIntCol)", "summv(mvRawIntCol)", "minmv(mvIntCol)",
+ "minmv(mvRawIntCol)", "maxmv(mvIntCol)", "maxmv(mvRawIntCol)", "avgmv(mvIntCol)", "avgmv(mvRawIntCol)",
+ "svIntCol", "mvRawLongCol", "mvRawFloatCol"
+ }, new DataSchema.ColumnDataType[]{
+ DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.LONG, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+ DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.LONG,
+ DataSchema.ColumnDataType.FLOAT
+ });
+ // Validated inline rather than through validateAggregateWithGroupByQueryResults: that helper casts the MV
+ // group-by columns to long or int only, so it cannot check the FLOAT column at index 12.
+ assertNotNull(resultTable);
+ assertEquals(resultTable.getDataSchema(), dataSchema);
+ List<Object[]> recordRows = resultTable.getRows();
+ assertEquals(recordRows.size(), 10);
+
+ // Expected svIntCol value of each of the 10 returned rows, in ORDER BY svIntCol order
+ int[] expectedSVIntValues;
+ expectedSVIntValues = new int[]{0, 0, 0, 0, 1, 1, 1, 1, 2, 2};
+
+ for (int i = 0; i < 10; i++) {
+ Object[] values = recordRows.get(i);
+ // 10 aggregation columns + 3 group-by columns
+ assertEquals(values.length, 13);
+
+ long count = (long) values[0];
+ long countRaw = (long) values[1];
+ assertEquals(count, 8);
+ assertEquals(count, countRaw);
+
+ double sum = (double) values[2];
+ double sumRaw = (double) values[3];
+ assertEquals(sum, sumRaw);
+
+ double min = (double) values[4];
+ double minRaw = (double) values[5];
+ assertEquals(min, minRaw);
+
+ double max = (double) values[6];
+ double maxRaw = (double) values[7];
+ assertEquals(max, maxRaw);
+
+ // Within every group the spread between MAXMV and MINMV must be exactly MV_OFFSET
+ assertEquals(max - min, (double) MV_OFFSET);
+
+ double avg = (double) values[8];
+ double avgRaw = (double) values[9];
+ assertEquals(avg, avgRaw);
+
+ assertEquals((int) values[10], expectedSVIntValues[i]);
+
+ // Each MV group-by key must equal the row's svIntCol value or that value plus MV_OFFSET
+ assertTrue((long) values[11] == expectedSVIntValues[i]
+ || (long) values[11] == expectedSVIntValues[i] + MV_OFFSET);
+
+ assertTrue((float) values[12] == (float) expectedSVIntValues[i]
+ || (float) values[12] == (float) (expectedSVIntValues[i] + MV_OFFSET));
+ }
+ }
+ }
+
+  /**
+   * Validates the first 10 rows of an MV aggregation query grouped by svIntCol plus one or two MV columns.
+   *
+   * <p>Result columns 0-9 are (column, raw column) pairs for COUNTMV, SUMMV, MINMV, MAXMV and AVGMV; column 10 is
+   * svIntCol; columns 11 (and 12 for a three-column group by) are the MV group-by keys, read as long when the
+   * expected schema declares them LONG and as int otherwise.
+   *
+   * @param resultTable result table returned for the query; must not be null
+   * @param expectedDataSchema schema the result table is expected to carry
+   * @param isThreeColumnGroupBy true when the query groups by three columns (13 result columns), false for two
+   *     (12 result columns)
+   */
+  private void validateAggregateWithGroupByQueryResults(ResultTable resultTable, DataSchema expectedDataSchema,
+      boolean isThreeColumnGroupBy) {
+    assertNotNull(resultTable);
+    assertEquals(resultTable.getDataSchema(), expectedDataSchema);
+    List<Object[]> rows = resultTable.getRows();
+    assertEquals(rows.size(), 10);
+
+    // Expected svIntCol value per returned row, in ORDER BY svIntCol order
+    int[] expectedSvInts = isThreeColumnGroupBy
+        ? new int[]{0, 0, 0, 0, 1, 1, 1, 1, 2, 2}
+        : new int[]{0, 0, 1, 1, 2, 2, 3, 3, 4, 4};
+    int expectedNumColumns = isThreeColumnGroupBy ? 13 : 12;
+
+    for (int rowId = 0; rowId < 10; rowId++) {
+      Object[] row = rows.get(rowId);
+      assertEquals(row.length, expectedNumColumns);
+      int expectedSvInt = expectedSvInts[rowId];
+
+      long count = (long) row[0];
+      long rawCount = (long) row[1];
+      assertEquals(count, 8);
+      assertEquals(count, rawCount);
+
+      assertEquals((double) row[2], (double) row[3]);
+
+      double min = (double) row[4];
+      assertEquals(min, (double) row[5]);
+
+      double max = (double) row[6];
+      assertEquals(max, (double) row[7]);
+
+      // Within every group the spread between MAXMV and MINMV must be exactly MV_OFFSET
+      assertEquals(max - min, (double) MV_OFFSET);
+
+      assertEquals((double) row[8], (double) row[9]);
+
+      assertEquals((int) row[10], expectedSvInt);
+
+      // Each MV group-by key must equal the row's svIntCol value or that value plus MV_OFFSET
+      for (int column = 11; column < expectedNumColumns; column++) {
+        long groupKey;
+        if (expectedDataSchema.getColumnDataType(column) == DataSchema.ColumnDataType.LONG) {
+          groupKey = (long) row[column];
+        } else {
+          groupKey = (int) row[column];
+        }
+        assertTrue(groupKey == expectedSvInt || groupKey == expectedSvInt + MV_OFFSET);
+      }
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
index bbe9a7f286..9a03397157 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/DefaultMutableIndexProvider.java
@@ -51,24 +51,35 @@ public class DefaultMutableIndexProvider implements MutableIndexProvider {
FieldSpec.DataType storedType = context.getFieldSpec().getDataType().getStoredType();
boolean isSingleValue = context.getFieldSpec().isSingleValueField();
if (!context.hasDictionary()) {
- // No dictionary column must be single-valued
- assert isSingleValue;
- String allocationContext =
- buildAllocationContext(context.getSegmentName(), context.getFieldSpec().getName(),
- V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
- if (storedType.isFixedWidth()) {
- return new FixedByteSVMutableForwardIndex(false, storedType, context.getCapacity(), context.getMemoryManager(),
- allocationContext);
+ if (isSingleValue) {
+ String allocationContext =
+ buildAllocationContext(context.getSegmentName(), context.getFieldSpec().getName(),
+ V1Constants.Indexes.RAW_SV_FORWARD_INDEX_FILE_EXTENSION);
+ if (storedType.isFixedWidth()) {
+ return new FixedByteSVMutableForwardIndex(false, storedType, context.getCapacity(),
+ context.getMemoryManager(), allocationContext);
+ } else {
+ // RealtimeSegmentStatsHistory does not have the stats for no-dictionary columns from previous consuming
+ // segments
+ // TODO: Add support for updating RealtimeSegmentStatsHistory with average column value size for no dictionary
+ // columns as well
+ // TODO: Use the stats to get estimated average length
+ // Use a smaller capacity as opposed to segment flush size
+ int initialCapacity = Math.min(context.getCapacity(),
+ NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
+ return new VarByteSVMutableForwardIndex(storedType, context.getMemoryManager(), allocationContext,
+ initialCapacity, NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
+ }
} else {
- // RealtimeSegmentStatsHistory does not have the stats for no-dictionary columns from previous consuming
- // segments
- // TODO: Add support for updating RealtimeSegmentStatsHistory with average column value size for no dictionary
- // columns as well
- // TODO: Use the stats to get estimated average length
- // Use a smaller capacity as opposed to segment flush size
- int initialCapacity = Math.min(context.getCapacity(), NODICT_VARIABLE_WIDTH_ESTIMATED_NUMBER_OF_VALUES_DEFAULT);
- return new VarByteSVMutableForwardIndex(storedType, context.getMemoryManager(), allocationContext,
- initialCapacity, NODICT_VARIABLE_WIDTH_ESTIMATED_AVERAGE_VALUE_LENGTH_DEFAULT);
+ // TODO: Add support for variable width (bytes, string, big decimal) MV RAW column types
+ assert storedType.isFixedWidth();
+ String allocationContext =
+ buildAllocationContext(context.getSegmentName(), context.getFieldSpec().getName(),
+ V1Constants.Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
+ // TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand
+ return new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW, context.getAvgNumMultiValues(),
+ context.getCapacity(), storedType.size(), context.getMemoryManager(), allocationContext, false,
+ storedType);
}
} else {
if (isSingleValue) {
@@ -82,7 +93,7 @@ public class DefaultMutableIndexProvider implements MutableIndexProvider {
// TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand
return new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW, context.getAvgNumMultiValues(),
context.getCapacity(), Integer.BYTES,
- context.getMemoryManager(), allocationContext);
+ context.getMemoryManager(), allocationContext, true, INT);
}
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
index c5f013a3d8..481829c86e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/IntermediateSegment.java
@@ -153,7 +153,7 @@ public class IntermediateSegment implements MutableSegment {
// TODO: Start with a smaller capacity on FixedByteMVForwardIndexReaderWriter and let it expand
forwardIndex =
new FixedByteMVMutableForwardIndex(MAX_MULTI_VALUES_PER_ROW, DEFAULT_AVG_MULTI_VALUE_COUNT, _capacity,
- Integer.BYTES, _memoryManager, allocationContext);
+ Integer.BYTES, _memoryManager, allocationContext, true, DataType.INT);
}
_indexContainerMap.put(column,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 1f17cd682e..179b1fe795 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -436,9 +436,10 @@ public class MutableSegmentImpl implements MutableSegment {
column, dataType.toString());
return false;
}
- // So don't create dictionary if the column is member of noDictionary, is single-value
- // and doesn't have an inverted index
- return fieldSpec.isSingleValueField() && !invertedIndexColumns.contains(column);
+ // So don't create dictionary if the column (1) is member of noDictionary, and (2) is single-value or multi-value
+ // with a fixed-width field, and (3) doesn't have an inverted index
+ return (fieldSpec.isSingleValueField() || fieldSpec.getDataType().isFixedWidth())
+ && !invertedIndexColumns.contains(column);
}
// column is not a part of noDictionary set, so create dictionary
return false;
@@ -764,26 +765,75 @@ public class MutableSegmentImpl implements MutableSegment {
}
}
} else {
- // Multi-value column (always dictionary-encoded)
+ // Multi-value column
int[] dictIds = indexContainer._dictIds;
- // Update numValues info
- indexContainer._numValuesInfo.updateMVEntry(dictIds.length);
+ if (dictIds != null) {
+ // Dictionary encoded
+ // Update numValues info
+ indexContainer._numValuesInfo.updateMVEntry(dictIds.length);
- // Update forward index
- indexContainer._forwardIndex.setDictIdMV(docId, dictIds);
+ // Update forward index
+ indexContainer._forwardIndex.setDictIdMV(docId, dictIds);
- // Update inverted index
- MutableInvertedIndex invertedIndex = indexContainer._invertedIndex;
- if (invertedIndex != null) {
- for (int dictId : dictIds) {
- try {
- invertedIndex.add(dictId, docId);
- } catch (Exception e) {
- recordIndexingError(FieldConfig.IndexType.INVERTED, e);
+ // Update inverted index
+ MutableInvertedIndex invertedIndex = indexContainer._invertedIndex;
+ if (invertedIndex != null) {
+ for (int dictId : dictIds) {
+ try {
+ invertedIndex.add(dictId, docId);
+ } catch (Exception e) {
+ recordIndexingError(FieldConfig.IndexType.INVERTED, e);
+ }
}
}
+ } else {
+ // Raw MV columns
+
+ // Update forward index and numValues info
+ DataType dataType = fieldSpec.getDataType();
+ switch (dataType.getStoredType()) {
+ case INT:
+ Object[] values = (Object[]) value;
+ int[] intValues = new int[values.length];
+ for (int i = 0; i < values.length; i++) {
+ intValues[i] = (Integer) values[i];
+ }
+ indexContainer._forwardIndex.setIntMV(docId, intValues);
+ indexContainer._numValuesInfo.updateMVEntry(intValues.length);
+ break;
+ case LONG:
+ values = (Object[]) value;
+ long[] longValues = new long[values.length];
+ for (int i = 0; i < values.length; i++) {
+ longValues[i] = (Long) values[i];
+ }
+ indexContainer._forwardIndex.setLongMV(docId, longValues);
+ indexContainer._numValuesInfo.updateMVEntry(longValues.length);
+ break;
+ case FLOAT:
+ values = (Object[]) value;
+ float[] floatValues = new float[values.length];
+ for (int i = 0; i < values.length; i++) {
+ floatValues[i] = (Float) values[i];
+ }
+ indexContainer._forwardIndex.setFloatMV(docId, floatValues);
+ indexContainer._numValuesInfo.updateMVEntry(floatValues.length);
+ break;
+ case DOUBLE:
+ values = (Object[]) value;
+ double[] doubleValues = new double[values.length];
+ for (int i = 0; i < values.length; i++) {
+ doubleValues[i] = (Double) values[i];
+ }
+ indexContainer._forwardIndex.setDoubleMV(docId, doubleValues);
+ indexContainer._numValuesInfo.updateMVEntry(doubleValues.length);
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported data type: " + dataType + " for MV no-dictionary column: " + column);
+ }
}
}
}
@@ -982,24 +1032,66 @@ public class MutableSegmentImpl implements MutableSegment {
}
} else {
// Raw index based
- // TODO: support multi-valued column
- switch (forwardIndex.getValueType()) {
- case INT:
- return forwardIndex.getInt(docId);
- case LONG:
- return forwardIndex.getLong(docId);
- case FLOAT:
- return forwardIndex.getFloat(docId);
- case DOUBLE:
- return forwardIndex.getDouble(docId);
- case BIG_DECIMAL:
- return forwardIndex.getBigDecimal(docId);
- case STRING:
- return forwardIndex.getString(docId);
- case BYTES:
- return forwardIndex.getBytes(docId);
- default:
- throw new IllegalStateException();
+ if (forwardIndex.isSingleValue()) {
+ switch (forwardIndex.getValueType()) {
+ case INT:
+ return forwardIndex.getInt(docId);
+ case LONG:
+ return forwardIndex.getLong(docId);
+ case FLOAT:
+ return forwardIndex.getFloat(docId);
+ case DOUBLE:
+ return forwardIndex.getDouble(docId);
+ case BIG_DECIMAL:
+ return forwardIndex.getBigDecimal(docId);
+ case STRING:
+ return forwardIndex.getString(docId);
+ case BYTES:
+ return forwardIndex.getBytes(docId);
+ default:
+ throw new IllegalStateException();
+ }
+ } else {
+ // TODO: support multi-valued column for variable length column types (big decimal, string, bytes)
+ int numValues;
+ Object[] value;
+ switch (forwardIndex.getValueType()) {
+ case INT:
+ int[] intValues = forwardIndex.getIntMV(docId);
+ numValues = intValues.length;
+ value = new Object[numValues];
+ for (int i = 0; i < numValues; i++) {
+ value[i] = intValues[i];
+ }
+ return value;
+ case LONG:
+ long[] longValues = forwardIndex.getLongMV(docId);
+ numValues = longValues.length;
+ value = new Object[numValues];
+ for (int i = 0; i < numValues; i++) {
+ value[i] = longValues[i];
+ }
+ return value;
+ case FLOAT:
+ float[] floatValues = forwardIndex.getFloatMV(docId);
+ numValues = floatValues.length;
+ value = new Object[numValues];
+ for (int i = 0; i < numValues; i++) {
+ value[i] = floatValues[i];
+ }
+ return value;
+ case DOUBLE:
+ double[] doubleValues = forwardIndex.getDoubleMV(docId);
+ numValues = doubleValues.length;
+ value = new Object[numValues];
+ for (int i = 0; i < numValues; i++) {
+ value[i] = doubleValues[i];
+ }
+ return value;
+ default:
+ throw new IllegalStateException("No support for MV no dictionary column of type "
+ + forwardIndex.getValueType());
+ }
}
}
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
index f08f95e23d..b962402ab3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/forward/FixedByteMVMutableForwardIndex.java
@@ -26,6 +26,7 @@ import org.apache.pinot.segment.local.io.writer.impl.FixedByteSingleValueMultiCo
import org.apache.pinot.segment.spi.index.mutable.MutableForwardIndex;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,6 +115,8 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
private final int _rowCountPerChunk;
private final PinotDataBufferMemoryManager _memoryManager;
private final String _context;
+ private final boolean _isDictionaryEncoded;
+ private final FieldSpec.DataType _valueType;
private FixedByteSingleValueMultiColWriter _curHeaderWriter;
private FixedByteSingleValueMultiColWriter _currentDataWriter;
@@ -122,7 +125,8 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
private int _prevRowLength = 0; // Number of values in the column for the last row added.
public FixedByteMVMutableForwardIndex(int maxNumberOfMultiValuesPerRow, int avgMultiValueCount, int rowCountPerChunk,
- int columnSizeInBytes, PinotDataBufferMemoryManager memoryManager, String context) {
+ int columnSizeInBytes, PinotDataBufferMemoryManager memoryManager, String context, boolean isDictionaryEncoded,
+ FieldSpec.DataType valueType) {
_memoryManager = memoryManager;
_context = context;
int initialCapacity = Math.max(maxNumberOfMultiValuesPerRow, rowCountPerChunk * avgMultiValueCount);
@@ -137,6 +141,8 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
_incrementalCapacity = incrementalCapacity;
addDataBuffer(initialCapacity);
//init(_rowCountPerChunk, _columnSizeInBytes, _maxNumberOfMultiValuesPerRow, initialCapacity, _incrementalCapacity);
+ _isDictionaryEncoded = isDictionaryEncoded;
+ _valueType = valueType;
}
private void addHeaderBuffer() {
@@ -206,12 +212,9 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
return newStartIndex;
}
- /**
- * TODO: Currently we only support dictionary-encoded forward index on multi-value columns.
- */
@Override
public boolean isDictionaryEncoded() {
- return true;
+ return _isDictionaryEncoded;
}
@Override
@@ -221,7 +224,7 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
@Override
public DataType getValueType() {
- return DataType.INT;
+ return _valueType;
}
@Override
@@ -248,11 +251,31 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
return length;
}
+ @Override
+ public int[] getDictIdMV(int docId) {
+ FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
+ int rowInCurrentHeader = getRowInCurrentHeader(docId);
+ int bufferIndex = headerReader.getInt(rowInCurrentHeader, 0);
+ int startIndex = headerReader.getInt(rowInCurrentHeader, 1);
+ int length = headerReader.getInt(rowInCurrentHeader, 2);
+ FixedByteSingleValueMultiColReader dataReader = _dataReaders.get(bufferIndex);
+ int[] dictIdBuffer = new int[length];
+ for (int i = 0; i < length; i++) {
+ dictIdBuffer[i] = dataReader.getInt(startIndex + i, 0);
+ }
+ return dictIdBuffer;
+ }
+
@Override
public int getIntMV(int docId, int[] valueBuffer) {
return getDictIdMV(docId, valueBuffer);
}
+ @Override
+ public int[] getIntMV(int docId) {
+ return getDictIdMV(docId);
+ }
+
@Override
public int getLongMV(int docId, long[] valueBuffer) {
FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
@@ -267,6 +290,21 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
return length;
}
+ @Override
+ public long[] getLongMV(int docId) {
+ FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
+ int rowInCurrentHeader = getRowInCurrentHeader(docId);
+ int bufferIndex = headerReader.getInt(rowInCurrentHeader, 0);
+ int startIndex = headerReader.getInt(rowInCurrentHeader, 1);
+ int length = headerReader.getInt(rowInCurrentHeader, 2);
+ FixedByteSingleValueMultiColReader dataReader = _dataReaders.get(bufferIndex);
+ long[] valueBuffer = new long[length];
+ for (int i = 0; i < length; i++) {
+ valueBuffer[i] = dataReader.getLong(startIndex + i, 0);
+ }
+ return valueBuffer;
+ }
+
@Override
public int getFloatMV(int docId, float[] valueBuffer) {
FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
@@ -281,6 +319,21 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
return length;
}
+ @Override
+ public float[] getFloatMV(int docId) {
+ FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
+ int rowInCurrentHeader = getRowInCurrentHeader(docId);
+ int bufferIndex = headerReader.getInt(rowInCurrentHeader, 0);
+ int startIndex = headerReader.getInt(rowInCurrentHeader, 1);
+ int length = headerReader.getInt(rowInCurrentHeader, 2);
+ FixedByteSingleValueMultiColReader dataReader = _dataReaders.get(bufferIndex);
+ float[] valueBuffer = new float[length];
+ for (int i = 0; i < length; i++) {
+ valueBuffer[i] = dataReader.getFloat(startIndex + i, 0);
+ }
+ return valueBuffer;
+ }
+
@Override
public int getDoubleMV(int docId, double[] valueBuffer) {
FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
@@ -295,6 +348,28 @@ public class FixedByteMVMutableForwardIndex implements MutableForwardIndex {
return length;
}
+ @Override
+ public double[] getDoubleMV(int docId) {
+ FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
+ int rowInCurrentHeader = getRowInCurrentHeader(docId);
+ int bufferIndex = headerReader.getInt(rowInCurrentHeader, 0);
+ int startIndex = headerReader.getInt(rowInCurrentHeader, 1);
+ int length = headerReader.getInt(rowInCurrentHeader, 2);
+ FixedByteSingleValueMultiColReader dataReader = _dataReaders.get(bufferIndex);
+ double[] valueBuffer = new double[length];
+ for (int i = 0; i < length; i++) {
+ valueBuffer[i] = dataReader.getDouble(startIndex + i, 0);
+ }
+ return valueBuffer;
+ }
+
+ @Override
+ public int getNumValuesMV(int docId) {
+ FixedByteSingleValueMultiColReader headerReader = getCurrentReader(docId);
+ int rowInCurrentHeader = getRowInCurrentHeader(docId);
+ return headerReader.getInt(rowInCurrentHeader, 2);
+ }
+
@Override
public void setDictIdMV(int docId, int[] dictIds) {
int newStartIndex = updateHeader(docId, dictIds.length);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
index ef8483ee7d..82f5bff1ca 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/fwd/MultiValueFixedByteRawIndexCreator.java
@@ -77,7 +77,8 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
boolean deriveNumDocsPerChunk, int writerVersion)
throws IOException {
File file = new File(baseIndexDir, column + Indexes.RAW_MV_FORWARD_INDEX_FILE_EXTENSION);
- int totalMaxLength = maxNumberOfMultiValueElements * valueType.getStoredType().size();
+ // Store the length followed by the values
+ int totalMaxLength = Integer.BYTES + (maxNumberOfMultiValueElements * valueType.getStoredType().size());
int numDocsPerChunk =
deriveNumDocsPerChunk ? Math.max(TARGET_MAX_CHUNK_SIZE / (totalMaxLength
+ VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE), 1) : DEFAULT_NUM_DOCS_PER_CHUNK;
@@ -152,7 +153,7 @@ public class MultiValueFixedByteRawIndexCreator implements ForwardIndexCreator {
public void putDoubleMV(final double[] values) {
byte[] bytes = new byte[Integer.BYTES
- + values.length * Long.BYTES]; //numValues, bytes required to store the content
+ + values.length * Double.BYTES]; //numValues, bytes required to store the content
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
//write the length
byteBuffer.putInt(values.length);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/constant/ConstantMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/constant/ConstantMVForwardIndexReader.java
index 36b32b8bd2..3882de26cf 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/constant/ConstantMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/constant/ConstantMVForwardIndexReader.java
@@ -49,6 +49,16 @@ public final class ConstantMVForwardIndexReader implements ForwardIndexReader<Fo
return 1;
}
+ @Override
+ public int[] getDictIdMV(int docId, ForwardIndexReaderContext context) {
+ return new int[]{0};
+ }
+
+ @Override
+ public int getNumValuesMV(int docId, ForwardIndexReaderContext context) {
+ return 1;
+ }
+
@Override
public void close() {
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
similarity index 92%
rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
index 289cfe3af5..6991996d8a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/BaseChunkForwardIndexReader.java
@@ -37,10 +37,10 @@ import org.slf4j.LoggerFactory;
/**
- * Base implementation for chunk-based single-value raw (non-dictionary-encoded) forward index reader.
+ * Base implementation for chunk-based raw (non-dictionary-encoded) forward index reader.
*/
-public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReader<ChunkReaderContext> {
- private static final Logger LOGGER = LoggerFactory.getLogger(BaseChunkSVForwardIndexReader.class);
+public abstract class BaseChunkForwardIndexReader implements ForwardIndexReader<ChunkReaderContext> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseChunkForwardIndexReader.class);
protected final PinotDataBuffer _dataBuffer;
protected final DataType _valueType;
@@ -52,8 +52,9 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
protected final PinotDataBuffer _dataHeader;
protected final int _headerEntryChunkOffsetSize;
protected final PinotDataBuffer _rawData;
+ protected final boolean _isSingleValue;
- public BaseChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
+ public BaseChunkForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType, boolean isSingleValue) {
_dataBuffer = dataBuffer;
_valueType = valueType;
@@ -68,7 +69,7 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
headerOffset += Integer.BYTES;
_lengthOfLongestEntry = _dataBuffer.getInt(headerOffset);
- if (valueType.isFixedWidth()) {
+ if (valueType.isFixedWidth() && isSingleValue) {
Preconditions.checkState(_lengthOfLongestEntry == valueType.size());
}
headerOffset += Integer.BYTES;
@@ -98,6 +99,8 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
// Useful for uncompressed data.
_rawData = _dataBuffer.view(rawDataStart, _dataBuffer.size());
+
+ _isSingleValue = isSingleValue;
}
/**
@@ -163,7 +166,7 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
@Override
public boolean isSingleValue() {
- return true;
+ return _isSingleValue;
}
@Override
@@ -173,8 +176,8 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
@Override
public void readValuesSV(int[] docIds, int length, int[] values, ChunkReaderContext context) {
- if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
- switch (getValueType()) {
+ if (_valueType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+ switch (_valueType) {
case INT: {
int minOffset = docIds[0] * Integer.BYTES;
IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
@@ -215,8 +218,8 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
@Override
public void readValuesSV(int[] docIds, int length, long[] values, ChunkReaderContext context) {
- if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
- switch (getValueType()) {
+ if (_valueType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+ switch (_valueType) {
case INT: {
int minOffset = docIds[0] * Integer.BYTES;
IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
@@ -257,8 +260,8 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
@Override
public void readValuesSV(int[] docIds, int length, float[] values, ChunkReaderContext context) {
- if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
- switch (getValueType()) {
+ if (_valueType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+ switch (_valueType) {
case INT: {
int minOffset = docIds[0] * Integer.BYTES;
IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
@@ -299,8 +302,8 @@ public abstract class BaseChunkSVForwardIndexReader implements ForwardIndexReade
@Override
public void readValuesSV(int[] docIds, int length, double[] values, ChunkReaderContext context) {
- if (getValueType().isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
- switch (getValueType()) {
+ if (_valueType.isFixedWidth() && !_isCompressed && isContiguousRange(docIds, length)) {
+ switch (_valueType) {
case INT: {
int minOffset = docIds[0] * Integer.BYTES;
IntBuffer buffer = _rawData.toDirectByteBuffer(minOffset, length * Integer.BYTES).asIntBuffer();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
index 784bcf2780..d77de108ff 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBitMVForwardIndexReader.java
@@ -132,6 +132,77 @@ public final class FixedBitMVForwardIndexReader implements ForwardIndexReader<Fi
return numValues;
}
+ @Override
+ public int[] getDictIdMV(int docId, Context context) {
+ int contextDocId = context._docId;
+ int contextEndOffset = context._endOffset;
+ int startIndex;
+ if (docId == contextDocId + 1) {
+ startIndex = contextEndOffset;
+ } else {
+ int chunkId = docId / _numDocsPerChunk;
+ if (docId > contextDocId && chunkId == contextDocId / _numDocsPerChunk) {
+ // Same chunk
+ startIndex = _bitmapReader.getNextNthSetBitOffset(contextEndOffset + 1, docId - contextDocId - 1);
+ } else {
+ // Different chunk
+ int chunkOffset = _chunkOffsetReader.getInt(chunkId);
+ int indexInChunk = docId % _numDocsPerChunk;
+ if (indexInChunk == 0) {
+ startIndex = chunkOffset;
+ } else {
+ startIndex = _bitmapReader.getNextNthSetBitOffset(chunkOffset + 1, indexInChunk);
+ }
+ }
+ }
+ int endIndex;
+ if (docId == _numDocs - 1) {
+ endIndex = _numValues;
+ } else {
+ endIndex = _bitmapReader.getNextSetBitOffset(startIndex + 1);
+ }
+ int numValues = endIndex - startIndex;
+ int[] dictIdBuffer = new int[numValues];
+ _rawDataReader.readInt(startIndex, numValues, dictIdBuffer);
+
+ // Update context
+ context._docId = docId;
+ context._endOffset = endIndex;
+
+ return dictIdBuffer;
+ }
+
+ public int getNumValuesMV(int docId, Context context) {
+ int contextDocId = context._docId;
+ int contextEndOffset = context._endOffset;
+ int startIndex;
+ if (docId == contextDocId + 1) {
+ startIndex = contextEndOffset;
+ } else {
+ int chunkId = docId / _numDocsPerChunk;
+ if (docId > contextDocId && chunkId == contextDocId / _numDocsPerChunk) {
+ // Same chunk
+ startIndex = _bitmapReader.getNextNthSetBitOffset(contextEndOffset + 1, docId - contextDocId - 1);
+ } else {
+ // Different chunk
+ int chunkOffset = _chunkOffsetReader.getInt(chunkId);
+ int indexInChunk = docId % _numDocsPerChunk;
+ if (indexInChunk == 0) {
+ startIndex = chunkOffset;
+ } else {
+ startIndex = _bitmapReader.getNextNthSetBitOffset(chunkOffset + 1, indexInChunk);
+ }
+ }
+ }
+ int endIndex;
+ if (docId == _numDocs - 1) {
+ endIndex = _numValues;
+ } else {
+ endIndex = _bitmapReader.getNextSetBitOffset(startIndex + 1);
+ }
+ return endIndex - startIndex;
+ }
+
@Override
public void close() {
// NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
index b2e745d11d..acbec1a98d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkMVForwardIndexReader.java
@@ -30,14 +30,14 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
* fixed length data type (INT, LONG, FLOAT, DOUBLE).
* <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
*/
-public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+public final class FixedByteChunkMVForwardIndexReader extends BaseChunkForwardIndexReader {
private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
private final int _maxChunkSize;
- public FixedByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
- super(dataBuffer, valueType);
+ public FixedByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType storedType) {
+ super(dataBuffer, storedType, false);
_maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
}
@@ -61,6 +61,17 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForward
return numValues;
}
+ @Override
+ public int[] getIntMV(int docId, ChunkReaderContext context) {
+ ByteBuffer byteBuffer = slice(docId, context);
+ int numValues = byteBuffer.getInt();
+ int[] valueBuffer = new int[numValues];
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getInt();
+ }
+ return valueBuffer;
+ }
+
@Override
public int getLongMV(int docId, long[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
@@ -71,6 +82,17 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForward
return numValues;
}
+ @Override
+ public long[] getLongMV(int docId, ChunkReaderContext context) {
+ ByteBuffer byteBuffer = slice(docId, context);
+ int numValues = byteBuffer.getInt();
+ long[] valueBuffer = new long[numValues];
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getLong();
+ }
+ return valueBuffer;
+ }
+
@Override
public int getFloatMV(int docId, float[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
@@ -81,6 +103,17 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForward
return numValues;
}
+ @Override
+ public float[] getFloatMV(int docId, ChunkReaderContext context) {
+ ByteBuffer byteBuffer = slice(docId, context);
+ int numValues = byteBuffer.getInt();
+ float[] valueBuffer = new float[numValues];
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getFloat();
+ }
+ return valueBuffer;
+ }
+
@Override
public int getDoubleMV(int docId, double[] valueBuffer, ChunkReaderContext context) {
ByteBuffer byteBuffer = slice(docId, context);
@@ -91,6 +124,23 @@ public final class FixedByteChunkMVForwardIndexReader extends BaseChunkSVForward
return numValues;
}
+ @Override
+ public double[] getDoubleMV(int docId, ChunkReaderContext context) {
+ ByteBuffer byteBuffer = slice(docId, context);
+ int numValues = byteBuffer.getInt();
+ double[] valueBuffer = new double[numValues];
+ for (int i = 0; i < numValues; i++) {
+ valueBuffer[i] = byteBuffer.getDouble();
+ }
+ return valueBuffer;
+ }
+
+ @Override
+ public int getNumValuesMV(int docId, ChunkReaderContext context) {
+ ByteBuffer byteBuffer = slice(docId, context);
+ return byteBuffer.getInt();
+ }
+
private ByteBuffer slice(int docId, ChunkReaderContext context) {
if (_isCompressed) {
return sliceBytesCompressed(docId, context);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
index 94d8a9a8a6..c394d0021a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedByteChunkSVForwardIndexReader.java
@@ -30,11 +30,11 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
* LONG, FLOAT, DOUBLE).
* <p>For data layout, please refer to the documentation for {@link FixedByteChunkSVForwardIndexWriter}
*/
-public final class FixedByteChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+public final class FixedByteChunkSVForwardIndexReader extends BaseChunkForwardIndexReader {
private final int _chunkSize;
public FixedByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
- super(dataBuffer, valueType);
+ super(dataBuffer, valueType, true);
_chunkSize = _numDocsPerChunk * _lengthOfLongestEntry;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
index 0effd62c7d..0887442804 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/FixedBytePower2ChunkSVForwardIndexReader.java
@@ -30,13 +30,13 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
* LONG, FLOAT, DOUBLE).
* <p>For data layout, please refer to the documentation for {@link FixedByteChunkSVForwardIndexWriter}
*/
-public final class FixedBytePower2ChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+public final class FixedBytePower2ChunkSVForwardIndexReader extends BaseChunkForwardIndexReader {
public static final int VERSION = 4;
private final int _shift;
public FixedBytePower2ChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
- super(dataBuffer, valueType);
+ super(dataBuffer, valueType, true);
_shift = Integer.numberOfTrailingZeros(_numDocsPerChunk);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
index 1844957f74..c445312f6f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkMVForwardIndexReader.java
@@ -32,14 +32,14 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
* (STRING, BYTES).
* <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
*/
-public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+public final class VarByteChunkMVForwardIndexReader extends BaseChunkForwardIndexReader {
private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
private final int _maxChunkSize;
public VarByteChunkMVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
- super(dataBuffer, valueType);
+ super(dataBuffer, valueType, false);
_maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
}
@@ -76,6 +76,29 @@ public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIn
return numValues;
}
+ @Override
+ public String[] getStringMV(final int docId, final ChunkReaderContext context) {
+ byte[] compressedBytes;
+ if (_isCompressed) {
+ compressedBytes = getBytesCompressed(docId, context);
+ } else {
+ compressedBytes = getBytesUncompressed(docId);
+ }
+ ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+ int numValues = byteBuffer.getInt();
+ int contentOffset = (numValues + 1) * Integer.BYTES;
+ String[] valueBuffer = new String[numValues];
+ for (int i = 0; i < numValues; i++) {
+ int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+ byte[] bytes = new byte[length];
+ byteBuffer.position(contentOffset);
+ byteBuffer.get(bytes, 0, length);
+ valueBuffer[i] = new String(bytes, StandardCharsets.UTF_8);
+ contentOffset += length;
+ }
+ return valueBuffer;
+ }
+
@Override
public int getBytesMV(final int docId, final byte[][] valueBuffer,
final ChunkReaderContext context) {
@@ -99,6 +122,41 @@ public final class VarByteChunkMVForwardIndexReader extends BaseChunkSVForwardIn
return numValues;
}
+ @Override
+ public byte[][] getBytesMV(final int docId, final ChunkReaderContext context) {
+ byte[] compressedBytes;
+ if (_isCompressed) {
+ compressedBytes = getBytesCompressed(docId, context);
+ } else {
+ compressedBytes = getBytesUncompressed(docId);
+ }
+ ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+ int numValues = byteBuffer.getInt();
+ int contentOffset = (numValues + 1) * Integer.BYTES;
+ byte[][] valueBuffer = new byte[numValues][];
+ for (int i = 0; i < numValues; i++) {
+ int length = byteBuffer.getInt((i + 1) * Integer.BYTES);
+ byte[] bytes = new byte[length];
+ byteBuffer.position(contentOffset);
+ byteBuffer.get(bytes, 0, length);
+ valueBuffer[i] = bytes;
+ contentOffset += length;
+ }
+ return valueBuffer;
+ }
+
+ @Override
+ public int getNumValuesMV(final int docId, final ChunkReaderContext context) {
+ byte[] compressedBytes;
+ if (_isCompressed) {
+ compressedBytes = getBytesCompressed(docId, context);
+ } else {
+ compressedBytes = getBytesUncompressed(docId);
+ }
+ ByteBuffer byteBuffer = ByteBuffer.wrap(compressedBytes);
+ return byteBuffer.getInt();
+ }
+
@Override
public byte[] getBytes(int docId, ChunkReaderContext context) {
if (_isCompressed) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
index c993855ded..4a69f73463 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/forward/VarByteChunkSVForwardIndexReader.java
@@ -34,7 +34,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* (BIG_DECIMAL, STRING, BYTES).
* <p>For data layout, please refer to the documentation for {@link VarByteChunkSVForwardIndexWriter}
*/
-public final class VarByteChunkSVForwardIndexReader extends BaseChunkSVForwardIndexReader {
+public final class VarByteChunkSVForwardIndexReader extends BaseChunkForwardIndexReader {
private static final int ROW_OFFSET_SIZE = VarByteChunkSVForwardIndexWriter.CHUNK_HEADER_ENTRY_ROW_OFFSET_SIZE;
private final int _maxChunkSize;
@@ -43,7 +43,7 @@ public final class VarByteChunkSVForwardIndexReader extends BaseChunkSVForwardIn
private final ThreadLocal<byte[]> _reusableBytes = ThreadLocal.withInitial(() -> new byte[_lengthOfLongestEntry]);
public VarByteChunkSVForwardIndexReader(PinotDataBuffer dataBuffer, DataType valueType) {
- super(dataBuffer, valueType);
+ super(dataBuffer, valueType, true);
_maxChunkSize = _numDocsPerChunk * (ROW_OFFSET_SIZE + _lengthOfLongestEntry);
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
index b3c5be27b9..c966e8318a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/startree/v2/store/StarTreeLoaderUtils.java
@@ -25,7 +25,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
-import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkSVForwardIndexReader;
+import org.apache.pinot.segment.local.segment.index.readers.forward.BaseChunkForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
@@ -98,7 +98,7 @@ public class StarTreeLoaderUtils {
PinotDataBuffer forwardIndexDataBuffer = dataBuffer.view(start, end, ByteOrder.BIG_ENDIAN);
DataType dataType = ValueAggregatorFactory.getAggregatedValueType(functionColumnPair.getFunctionType());
FieldSpec fieldSpec = new MetricFieldSpec(metric, dataType);
- BaseChunkSVForwardIndexReader forwardIndex;
+ BaseChunkForwardIndexReader forwardIndex;
if (dataType == DataType.BYTES) {
forwardIndex = new VarByteChunkSVForwardIndexReader(forwardIndexDataBuffer, DataType.BYTES);
} else {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
new file mode 100644
index 0000000000..51e9e094d9
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplRawMVTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.indexsegment.mutable;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.SegmentTestUtils;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
+import org.apache.pinot.segment.spi.index.reader.Dictionary;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
+import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+
+/**
+ * Compares a {@link MutableSegmentImpl} against an immutable segment built from the same Avro file, with every
+ * fixed-width multi-value column configured as a raw (no-dictionary) column in both segments.
+ */
+public class MutableSegmentImplRawMVTest {
+  private static final String AVRO_FILE = "data/test_data-mv.avro";
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "MutableSegmentImplRawMVTest");
+
+  private Schema _schema;
+  private MutableSegmentImpl _mutableSegmentImpl;
+  private ImmutableSegment _immutableSegment;
+  // Wall-clock time taken right after the most recent row was indexed into the mutable segment
+  private long _lastIndexedTs;
+  // Ingestion time attached to every row through the stream message metadata
+  private long _lastIngestionTimeMs;
+  // Wall-clock time taken just before the first row was indexed into the mutable segment
+  private long _startTimeMs;
+
+  /**
+   * Builds and loads the immutable segment, then creates the mutable segment and indexes every Avro record into it.
+   *
+   * @throws Exception if the segment cannot be built or loaded, or if the Avro file cannot be read
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(TEMP_DIR);
+
+    // Resolve the test data through this class instead of depending on an unrelated test class
+    URL resourceUrl = MutableSegmentImplRawMVTest.class.getClassLoader().getResource(AVRO_FILE);
+    Assert.assertNotNull(resourceUrl);
+    File avroFile = new File(resourceUrl.getFile());
+
+    SegmentGeneratorConfig config =
+        SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, TEMP_DIR, "testTable");
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+
+    // Collect every fixed-width multi-value column; these are the columns stored raw in both segments
+    _schema = config.getSchema();
+    Set<String> noDictionaryColumns = new HashSet<>();
+    List<String> noDictionaryColumnsList = new ArrayList<>();
+    for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
+      if (!fieldSpec.isSingleValueField() && fieldSpec.getDataType().isFixedWidth()) {
+        noDictionaryColumns.add(fieldSpec.getName());
+        noDictionaryColumnsList.add(fieldSpec.getName());
+      }
+    }
+    config.setRawIndexCreationColumns(noDictionaryColumnsList);
+    assertEquals(noDictionaryColumns.size(), 2);
+
+    driver.init(config);
+    driver.build();
+    _immutableSegment = ImmutableSegmentLoader.load(new File(TEMP_DIR, driver.getSegmentName()), ReadMode.mmap);
+
+    VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(_schema, "testSegment");
+    _mutableSegmentImpl = MutableSegmentImplTestUtils
+        .createMutableSegmentImpl(_schema, noDictionaryColumns, Collections.emptySet(), Collections.emptySet(),
+            false);
+    _lastIngestionTimeMs = System.currentTimeMillis();
+    StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs);
+    _startTimeMs = System.currentTimeMillis();
+
+    try (RecordReader recordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.AVRO, avroFile, _schema.getColumnNames(), null)) {
+      GenericRow reuse = new GenericRow();
+      while (recordReader.hasNext()) {
+        _mutableSegmentImpl.index(recordReader.next(reuse), defaultMetadata);
+        _lastIndexedTs = System.currentTimeMillis();
+      }
+    }
+  }
+
+  /**
+   * Checks that segment-level metadata (doc count, last indexed and latest ingestion timestamps) and per-column
+   * data source metadata of the mutable segment agree with the immutable segment.
+   */
+  @Test
+  public void testMetadata() {
+    SegmentMetadata actualSegmentMetadata = _mutableSegmentImpl.getSegmentMetadata();
+    SegmentMetadata expectedSegmentMetadata = _immutableSegment.getSegmentMetadata();
+    assertEquals(actualSegmentMetadata.getTotalDocs(), expectedSegmentMetadata.getTotalDocs());
+
+    // assert that the last indexed timestamp is close to what we expect
+    long actualTs = _mutableSegmentImpl.getSegmentMetadata().getLastIndexedTimestamp();
+    Assert.assertTrue(actualTs >= _startTimeMs);
+    Assert.assertTrue(actualTs <= _lastIndexedTs);
+
+    assertEquals(_mutableSegmentImpl.getSegmentMetadata().getLatestIngestionTimestamp(), _lastIngestionTimeMs);
+
+    for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
+      String column = fieldSpec.getName();
+      DataSourceMetadata actualDataSourceMetadata = _mutableSegmentImpl.getDataSource(column).getDataSourceMetadata();
+      DataSourceMetadata expectedDataSourceMetadata = _immutableSegment.getDataSource(column).getDataSourceMetadata();
+      assertEquals(actualDataSourceMetadata.getDataType(), expectedDataSourceMetadata.getDataType());
+      assertEquals(actualDataSourceMetadata.isSingleValue(), expectedDataSourceMetadata.isSingleValue());
+      assertEquals(actualDataSourceMetadata.getNumDocs(), expectedDataSourceMetadata.getNumDocs());
+      if (!expectedDataSourceMetadata.isSingleValue()) {
+        assertEquals(actualDataSourceMetadata.getMaxNumValuesPerMVEntry(),
+            expectedDataSourceMetadata.getMaxNumValuesPerMVEntry());
+      }
+    }
+  }
+
+  /**
+   * Checks that every single-value column has the same doc count and dictionary length in both segments and, except
+   * for the segment-name virtual column, that each document resolves to the same dictionary value.
+   *
+   * @throws IOException if a forward index reader context cannot be closed
+   */
+  @Test
+  public void testDataSourceForSVColumns()
+      throws IOException {
+    for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
+      if (!fieldSpec.isSingleValueField()) {
+        continue;
+      }
+      String column = fieldSpec.getName();
+      DataSource actualDataSource = _mutableSegmentImpl.getDataSource(column);
+      DataSource expectedDataSource = _immutableSegment.getDataSource(column);
+
+      int actualNumDocs = actualDataSource.getDataSourceMetadata().getNumDocs();
+      int expectedNumDocs = expectedDataSource.getDataSourceMetadata().getNumDocs();
+      assertEquals(actualNumDocs, expectedNumDocs);
+
+      Dictionary actualDictionary = actualDataSource.getDictionary();
+      Dictionary expectedDictionary = expectedDataSource.getDictionary();
+      assertEquals(actualDictionary.length(), expectedDictionary.length());
+
+      // Allow the segment name to be different
+      if (column.equals(CommonConstants.Segment.BuiltInVirtualColumn.SEGMENTNAME)) {
+        continue;
+      }
+
+      ForwardIndexReader actualReader = actualDataSource.getForwardIndex();
+      ForwardIndexReader expectedReader = expectedDataSource.getForwardIndex();
+      try (ForwardIndexReaderContext actualReaderContext = actualReader.createContext();
+          ForwardIndexReaderContext expectedReaderContext = expectedReader.createContext()) {
+        for (int docId = 0; docId < expectedNumDocs; docId++) {
+          // Dictionary ids may differ between the two segments, so compare the values they resolve to
+          int actualDictId = actualReader.getDictId(docId, actualReaderContext);
+          int expectedDictId = expectedReader.getDictId(docId, expectedReaderContext);
+          assertEquals(actualDictionary.get(actualDictId), expectedDictionary.get(expectedDictId));
+        }
+      }
+    }
+  }
+
+  /**
+   * Checks that every multi-value column has no dictionary in either segment and that both forward indexes return
+   * the same number of values and the same values for every document. Only INT, LONG, FLOAT and DOUBLE columns are
+   * handled; any other data type fails the test with an {@link UnsupportedOperationException}.
+   *
+   * @throws IOException if a forward index reader context cannot be closed
+   */
+  @Test
+  public void testDataSourceForMVColumns()
+      throws IOException {
+    for (FieldSpec fieldSpec : _schema.getAllFieldSpecs()) {
+      if (fieldSpec.isSingleValueField()) {
+        continue;
+      }
+      String column = fieldSpec.getName();
+      DataSource actualDataSource = _mutableSegmentImpl.getDataSource(column);
+      DataSource expectedDataSource = _immutableSegment.getDataSource(column);
+
+      int actualNumDocs = actualDataSource.getDataSourceMetadata().getNumDocs();
+      int expectedNumDocs = expectedDataSource.getDataSourceMetadata().getNumDocs();
+      assertEquals(actualNumDocs, expectedNumDocs);
+
+      Dictionary actualDictionary = actualDataSource.getDictionary();
+      Dictionary expectedDictionary = expectedDataSource.getDictionary();
+      assertNull(actualDictionary);
+      assertNull(expectedDictionary);
+
+      // Value buffers for both readers are sized from the immutable segment's max entry size
+      int maxNumValuesPerMVEntry = expectedDataSource.getDataSourceMetadata().getMaxNumValuesPerMVEntry();
+
+      ForwardIndexReader actualReader = actualDataSource.getForwardIndex();
+      ForwardIndexReader expectedReader = expectedDataSource.getForwardIndex();
+      try (ForwardIndexReaderContext actualReaderContext = actualReader.createContext();
+          ForwardIndexReaderContext expectedReaderContext = expectedReader.createContext()) {
+        for (int docId = 0; docId < expectedNumDocs; docId++) {
+          int actualLength;
+          int expectedLength;
+          switch (fieldSpec.getDataType()) {
+            case INT:
+              int[] actualInts = new int[maxNumValuesPerMVEntry];
+              int[] expectedInts = new int[maxNumValuesPerMVEntry];
+              actualLength = actualReader.getIntMV(docId, actualInts, actualReaderContext);
+              expectedLength = expectedReader.getIntMV(docId, expectedInts, expectedReaderContext);
+              assertEquals(actualLength, expectedLength);
+
+              for (int i = 0; i < actualLength; i++) {
+                assertEquals(actualInts[i], expectedInts[i]);
+              }
+              break;
+            case LONG:
+              long[] actualLongs = new long[maxNumValuesPerMVEntry];
+              long[] expectedLongs = new long[maxNumValuesPerMVEntry];
+              actualLength = actualReader.getLongMV(docId, actualLongs, actualReaderContext);
+              expectedLength = expectedReader.getLongMV(docId, expectedLongs, expectedReaderContext);
+              assertEquals(actualLength, expectedLength);
+
+              for (int i = 0; i < actualLength; i++) {
+                assertEquals(actualLongs[i], expectedLongs[i]);
+              }
+              break;
+            case FLOAT:
+              float[] actualFloats = new float[maxNumValuesPerMVEntry];
+              float[] expectedFloats = new float[maxNumValuesPerMVEntry];
+              actualLength = actualReader.getFloatMV(docId, actualFloats, actualReaderContext);
+              expectedLength = expectedReader.getFloatMV(docId, expectedFloats, expectedReaderContext);
+              assertEquals(actualLength, expectedLength);
+
+              for (int i = 0; i < actualLength; i++) {
+                assertEquals(actualFloats[i], expectedFloats[i]);
+              }
+              break;
+            case DOUBLE:
+              double[] actualDoubles = new double[maxNumValuesPerMVEntry];
+              double[] expectedDoubles = new double[maxNumValuesPerMVEntry];
+              actualLength = actualReader.getDoubleMV(docId, actualDoubles, actualReaderContext);
+              expectedLength = expectedReader.getDoubleMV(docId, expectedDoubles, expectedReaderContext);
+              assertEquals(actualLength, expectedLength);
+
+              for (int i = 0; i < actualLength; i++) {
+                assertEquals(actualDoubles[i], expectedDoubles[i]);
+              }
+              break;
+            default:
+              // TODO: add support for byte, string, and big decimal type MV raw columns
+              throw new UnsupportedOperationException("No support for raw MV variable length columns yet");
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Destroys both segments and removes the temporary segment directory.
+   */
+  @AfterClass
+  public void tearDown() {
+    // Destroy the segments before deleting their files; either field is still null if setUp() failed part-way
+    if (_mutableSegmentImpl != null) {
+      _mutableSegmentImpl.destroy();
+    }
+    if (_immutableSegment != null) {
+      _immutableSegment.destroy();
+    }
+    FileUtils.deleteQuietly(TEMP_DIR);
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MultiValueDictionaryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MultiValueDictionaryTest.java
index 953b48ef60..0dcf5492d4 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MultiValueDictionaryTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/impl/dictionary/MultiValueDictionaryTest.java
@@ -22,11 +22,13 @@ import java.util.Random;
import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.data.FieldSpec;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
@@ -47,11 +49,12 @@ public class MultiValueDictionaryTest {
}
@Test
- public void testMultiValueIndexing() {
+ public void testMultiValueIndexingWithDictionary() {
long seed = System.nanoTime();
try (LongOnHeapMutableDictionary dict = new LongOnHeapMutableDictionary();
FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
- NROWS / 3, Integer.BYTES, new DirectMemoryManager("test"), "indexer")) {
+ NROWS / 3, Integer.BYTES, new DirectMemoryManager("test"), "indexer",
+ true, FieldSpec.DataType.INT)) {
// Insert rows into the indexer and dictionary
Random random = new Random(seed);
for (int row = 0; row < NROWS; row++) {
@@ -81,4 +84,166 @@ public class MultiValueDictionaryTest {
fail("Failed with random seed: " + seed, t);
}
}
+
+  /**
+   * Writes randomly sized INT multi-value rows into a forward index created with dictionary encoding disabled and
+   * checks that every row reads back with the same length and values. The random seed is reported on failure.
+   */
+  @Test
+  public void testMultiValueIndexingWithRawInt() {
+    long seed = System.nanoTime();
+    // The memory manager is its own resource so that it gets closed too (after the index: reverse declaration order)
+    try (PinotDataBufferMemoryManager memoryManager = new DirectMemoryManager("test");
+        FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
+            NROWS / 3, Integer.BYTES, memoryManager, "indexer", false, FieldSpec.DataType.INT)) {
+      // Insert rows into the indexer
+      Random random = new Random(seed);
+      for (int row = 0; row < NROWS; row++) {
+        // nextInt(bound) is never negative; Math.abs(nextInt()) stays negative for Integer.MIN_VALUE
+        int numValues = random.nextInt(MAX_N_VALUES);
+        int[] values = new int[numValues];
+        for (int i = 0; i < numValues; i++) {
+          values[i] = random.nextInt();
+        }
+        indexer.setIntMV(row, values);
+      }
+
+      // Read back rows and make sure that the values are good: replay the same random sequence from the seed
+      random = new Random(seed);
+      int[] intValues = new int[MAX_N_VALUES];
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = indexer.getIntMV(row, intValues);
+        assertEquals(numValues, random.nextInt(MAX_N_VALUES));
+
+        for (int i = 0; i < numValues; i++) {
+          assertEquals(intValues[i], random.nextInt());
+        }
+      }
+    } catch (Throwable t) {
+      fail("Failed with random seed: " + seed, t);
+    }
+  }
+
+  /**
+   * Writes randomly sized LONG multi-value rows into a forward index created with dictionary encoding disabled and
+   * checks that every row reads back with the same length and values. The random seed is reported on failure.
+   */
+  @Test
+  public void testMultiValueIndexingWithRawLong() {
+    long seed = System.nanoTime();
+    // The memory manager is its own resource so that it gets closed too (after the index: reverse declaration order)
+    try (PinotDataBufferMemoryManager memoryManager = new DirectMemoryManager("test");
+        FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
+            NROWS / 3, Long.BYTES, memoryManager, "indexer", false, FieldSpec.DataType.LONG)) {
+      // Insert rows into the indexer
+      Random random = new Random(seed);
+      for (int row = 0; row < NROWS; row++) {
+        // nextInt(bound) is never negative; Math.abs(nextInt()) stays negative for Integer.MIN_VALUE
+        int numValues = random.nextInt(MAX_N_VALUES);
+        long[] values = new long[numValues];
+        for (int i = 0; i < numValues; i++) {
+          values[i] = random.nextLong();
+        }
+        indexer.setLongMV(row, values);
+      }
+
+      // Read back rows and make sure that the values are good: replay the same random sequence from the seed
+      random = new Random(seed);
+      long[] longValues = new long[MAX_N_VALUES];
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = indexer.getLongMV(row, longValues);
+        assertEquals(numValues, random.nextInt(MAX_N_VALUES));
+
+        for (int i = 0; i < numValues; i++) {
+          assertEquals(longValues[i], random.nextLong());
+        }
+      }
+    } catch (Throwable t) {
+      fail("Failed with random seed: " + seed, t);
+    }
+  }
+
+  /**
+   * Writes randomly sized FLOAT multi-value rows into a forward index created with dictionary encoding disabled and
+   * checks that every row reads back with the same length and values. The random seed is reported on failure.
+   */
+  @Test
+  public void testMultiValueIndexingWithRawFloat() {
+    long seed = System.nanoTime();
+    // The memory manager is its own resource so that it gets closed too (after the index: reverse declaration order)
+    try (PinotDataBufferMemoryManager memoryManager = new DirectMemoryManager("test");
+        FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
+            NROWS / 3, Float.BYTES, memoryManager, "indexer", false, FieldSpec.DataType.FLOAT)) {
+      // Insert rows into the indexer
+      Random random = new Random(seed);
+      for (int row = 0; row < NROWS; row++) {
+        // nextInt(bound) is never negative; Math.abs(nextInt()) stays negative for Integer.MIN_VALUE
+        int numValues = random.nextInt(MAX_N_VALUES);
+        float[] values = new float[numValues];
+        for (int i = 0; i < numValues; i++) {
+          values[i] = random.nextFloat();
+        }
+        indexer.setFloatMV(row, values);
+      }
+
+      // Read back rows and make sure that the values are good: replay the same random sequence from the seed
+      random = new Random(seed);
+      float[] floatValues = new float[MAX_N_VALUES];
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = indexer.getFloatMV(row, floatValues);
+        assertEquals(numValues, random.nextInt(MAX_N_VALUES));
+
+        for (int i = 0; i < numValues; i++) {
+          assertEquals(floatValues[i], random.nextFloat());
+        }
+      }
+    } catch (Throwable t) {
+      fail("Failed with random seed: " + seed, t);
+    }
+  }
+
+  /**
+   * Writes randomly sized DOUBLE multi-value rows into a forward index created with dictionary encoding disabled and
+   * checks that every row reads back with the same length and values. The random seed is reported on failure.
+   */
+  @Test
+  public void testMultiValueIndexingWithRawDouble() {
+    long seed = System.nanoTime();
+    // The memory manager is its own resource so that it gets closed too (after the index: reverse declaration order)
+    try (PinotDataBufferMemoryManager memoryManager = new DirectMemoryManager("test");
+        FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
+            NROWS / 3, Double.BYTES, memoryManager, "indexer", false, FieldSpec.DataType.DOUBLE)) {
+      // Insert rows into the indexer
+      Random random = new Random(seed);
+      for (int row = 0; row < NROWS; row++) {
+        // nextInt(bound) is never negative; Math.abs(nextInt()) stays negative for Integer.MIN_VALUE
+        int numValues = random.nextInt(MAX_N_VALUES);
+        double[] values = new double[numValues];
+        for (int i = 0; i < numValues; i++) {
+          values[i] = random.nextDouble();
+        }
+        indexer.setDoubleMV(row, values);
+      }
+
+      // Read back rows and make sure that the values are good: replay the same random sequence from the seed
+      random = new Random(seed);
+      double[] doubleValues = new double[MAX_N_VALUES];
+      for (int row = 0; row < NROWS; row++) {
+        int numValues = indexer.getDoubleMV(row, doubleValues);
+        assertEquals(numValues, random.nextInt(MAX_N_VALUES));
+
+        for (int i = 0; i < numValues; i++) {
+          assertEquals(doubleValues[i], random.nextDouble());
+        }
+      }
+    } catch (Throwable t) {
+      fail("Failed with random seed: " + seed, t);
+    }
+  }
+
+  /**
+   * Checks that a forward index created for STRING with dictionary encoding disabled rejects both multi-value writes
+   * and multi-value reads with an {@link UnsupportedOperationException}. The random seed is reported on failure.
+   */
+  @Test
+  public void testMultiValueIndexingWithRawString() {
+    long seed = System.nanoTime();
+    // The memory manager is its own resource so that it gets closed too (after the index: reverse declaration order)
+    try (PinotDataBufferMemoryManager memoryManager = new DirectMemoryManager("test");
+        FixedByteMVMutableForwardIndex indexer = new FixedByteMVMutableForwardIndex(MAX_N_VALUES, MAX_N_VALUES / 2,
+            NROWS / 3, 24, memoryManager, "indexer", false, FieldSpec.DataType.STRING)) {
+      // Every attempt to write a STRING multi-value row must be rejected
+      Random random = new Random(seed);
+      for (int row = 0; row < NROWS; row++) {
+        // nextInt(bound) is never negative; Math.abs(nextInt()) stays negative for Integer.MIN_VALUE
+        int numValues = random.nextInt(MAX_N_VALUES);
+        String[] values = new String[numValues];
+        for (int i = 0; i < numValues; i++) {
+          values[i] = "random1";
+        }
+        final int curRow = row;
+        assertThrows(UnsupportedOperationException.class, () -> indexer.setStringMV(curRow, values));
+      }
+
+      // Reading a STRING multi-value row must be rejected as well
+      String[] stringValues = new String[MAX_N_VALUES];
+      for (int row = 0; row < NROWS; row++) {
+        final int curRow = row;
+        assertThrows(UnsupportedOperationException.class, () -> indexer.getStringMV(curRow, stringValues));
+      }
+    } catch (Throwable t) {
+      fail("Failed with random seed: " + seed, t);
+    }
+  }
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
index fb49dce6c4..881aa495a8 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/MultiValueFixedByteRawIndexCreatorTest.java
@@ -125,7 +125,8 @@ public class MultiValueFixedByteRawIndexCreatorTest {
//read
final PinotDataBuffer buffer = PinotDataBuffer
.mapFile(file, true, 0, file.length(), ByteOrder.BIG_ENDIAN, "");
- FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(buffer, DataType.BYTES);
+ FixedByteChunkMVForwardIndexReader reader = new FixedByteChunkMVForwardIndexReader(buffer,
+ dataType.getStoredType());
final ChunkReaderContext context = reader.createContext();
T valueBuffer = constructor.apply(maxElements);
for (int i = 0; i < numDocs; i++) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
index 53fded69be..3268277562 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/mutable/FixedByteMVMutableForwardIndexTest.java
@@ -50,15 +50,18 @@ public class FixedByteMVMutableForwardIndexTest {
Random r = new Random();
final long seed = r.nextLong();
try {
- testIntArray(seed);
- testWithZeroSize(seed);
+ testIntArray(seed, true);
+ testIntArray(seed, false);
+ testWithZeroSize(seed, true);
+ testWithZeroSize(seed, false);
} catch (Throwable e) {
e.printStackTrace();
Assert.fail("Failed with seed " + seed);
}
for (int mvs = 10; mvs < 1000; mvs += 10) {
try {
- testIntArrayFixedSize(mvs, seed);
+ testIntArrayFixedSize(mvs, seed, true);
+ testIntArrayFixedSize(mvs, seed, false);
} catch (Throwable e) {
e.printStackTrace();
Assert.fail("Failed with seed " + seed + ", mvs " + mvs);
@@ -66,7 +69,7 @@ public class FixedByteMVMutableForwardIndexTest {
}
}
- public void testIntArray(final long seed)
+ public void testIntArray(final long seed, boolean isDictionaryEncoded)
throws IOException {
FixedByteMVMutableForwardIndex readerWriter;
int rows = 1000;
@@ -74,7 +77,7 @@ public class FixedByteMVMutableForwardIndexTest {
int maxNumberOfMultiValuesPerRow = 2000;
readerWriter =
new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, 2, rows / 2, columnSizeInBytes, _memoryManager,
- "IntArray");
+ "IntArray", isDictionaryEncoded, FieldSpec.DataType.INT);
Random r = new Random(seed);
int[][] data = new int[rows][];
@@ -94,7 +97,7 @@ public class FixedByteMVMutableForwardIndexTest {
readerWriter.close();
}
- public void testIntArrayFixedSize(int multiValuesPerRow, long seed)
+ public void testIntArrayFixedSize(int multiValuesPerRow, long seed, boolean isDictionaryEncoded)
throws IOException {
FixedByteMVMutableForwardIndex readerWriter;
int rows = 1000;
@@ -102,7 +105,7 @@ public class FixedByteMVMutableForwardIndexTest {
// Keep the rowsPerChunk as a multiple of multiValuesPerRow to check the cases when both data and header buffers
// transition to new ones
readerWriter = new FixedByteMVMutableForwardIndex(multiValuesPerRow, multiValuesPerRow, multiValuesPerRow * 2,
- columnSizeInBytes, _memoryManager, "IntArrayFixedSize");
+ columnSizeInBytes, _memoryManager, "IntArrayFixedSize", isDictionaryEncoded, FieldSpec.DataType.INT);
Random r = new Random(seed);
int[][] data = new int[rows][];
@@ -122,7 +125,7 @@ public class FixedByteMVMutableForwardIndexTest {
readerWriter.close();
}
- public void testWithZeroSize(long seed)
+ public void testWithZeroSize(long seed, boolean isDictionaryEncoded)
throws IOException {
FixedByteMVMutableForwardIndex readerWriter;
final int maxNumberOfMultiValuesPerRow = 5;
@@ -131,7 +134,7 @@ public class FixedByteMVMutableForwardIndexTest {
Random r = new Random(seed);
readerWriter =
new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, 3, r.nextInt(rows) + 1, columnSizeInBytes,
- _memoryManager, "ZeroSize");
+ _memoryManager, "ZeroSize", isDictionaryEncoded, FieldSpec.DataType.INT);
int[][] data = new int[rows][];
for (int i = 0; i < rows; i++) {
@@ -156,12 +159,12 @@ public class FixedByteMVMutableForwardIndexTest {
}
private FixedByteMVMutableForwardIndex createReaderWriter(FieldSpec.DataType dataType, Random r, int rows,
- int maxNumberOfMultiValuesPerRow) {
+ int maxNumberOfMultiValuesPerRow, boolean isDictionaryEncoded) {
final int avgMultiValueCount = r.nextInt(maxNumberOfMultiValuesPerRow) + 1;
final int rowCountPerChunk = r.nextInt(rows) + 1;
return new FixedByteMVMutableForwardIndex(maxNumberOfMultiValuesPerRow, avgMultiValueCount, rowCountPerChunk,
- dataType.size(), _memoryManager, "ReaderWriter");
+ dataType.size(), _memoryManager, "ReaderWriter", isDictionaryEncoded, dataType);
}
private long generateSeed() {
@@ -172,12 +175,18 @@ public class FixedByteMVMutableForwardIndexTest {
@Test
public void testLongArray()
throws IOException {
+ testLongArray(true); // run with isDictionaryEncoded = true
+ testLongArray(false); // run with isDictionaryEncoded = false
+ }
+
+ private void testLongArray(boolean isDictionaryEncoded)
+ throws IOException {
final long seed = generateSeed();
Random r = new Random(seed);
int rows = 1000;
final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
FixedByteMVMutableForwardIndex readerWriter =
- createReaderWriter(FieldSpec.DataType.LONG, r, rows, maxNumberOfMultiValuesPerRow);
+ createReaderWriter(FieldSpec.DataType.LONG, r, rows, maxNumberOfMultiValuesPerRow, isDictionaryEncoded);
long[][] data = new long[rows][];
for (int i = 0; i < rows; i++) {
@@ -204,12 +213,18 @@ public class FixedByteMVMutableForwardIndexTest {
@Test
public void testFloatArray()
throws IOException {
+ testFloatArray(true); // run with the dictionary-encoded flag set to true
+ testFloatArray(false); // run with the dictionary-encoded flag set to false
+ }
+
+ private void testFloatArray(boolean isDictoinaryEncoded)
+ throws IOException {
final long seed = generateSeed();
Random r = new Random(seed);
int rows = 1000;
final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
FixedByteMVMutableForwardIndex readerWriter =
- createReaderWriter(FieldSpec.DataType.FLOAT, r, rows, maxNumberOfMultiValuesPerRow);
+ createReaderWriter(FieldSpec.DataType.FLOAT, r, rows, maxNumberOfMultiValuesPerRow, isDictoinaryEncoded);
float[][] data = new float[rows][];
for (int i = 0; i < rows; i++) {
@@ -236,12 +251,18 @@ public class FixedByteMVMutableForwardIndexTest {
@Test
public void testDoubleArray()
throws IOException {
+ testDoubleArray(true); // run with the dictionary-encoded flag set to true
+ testDoubleArray(false); // run with the dictionary-encoded flag set to false
+ }
+
+ private void testDoubleArray(boolean isDictonaryEncoded)
+ throws IOException {
final long seed = generateSeed();
Random r = new Random(seed);
int rows = 1000;
final int maxNumberOfMultiValuesPerRow = r.nextInt(100) + 1;
FixedByteMVMutableForwardIndex readerWriter =
- createReaderWriter(FieldSpec.DataType.DOUBLE, r, rows, maxNumberOfMultiValuesPerRow);
+ createReaderWriter(FieldSpec.DataType.DOUBLE, r, rows, maxNumberOfMultiValuesPerRow, isDictonaryEncoded);
double[][] data = new double[rows][];
for (int i = 0; i < rows; i++) {
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableForwardIndex.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableForwardIndex.java
index cc84074ea0..38e2bc8539 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableForwardIndex.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/mutable/MutableForwardIndex.java
@@ -92,11 +92,26 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the dictionary ids for a multi-value column at the given document id into a buffer and returns the buffer.
+ *
+ * <p>This default implementation always throws; an implementation has to override it to support this read.
+ *
+ * @param docId Document id
+ * @return A buffer containing the multi-value entries
+ * @throws UnsupportedOperationException if the implementation does not override this method
+ */
+ default int[] getDictIdMV(int docId) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
default int getDictIdMV(int docId, int[] dictIdBuffer, ForwardIndexReaderContext context) {
return getDictIdMV(docId, dictIdBuffer); // the reader context is ignored
}
+ @Override
+ default int[] getDictIdMV(int docId, ForwardIndexReaderContext context) {
+ return getDictIdMV(docId); // the reader context is ignored
+ }
+
/**
* Writes the dictionary id for a single-value column into the given document id.
*
@@ -321,11 +336,26 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the INT type multi-value at the given document id into a buffer and returns the buffer.
+ *
+ * <p>This default implementation always throws; an implementation has to override it to support this read.
+ *
+ * @param docId Document id
+ * @return A buffer containing the multi-value entries
+ * @throws UnsupportedOperationException if the implementation does not override this method
+ */
+ default int[] getIntMV(int docId) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
default int getIntMV(int docId, int[] valueBuffer, ForwardIndexReaderContext context) {
return getIntMV(docId, valueBuffer); // the reader context is ignored
}
+ @Override
+ default int[] getIntMV(int docId, ForwardIndexReaderContext context) {
+ return getIntMV(docId); // the reader context is ignored
+ }
+
/**
* Reads the LONG type multi-value at the given document id into the passed in value buffer (the buffer size must be
* enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -339,11 +369,26 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the LONG type multi-value at the given document id into a buffer and returns the buffer.
+ *
+ * <p>This default implementation always throws; an implementation has to override it to support this read.
+ *
+ * @param docId Document id
+ * @return A buffer containing the multi-value entries
+ * @throws UnsupportedOperationException if the implementation does not override this method
+ */
+ default long[] getLongMV(int docId) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
default int getLongMV(int docId, long[] valueBuffer, ForwardIndexReaderContext context) {
return getLongMV(docId, valueBuffer); // the reader context is ignored
}
+ @Override
+ default long[] getLongMV(int docId, ForwardIndexReaderContext context) {
+ return getLongMV(docId); // the reader context is ignored
+ }
+
/**
* Reads the FLOAT type multi-value at the given document id into the passed in value buffer (the buffer size must be
* enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -357,11 +402,26 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the FLOAT type multi-value at the given document id into a buffer and returns the buffer.
+ *
+ * <p>This default implementation always throws; an implementation has to override it to support this read.
+ *
+ * @param docId Document id
+ * @return A buffer containing the multi-value entries
+ * @throws UnsupportedOperationException if the implementation does not override this method
+ */
+ default float[] getFloatMV(int docId) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
default int getFloatMV(int docId, float[] valueBuffer, ForwardIndexReaderContext context) {
return getFloatMV(docId, valueBuffer); // the reader context is ignored
}
+ @Override
+ default float[] getFloatMV(int docId, ForwardIndexReaderContext context) {
+ return getFloatMV(docId); // the reader context is ignored
+ }
+
/**
* Reads the DOUBLE type multi-value at the given document id into the passed in value buffer (the buffer size must
* be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -375,11 +435,26 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the DOUBLE type multi-value at the given document id into a buffer and returns the buffer.
+ *
+ * <p>This default implementation always throws; an implementation has to override it to support this read.
+ *
+ * @param docId Document id
+ * @return A buffer containing the multi-value entries
+ * @throws UnsupportedOperationException if the implementation does not override this method
+ */
+ default double[] getDoubleMV(int docId) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
default int getDoubleMV(int docId, double[] valueBuffer, ForwardIndexReaderContext context) {
return getDoubleMV(docId, valueBuffer); // the reader context is ignored
}
+ @Override
+ default double[] getDoubleMV(int docId, ForwardIndexReaderContext context) {
+ return getDoubleMV(docId); // the reader context is ignored
+ }
+
/**
* Reads the STRING type multi-value at the given document id into the passed in value buffer (the buffer size must
* be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -393,11 +468,41 @@ public interface MutableForwardIndex extends ForwardIndexReader<ForwardIndexRead
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the STRING type multi-value at the given document id and returns the values as an array, without
+ * requiring the caller to pass in a value buffer.
+ * <p>This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param docId Document id
+ * @return An array containing the multi-value entries
+ * @throws UnsupportedOperationException if not overridden by the implementation
+ */
+ default String[] getStringMV(int docId) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
default int getStringMV(int docId, String[] valueBuffer, ForwardIndexReaderContext context) {
return getStringMV(docId, valueBuffer); // reader context is ignored; delegates to the context-free overload
}
+ @Override
+ default String[] getStringMV(int docId, ForwardIndexReaderContext context) {
+ return getStringMV(docId); // reader context is ignored; delegates to the context-free overload
+ }
+
+ /**
+ * Gets the number of values within the multi-value entry at the given document id.
+ * <p>This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param docId Document id
+ * @return Number of values within the multi-value entry
+ * @throws UnsupportedOperationException if not overridden by the implementation
+ */
+ default int getNumValuesMV(int docId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ default int getNumValuesMV(int docId, ForwardIndexReaderContext context) {
+ return getNumValuesMV(docId); // reader context is ignored; delegates to the context-free overload
+ }
+
/**
* Writes the INT type multi-value into the given document id.
*
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
index 62e34dfc55..8db16ac496 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/reader/ForwardIndexReader.java
@@ -100,6 +100,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the dictionary ids for a multi-value column at the given document id and returns them as an array,
+ * without requiring the caller to pass in a buffer.
+ * <p>This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param docId Document id
+ * @param context Reader context
+ * @return Dictionary ids at the given document id
+ * @throws UnsupportedOperationException if not overridden by the implementation
+ */
+ default int[] getDictIdMV(int docId, T context) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* SINGLE-VALUE COLUMN RAW INDEX APIs
*/
@@ -410,9 +421,308 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
/**
* MULTI-VALUE COLUMN RAW INDEX APIs
- * TODO: Not supported yet
*/
+ /**
+ * Reads the multi-value entries for the first {@code length} document ids in {@code docIds} and stores them in
+ * {@code values} as INT arrays, one array per document id.
+ * <p>When the stored value type is INT, the array returned by {@code getIntMV(docId, context)} is stored directly.
+ * LONG, FLOAT and DOUBLE values are converted with a narrowing {@code (int)} cast, and STRING values are parsed
+ * with {@link Integer#parseInt(String)}.
+ *
+ * @param docIds Array containing the document ids to read
+ * @param length Number of document ids to read from {@code docIds}
+ * @param maxNumValuesPerMVEntry Maximum number of values per MV entry; used as the size of the temporary read
+ *                               buffer when the stored type is not INT
+ * @param values Output array; {@code values[i]} is set to the values read for {@code docIds[i]}
+ * @param context Reader context
+ * @throws IllegalArgumentException if the stored value type is not INT, LONG, FLOAT, DOUBLE or STRING
+ * @throws NumberFormatException if a STRING value cannot be parsed as an int
+ */
+ default void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry, int[][] values, T context) {
+ switch (getValueType()) {
+ case INT:
+ for (int i = 0; i < length; i++) {
+ values[i] = getIntMV(docIds[i], context); // same type: no conversion or temporary buffer needed
+ }
+ break;
+ case LONG:
+ long[] longValueBuffer = new long[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getLongMV(docIds[i], longValueBuffer, context);
+ values[i] = new int[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = (int) longValueBuffer[j]; // narrowing cast
+ }
+ }
+ break;
+ case FLOAT:
+ float[] floatValueBuffer = new float[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getFloatMV(docIds[i], floatValueBuffer, context);
+ values[i] = new int[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = (int) floatValueBuffer[j]; // narrowing cast
+ }
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValueBuffer = new double[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getDoubleMV(docIds[i], doubleValueBuffer, context);
+ values[i] = new int[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = (int) doubleValueBuffer[j]; // narrowing cast
+ }
+ }
+ break;
+ case STRING:
+ String[] stringValueBuffer = new String[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getStringMV(docIds[i], stringValueBuffer, context);
+ values[i] = new int[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = Integer.parseInt(stringValueBuffer[j]);
+ }
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("readValuesMV not supported for type " + getValueType());
+ }
+ }
+
+ /**
+ * Reads the multi-value entries for the first {@code length} document ids in {@code docIds} and stores them in
+ * {@code values} as LONG arrays, one array per document id.
+ * <p>When the stored value type is LONG, the array returned by {@code getLongMV(docId, context)} is stored
+ * directly. INT values are widened to long, FLOAT and DOUBLE values are converted with a narrowing {@code (long)}
+ * cast, and STRING values are parsed with {@link Long#parseLong(String)}.
+ *
+ * @param docIds Array containing the document ids to read
+ * @param length Number of document ids to read from {@code docIds}
+ * @param maxNumValuesPerMVEntry Maximum number of values per MV entry; used as the size of the temporary read
+ *                               buffer when the stored type is not LONG
+ * @param values Output array; {@code values[i]} is set to the values read for {@code docIds[i]}
+ * @param context Reader context
+ * @throws IllegalArgumentException if the stored value type is not INT, LONG, FLOAT, DOUBLE or STRING
+ * @throws NumberFormatException if a STRING value cannot be parsed as a long
+ */
+ default void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry, long[][] values, T context) {
+ switch (getValueType()) {
+ case INT:
+ int[] intValueBuffer = new int[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getIntMV(docIds[i], intValueBuffer, context);
+ values[i] = new long[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = intValueBuffer[j];
+ }
+ }
+ break;
+ case LONG:
+ for (int i = 0; i < length; i++) {
+ values[i] = getLongMV(docIds[i], context); // same type: no conversion or temporary buffer needed
+ }
+ break;
+ case FLOAT:
+ float[] floatValueBuffer = new float[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getFloatMV(docIds[i], floatValueBuffer, context);
+ values[i] = new long[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = (long) floatValueBuffer[j]; // narrowing cast
+ }
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValueBuffer = new double[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getDoubleMV(docIds[i], doubleValueBuffer, context);
+ values[i] = new long[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = (long) doubleValueBuffer[j]; // narrowing cast
+ }
+ }
+ break;
+ case STRING:
+ String[] stringValueBuffer = new String[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getStringMV(docIds[i], stringValueBuffer, context);
+ values[i] = new long[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = Long.parseLong(stringValueBuffer[j]);
+ }
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("readValuesMV not supported for type " + getValueType());
+ }
+ }
+
+ /**
+ * Reads the multi-value entries for the first {@code length} document ids in {@code docIds} and stores them in
+ * {@code values} as FLOAT arrays, one array per document id.
+ * <p>When the stored value type is FLOAT, the array returned by {@code getFloatMV(docId, context)} is stored
+ * directly. INT and LONG values are assigned to float without an explicit cast, DOUBLE values are converted with
+ * a narrowing {@code (float)} cast, and STRING values are parsed with {@link Float#parseFloat(String)}.
+ *
+ * @param docIds Array containing the document ids to read
+ * @param length Number of document ids to read from {@code docIds}
+ * @param maxNumValuesPerMVEntry Maximum number of values per MV entry; used as the size of the temporary read
+ *                               buffer when the stored type is not FLOAT
+ * @param values Output array; {@code values[i]} is set to the values read for {@code docIds[i]}
+ * @param context Reader context
+ * @throws IllegalArgumentException if the stored value type is not INT, LONG, FLOAT, DOUBLE or STRING
+ * @throws NumberFormatException if a STRING value cannot be parsed as a float
+ */
+ default void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry, float[][] values, T context) {
+ switch (getValueType()) {
+ case INT:
+ int[] intValueBuffer = new int[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getIntMV(docIds[i], intValueBuffer, context);
+ values[i] = new float[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = intValueBuffer[j];
+ }
+ }
+ break;
+ case LONG:
+ long[] longValueBuffer = new long[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getLongMV(docIds[i], longValueBuffer, context);
+ values[i] = new float[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = longValueBuffer[j];
+ }
+ }
+ break;
+ case FLOAT:
+ for (int i = 0; i < length; i++) {
+ values[i] = getFloatMV(docIds[i], context); // same type: no conversion or temporary buffer needed
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValueBuffer = new double[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getDoubleMV(docIds[i], doubleValueBuffer, context);
+ values[i] = new float[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = (float) doubleValueBuffer[j]; // narrowing cast
+ }
+ }
+ break;
+ case STRING:
+ String[] stringValueBuffer = new String[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getStringMV(docIds[i], stringValueBuffer, context);
+ values[i] = new float[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = Float.parseFloat(stringValueBuffer[j]);
+ }
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("readValuesMV not supported for type " + getValueType());
+ }
+ }
+
+ /**
+ * Reads the multi-value entries for the first {@code length} document ids in {@code docIds} and stores them in
+ * {@code values} as DOUBLE arrays, one array per document id.
+ * <p>When the stored value type is DOUBLE, the array returned by {@code getDoubleMV(docId, context)} is stored
+ * directly. INT, LONG and FLOAT values are assigned to double without an explicit cast, and STRING values are
+ * parsed with {@link Double#parseDouble(String)}.
+ *
+ * @param docIds Array containing the document ids to read
+ * @param length Number of document ids to read from {@code docIds}
+ * @param maxNumValuesPerMVEntry Maximum number of values per MV entry; used as the size of the temporary read
+ *                               buffer when the stored type is not DOUBLE
+ * @param values Output array; {@code values[i]} is set to the values read for {@code docIds[i]}
+ * @param context Reader context
+ * @throws IllegalArgumentException if the stored value type is not INT, LONG, FLOAT, DOUBLE or STRING
+ * @throws NumberFormatException if a STRING value cannot be parsed as a double
+ */
+ default void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry, double[][] values, T context) {
+ switch (getValueType()) {
+ case INT:
+ int[] intValueBuffer = new int[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getIntMV(docIds[i], intValueBuffer, context);
+ values[i] = new double[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = intValueBuffer[j];
+ }
+ }
+ break;
+ case LONG:
+ long[] longValueBuffer = new long[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getLongMV(docIds[i], longValueBuffer, context);
+ values[i] = new double[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = longValueBuffer[j];
+ }
+ }
+ break;
+ case FLOAT:
+ float[] floatValueBuffer = new float[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getFloatMV(docIds[i], floatValueBuffer, context);
+ values[i] = new double[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = floatValueBuffer[j];
+ }
+ }
+ break;
+ case DOUBLE:
+ for (int i = 0; i < length; i++) {
+ values[i] = getDoubleMV(docIds[i], context); // same type: no conversion or temporary buffer needed
+ }
+ break;
+ case STRING:
+ String[] stringValueBuffer = new String[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getStringMV(docIds[i], stringValueBuffer, context);
+ values[i] = new double[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = Double.parseDouble(stringValueBuffer[j]);
+ }
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("readValuesMV not supported for type " + getValueType());
+ }
+ }
+
+ /**
+ * Reads the multi-value entries for the first {@code length} document ids in {@code docIds} and stores them in
+ * {@code values} as STRING arrays, one array per document id.
+ * <p>When the stored value type is STRING, the array returned by {@code getStringMV(docId, context)} is stored
+ * directly. INT, LONG, FLOAT and DOUBLE values are converted with {@code String.valueOf}.
+ *
+ * @param docIds Array containing the document ids to read
+ * @param length Number of document ids to read from {@code docIds}
+ * @param maxNumValuesPerMVEntry Maximum number of values per MV entry; used as the size of the temporary read
+ *                               buffer when the stored type is not STRING
+ * @param values Output array; {@code values[i]} is set to the values read for {@code docIds[i]}
+ * @param context Reader context
+ * @throws IllegalArgumentException if the stored value type is not INT, LONG, FLOAT, DOUBLE or STRING
+ */
+ default void readValuesMV(int[] docIds, int length, int maxNumValuesPerMVEntry, String[][] values, T context) {
+ switch (getValueType()) {
+ case INT:
+ int[] intValueBuffer = new int[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getIntMV(docIds[i], intValueBuffer, context);
+ values[i] = new String[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = String.valueOf(intValueBuffer[j]);
+ }
+ }
+ break;
+ case LONG:
+ long[] longValueBuffer = new long[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getLongMV(docIds[i], longValueBuffer, context);
+ values[i] = new String[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = String.valueOf(longValueBuffer[j]);
+ }
+ }
+ break;
+ case FLOAT:
+ float[] floatValueBuffer = new float[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getFloatMV(docIds[i], floatValueBuffer, context);
+ values[i] = new String[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = String.valueOf(floatValueBuffer[j]);
+ }
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValueBuffer = new double[maxNumValuesPerMVEntry];
+ for (int i = 0; i < length; i++) {
+ int numValues = getDoubleMV(docIds[i], doubleValueBuffer, context);
+ values[i] = new String[numValues];
+ for (int j = 0; j < numValues; j++) {
+ values[i][j] = String.valueOf(doubleValueBuffer[j]);
+ }
+ }
+ break;
+ case STRING:
+ for (int i = 0; i < length; i++) {
+ values[i] = getStringMV(docIds[i], context); // same type: no conversion or temporary buffer needed
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("readValuesMV not supported for type " + getValueType());
+ }
+ }
+
/**
* Reads the INT type multi-value at the given document id into the passed in value buffer (the buffer size must be
* enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -427,6 +737,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the INT type multi-value at the given document id and returns the values as an array, without requiring
+ * the caller to pass in a value buffer.
+ * <p>This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param docId Document id
+ * @param context Reader context
+ * @return INT values at the given document id
+ * @throws UnsupportedOperationException if not overridden by the implementation
+ */
+ default int[] getIntMV(int docId, T context) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Reads the LONG type multi-value at the given document id into the passed in value buffer (the buffer size must be
* enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -441,6 +762,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the LONG type multi-value at the given document id and returns the values as an array, without requiring
+ * the caller to pass in a value buffer.
+ * <p>This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param docId Document id
+ * @param context Reader context
+ * @return LONG values at the given document id
+ * @throws UnsupportedOperationException if not overridden by the implementation
+ */
+ default long[] getLongMV(int docId, T context) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Reads the FLOAT type multi-value at the given document id into the passed in value buffer (the buffer size must be
* enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -455,6 +787,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the FLOAT type multi-value at the given document id and returns the values as an array, without requiring
+ * the caller to pass in a value buffer.
+ * <p>This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param docId Document id
+ * @param context Reader context
+ * @return FLOAT values at the given document id
+ * @throws UnsupportedOperationException if not overridden by the implementation
+ */
+ default float[] getFloatMV(int docId, T context) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Reads the DOUBLE type multi-value at the given document id into the passed in value buffer (the buffer size must
* be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -469,6 +812,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the DOUBLE type multi-value at the given document id and returns the values as an array, without
+ * requiring the caller to pass in a value buffer.
+ * <p>This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param docId Document id
+ * @param context Reader context
+ * @return DOUBLE values at the given document id
+ * @throws UnsupportedOperationException if not overridden by the implementation
+ */
+ default double[] getDoubleMV(int docId, T context) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Reads the STRING type multi-value at the given document id into the passed in value buffer (the buffer size must
* be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -483,6 +837,17 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
throw new UnsupportedOperationException();
}
+ /**
+ * Reads the STRING type multi-value at the given document id and returns the values as an array, without
+ * requiring the caller to pass in a value buffer.
+ * <p>This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param docId Document id
+ * @param context Reader context
+ * @return STRING values at the given document id
+ * @throws UnsupportedOperationException if not overridden by the implementation
+ */
+ default String[] getStringMV(int docId, T context) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Reads the bytes type multi-value at the given document id into the passed in value buffer (the buffer size must
* be enough to hold all the values for the multi-value entry) and returns the number of values within the multi-value
@@ -496,4 +861,26 @@ public interface ForwardIndexReader<T extends ForwardIndexReaderContext> extends
default int getBytesMV(int docId, byte[][] valueBuffer, T context) {
throw new UnsupportedOperationException(); // default: unsupported unless the implementation overrides it
}
+
+ /**
+ * Reads the bytes type multi-value at the given document id and returns the values as an array of byte arrays,
+ * without requiring the caller to pass in a value buffer.
+ * <p>This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param docId Document id
+ * @param context Reader context
+ * @return BYTES values at the given document id
+ * @throws UnsupportedOperationException if not overridden by the implementation
+ */
+ default byte[][] getBytesMV(int docId, T context) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Gets the number of values within the multi-value entry at the given document id.
+ * <p>This default implementation always throws {@link UnsupportedOperationException}.
+ *
+ * @param docId Document id
+ * @param context Reader context
+ * @return Number of values within the multi-value entry
+ * @throws UnsupportedOperationException if not overridden by the implementation
+ */
+ default int getNumValuesMV(int docId, T context) {
+ throw new UnsupportedOperationException();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org