You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/07/21 22:40:44 UTC
[incubator-pinot] branch master updated: Support BYTES in group-by (#5708)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0927e15 Support BYTES in group-by (#5708)
0927e15 is described below
commit 0927e150efe5aa84cfc682be1d142d24ef0ea95f
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Jul 21 15:40:30 2020 -0700
Support BYTES in group-by (#5708)
- Add ValueToIdMap (on-the-fly dictionary) for BYTES type
- Re-order the operations to avoid the per-value switch case in `NoDictionaryMultiColumnGroupKeyGenerator.generateKeysForBlock()`
- Add type-specific group key iterators in `NoDictionarySingleColumnGroupKeyGenerator` and a fast-iterator-based group key iterator in `NoDictionaryMultiColumnGroupKeyGenerator` for performance improvement
- Enhance `NoDictionaryGroupKeyGeneratorTest` to test BYTES type
---
.../groupby/DictionaryBasedGroupKeyGenerator.java | 39 ++-
.../NoDictionaryMultiColumnGroupKeyGenerator.java | 166 +++++--------
.../NoDictionarySingleColumnGroupKeyGenerator.java | 244 ++++++++++++++-----
.../groupby/utils/BaseValueToIdMap.java | 13 +
.../{DoubleToIdMap.java => BytesToIdMap.java} | 36 +--
.../aggregation/groupby/utils/DoubleToIdMap.java | 3 +-
.../aggregation/groupby/utils/FloatToIdMap.java | 3 +-
.../aggregation/groupby/utils/IntToIdMap.java | 3 +-
.../aggregation/groupby/utils/LongToIdMap.java | 3 +-
.../aggregation/groupby/utils/ValueToIdMap.java | 7 +
.../groupby/utils/ValueToIdMapFactory.java | 7 +-
.../pinot/queries/DistinctCountQueriesTest.java | 2 +-
.../groupby/NoDictionaryGroupKeyGeneratorTest.java | 270 ++++++++++-----------
13 files changed, 433 insertions(+), 363 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
index b598a07..5da07d1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
@@ -28,7 +28,6 @@ import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
-import javax.annotation.Nonnull;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.TransformOperator;
@@ -151,7 +150,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
}
@Override
- public void generateKeysForBlock(@Nonnull TransformBlock transformBlock, @Nonnull int[] groupKeys) {
+ public void generateKeysForBlock(TransformBlock transformBlock, int[] groupKeys) {
// Fetch dictionary ids in the given block for all group-by columns
for (int i = 0; i < _numGroupByExpressions; i++) {
BlockValSet blockValueSet = transformBlock.getBlockValueSet(_groupByExpressions[i]);
@@ -162,7 +161,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
}
@Override
- public void generateKeysForBlock(@Nonnull TransformBlock transformBlock, @Nonnull int[][] groupKeys) {
+ public void generateKeysForBlock(TransformBlock transformBlock, int[][] groupKeys) {
// Fetch dictionary ids in the given block for all group-by columns
for (int i = 0; i < _numGroupByExpressions; i++) {
BlockValSet blockValueSet = transformBlock.getBlockValueSet(_groupByExpressions[i]);
@@ -194,7 +193,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
* @param numDocs Number of documents inside the block
* @param outGroupIds Buffer for group id results
*/
- void processSingleValue(int numDocs, @Nonnull int[] outGroupIds);
+ void processSingleValue(int numDocs, int[] outGroupIds);
/**
* Process a block of documents for case with multi-valued group-by columns.
@@ -202,7 +201,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
* @param numDocs Number of documents inside the block
* @param outGroupIds Buffer for group id results
*/
- void processMultiValue(int numDocs, @Nonnull int[][] outGroupIds);
+ void processMultiValue(int numDocs, int[][] outGroupIds);
/**
* Get the upper bound of group id (exclusive) inside the holder.
@@ -219,7 +218,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
private final boolean[] _flags = new boolean[_globalGroupIdUpperBound];
@Override
- public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
+ public void processSingleValue(int numDocs, int[] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
int groupId = 0;
for (int j = _numGroupByExpressions - 1; j >= 0; j--) {
@@ -231,7 +230,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
}
@Override
- public void processMultiValue(int numDocs, @Nonnull int[][] outGroupIds) {
+ public void processMultiValue(int numDocs, int[][] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
int[] groupIds = getIntRawKeys(i);
for (int groupId : groupIds) {
@@ -251,7 +250,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
return _flags;
}
- @Nonnull
@Override
public Iterator<GroupKey> iterator() {
return new Iterator<GroupKey>() {
@@ -301,7 +299,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
}
@Override
- public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
+ public void processSingleValue(int numDocs, int[] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
int rawKey = 0;
for (int j = _numGroupByExpressions - 1; j >= 0; j--) {
@@ -312,7 +310,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
}
@Override
- public void processMultiValue(int numDocs, @Nonnull int[][] outGroupIds) {
+ public void processMultiValue(int numDocs, int[][] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
int[] groupIds = getIntRawKeys(i);
int length = groupIds.length;
@@ -344,7 +342,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
return _rawKeyToGroupIdMap;
}
- @Nonnull
@Override
public Iterator<GroupKey> iterator() {
return new Iterator<GroupKey>() {
@@ -490,7 +487,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
}
@Override
- public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
+ public void processSingleValue(int numDocs, int[] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
long rawKey = 0L;
for (int j = _numGroupByExpressions - 1; j >= 0; j--) {
@@ -501,7 +498,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
}
@Override
- public void processMultiValue(int numDocs, @Nonnull int[][] outGroupIds) {
+ public void processMultiValue(int numDocs, int[][] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
long[] rawKeys = getLongRawKeys(i);
int length = rawKeys.length;
@@ -534,7 +531,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
return _rawKeyToGroupIdMap;
}
- @Nonnull
@Override
public Iterator<GroupKey> iterator() {
return new Iterator<GroupKey>() {
@@ -644,12 +640,12 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
*/
private String getGroupKey(long rawKey) {
int cardinality = _cardinalities[0];
- StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].get((int) (rawKey % cardinality)).toString());
+ StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].getStringValue((int) (rawKey % cardinality)));
rawKey /= cardinality;
for (int i = 1; i < _numGroupByExpressions; i++) {
groupKeyBuilder.append(GroupKeyGenerator.DELIMITER);
cardinality = _cardinalities[i];
- groupKeyBuilder.append(_dictionaries[i].get((int) (rawKey % cardinality)));
+ groupKeyBuilder.append(_dictionaries[i].getStringValue((int) (rawKey % cardinality)));
rawKey /= cardinality;
}
return groupKeyBuilder.toString();
@@ -671,7 +667,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
}
@Override
- public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
+ public void processSingleValue(int numDocs, int[] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
int[] dictIds = new int[_numGroupByExpressions];
for (int j = 0; j < _numGroupByExpressions; j++) {
@@ -682,7 +678,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
}
@Override
- public void processMultiValue(int numDocs, @Nonnull int[][] outGroupIds) {
+ public void processMultiValue(int numDocs, int[][] outGroupIds) {
for (int i = 0; i < numDocs; i++) {
IntArray[] rawKeys = getIntArrayRawKeys(i);
int length = rawKeys.length;
@@ -715,7 +711,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
return _rawKeyToGroupIdMap;
}
- @Nonnull
@Override
public Iterator<GroupKey> iterator() {
return new Iterator<GroupKey>() {
@@ -828,10 +823,10 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
* @return String group key
*/
private String getGroupKey(IntArray rawKey) {
- StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].get(rawKey._elements[0]).toString());
+ StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].getStringValue(rawKey._elements[0]));
for (int i = 1; i < _numGroupByExpressions; i++) {
groupKeyBuilder.append(GroupKeyGenerator.DELIMITER);
- groupKeyBuilder.append(_dictionaries[i].get(rawKey._elements[i]));
+ groupKeyBuilder.append(_dictionaries[i].getStringValue(rawKey._elements[i]));
}
return groupKeyBuilder.toString();
}
@@ -839,6 +834,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
/**
* Drop un-necessary checks for highest performance.
*/
+ @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
private static class IntArray {
public int[] _elements;
@@ -855,7 +851,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
return result;
}
- @SuppressWarnings({"Contract", "EqualsWhichDoesntCheckParameterClass"})
@Override
public boolean equals(Object obj) {
int[] that = ((IntArray) obj)._elements;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
index a3e2921..1d292c6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
@@ -18,9 +18,10 @@
*/
package org.apache.pinot.core.query.aggregation.groupby;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.Iterator;
-import java.util.Map;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.TransformOperator;
@@ -30,7 +31,8 @@ import org.apache.pinot.core.query.aggregation.groupby.utils.ValueToIdMapFactory
import org.apache.pinot.core.query.request.context.ExpressionContext;
import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.core.util.FixedIntArray;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
/**
@@ -45,7 +47,7 @@ import org.apache.pinot.spi.data.FieldSpec;
public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerator {
private final ExpressionContext[] _groupByExpressions;
private final int _numGroupByExpressions;
- private final FieldSpec.DataType[] _dataTypes;
+ private final DataType[] _dataTypes;
private final Dictionary[] _dictionaries;
private final ValueToIdMap[] _onTheFlyDictionaries;
private final Object2IntOpenHashMap<FixedIntArray> _groupKeyMap;
@@ -57,7 +59,7 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
ExpressionContext[] groupByExpressions, int numGroupsLimit) {
_groupByExpressions = groupByExpressions;
_numGroupByExpressions = groupByExpressions.length;
- _dataTypes = new FieldSpec.DataType[_numGroupByExpressions];
+ _dataTypes = new DataType[_numGroupByExpressions];
_dictionaries = new Dictionary[_numGroupByExpressions];
_onTheFlyDictionaries = new ValueToIdMap[_numGroupByExpressions];
@@ -82,55 +84,63 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
return _globalGroupIdUpperBound;
}
- @SuppressWarnings("ConstantConditions")
@Override
public void generateKeysForBlock(TransformBlock transformBlock, int[] groupKeys) {
int numDocs = transformBlock.getNumDocs();
- Object[] values = new Object[_numGroupByExpressions];
+ int[][] keys = new int[numDocs][_numGroupByExpressions];
for (int i = 0; i < _numGroupByExpressions; i++) {
BlockValSet blockValSet = transformBlock.getBlockValueSet(_groupByExpressions[i]);
if (_dictionaries[i] != null) {
- values[i] = blockValSet.getDictionaryIdsSV();
+ int[] dictIds = blockValSet.getDictionaryIdsSV();
+ for (int j = 0; j < numDocs; j++) {
+ keys[j][i] = dictIds[j];
+ }
} else {
- values[i] = getValuesFromBlockValSet(blockValSet, _dataTypes[i]);
+ ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[i];
+ switch (_dataTypes[i]) {
+ case INT:
+ int[] intValues = blockValSet.getIntValuesSV();
+ for (int j = 0; j < numDocs; j++) {
+ keys[j][i] = onTheFlyDictionary.put(intValues[j]);
+ }
+ break;
+ case LONG:
+ long[] longValues = blockValSet.getLongValuesSV();
+ for (int j = 0; j < numDocs; j++) {
+ keys[j][i] = onTheFlyDictionary.put(longValues[j]);
+ }
+ break;
+ case FLOAT:
+ float[] floatValues = blockValSet.getFloatValuesSV();
+ for (int j = 0; j < numDocs; j++) {
+ keys[j][i] = onTheFlyDictionary.put(floatValues[j]);
+ }
+ break;
+ case DOUBLE:
+ double[] doubleValues = blockValSet.getDoubleValuesSV();
+ for (int j = 0; j < numDocs; j++) {
+ keys[j][i] = onTheFlyDictionary.put(doubleValues[j]);
+ }
+ break;
+ case STRING:
+ String[] stringValues = blockValSet.getStringValuesSV();
+ for (int j = 0; j < numDocs; j++) {
+ keys[j][i] = onTheFlyDictionary.put(stringValues[j]);
+ }
+ break;
+ case BYTES:
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int j = 0; j < numDocs; j++) {
+ keys[j][i] = onTheFlyDictionary.put(new ByteArray(bytesValues[j]));
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Illegal data type for no-dictionary key generator: " + _dataTypes[i]);
+ }
}
}
-
for (int i = 0; i < numDocs; i++) {
- int[] keys = new int[_numGroupByExpressions];
- for (int j = 0; j < _numGroupByExpressions; j++) {
- if (_dictionaries[j] != null) {
- int[] dictIds = (int[]) values[j];
- keys[j] = dictIds[i];
- } else {
- FieldSpec.DataType dataType = _dataTypes[j];
- switch (dataType) {
- case INT:
- int[] intValues = (int[]) values[j];
- keys[j] = _onTheFlyDictionaries[j].put(intValues[i]);
- break;
- case LONG:
- long[] longValues = (long[]) values[j];
- keys[j] = _onTheFlyDictionaries[j].put(longValues[i]);
- break;
- case FLOAT:
- float[] floatValues = (float[]) values[j];
- keys[j] = _onTheFlyDictionaries[j].put(floatValues[i]);
- break;
- case DOUBLE:
- double[] doubleValues = (double[]) values[j];
- keys[j] = _onTheFlyDictionaries[j].put(doubleValues[i]);
- break;
- case STRING:
- String[] stringValues = (String[]) values[j];
- keys[j] = _onTheFlyDictionaries[j].put(stringValues[i]);
- break;
- default:
- throw new IllegalArgumentException("Illegal data type for no-dictionary key generator: " + dataType);
- }
- }
- }
- groupKeys[i] = getGroupIdForKey(new FixedIntArray(keys));
+ groupKeys[i] = getGroupIdForKey(new FixedIntArray(keys[i]));
}
}
@@ -147,7 +157,7 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
@Override
public Iterator<GroupKey> getUniqueGroupKeys() {
- return new GroupKeyIterator(_groupKeyMap);
+ return new GroupKeyIterator();
}
/**
@@ -170,12 +180,12 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
/**
* Iterator for {Group-Key, Group-id) pair.
*/
- class GroupKeyIterator implements Iterator<GroupKey> {
- Iterator<Map.Entry<FixedIntArray, Integer>> _iterator;
- GroupKey _groupKey;
+ private class GroupKeyIterator implements Iterator<GroupKey> {
+ final ObjectIterator<Object2IntMap.Entry<FixedIntArray>> _iterator;
+ final GroupKey _groupKey;
- public GroupKeyIterator(Map<FixedIntArray, Integer> map) {
- _iterator = map.entrySet().iterator();
+ public GroupKeyIterator() {
+ _iterator = _groupKeyMap.object2IntEntrySet().fastIterator();
_groupKey = new GroupKey();
}
@@ -186,8 +196,8 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
@Override
public GroupKey next() {
- Map.Entry<FixedIntArray, Integer> entry = _iterator.next();
- _groupKey._groupId = entry.getValue();
+ Object2IntMap.Entry<FixedIntArray> entry = _iterator.next();
+ _groupKey._groupId = entry.getIntValue();
_groupKey._stringKey = buildStringKeyFromIds(entry.getKey());
return _groupKey;
}
@@ -201,58 +211,16 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
private String buildStringKeyFromIds(FixedIntArray keyList) {
StringBuilder builder = new StringBuilder();
int[] keys = keyList.elements();
- for (int i = 0; i < keyList.size(); i++) {
- String key;
- int dictId = keys[i];
-
- if (_dictionaries[i] != null) {
- key = _dictionaries[i].get(dictId).toString();
- } else {
- key = _onTheFlyDictionaries[i].getString(dictId);
- }
-
+ for (int i = 0; i < _numGroupByExpressions; i++) {
if (i > 0) {
builder.append(GroupKeyGenerator.DELIMITER);
}
- builder.append(key);
+ if (_dictionaries[i] != null) {
+ builder.append(_dictionaries[i].getStringValue(keys[i]));
+ } else {
+ builder.append(_onTheFlyDictionaries[i].getString(keys[i]));
+ }
}
-
return builder.toString();
}
-
- /**
- * Helper method to fetch values from BlockValSet
- * @param dataType Data type
- * @param blockValSet Block val set
- * @return Values from block val set
- */
- private Object getValuesFromBlockValSet(BlockValSet blockValSet, FieldSpec.DataType dataType) {
- Object values;
-
- switch (dataType) {
- case INT:
- values = blockValSet.getIntValuesSV();
- break;
-
- case LONG:
- values = blockValSet.getLongValuesSV();
- break;
-
- case FLOAT:
- values = blockValSet.getFloatValuesSV();
- break;
-
- case DOUBLE:
- values = blockValSet.getDoubleValuesSV();
- break;
-
- case STRING:
- values = blockValSet.getStringValuesSV();
- break;
-
- default:
- throw new IllegalArgumentException("Illegal data type for no-dictionary key generator: " + dataType);
- }
- return values;
- }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
index 49a1930..1d9df51 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
@@ -28,13 +28,15 @@ import it.unimi.dsi.fastutil.longs.Long2IntMap;
import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.util.Iterator;
import java.util.Map;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.TransformOperator;
import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
/**
@@ -42,9 +44,10 @@ import org.apache.pinot.spi.data.FieldSpec;
* in absence of dictionary for the group by column.
*
*/
+@SuppressWarnings({"rawtypes", "unchecked"})
public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenerator {
private final ExpressionContext _groupByExpression;
- private final FieldSpec.DataType _dataType;
+ private final DataType _dataType;
private final Map _groupKeyMap;
private final int _globalGroupIdUpperBound;
@@ -99,6 +102,12 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
groupKeys[i] = getKeyForValue(stringValues[i]);
}
break;
+ case BYTES:
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < numDocs; i++) {
+ groupKeys[i] = getKeyForValue(new ByteArray(bytesValues[i]));
+ }
+ break;
default:
throw new IllegalArgumentException("Illegal data type for no-dictionary key generator: " + _dataType);
}
@@ -111,43 +120,35 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
* @param keyType DataType for the key
* @return Map
*/
- private Map createGroupKeyMap(FieldSpec.DataType keyType) {
- Map map;
+ private Map createGroupKeyMap(DataType keyType) {
switch (keyType) {
case INT:
Int2IntMap intMap = new Int2IntOpenHashMap();
intMap.defaultReturnValue(INVALID_ID);
- map = intMap;
- break;
-
+ return intMap;
case LONG:
Long2IntOpenHashMap longMap = new Long2IntOpenHashMap();
longMap.defaultReturnValue(INVALID_ID);
- map = longMap;
- break;
-
+ return longMap;
case FLOAT:
Float2IntOpenHashMap floatMap = new Float2IntOpenHashMap();
floatMap.defaultReturnValue(INVALID_ID);
- map = floatMap;
- break;
-
+ return floatMap;
case DOUBLE:
Double2IntOpenHashMap doubleMap = new Double2IntOpenHashMap();
doubleMap.defaultReturnValue(INVALID_ID);
- map = doubleMap;
- break;
-
+ return doubleMap;
case STRING:
Object2IntOpenHashMap<String> stringMap = new Object2IntOpenHashMap<>();
stringMap.defaultReturnValue(INVALID_ID);
- map = stringMap;
- break;
-
+ return stringMap;
+ case BYTES:
+ Object2IntOpenHashMap<ByteArray> bytesMap = new Object2IntOpenHashMap<>();
+ bytesMap.defaultReturnValue(INVALID_ID);
+ return bytesMap;
default:
- throw new IllegalArgumentException("Illegal data type for no-dictionary key generator: " + keyType);
+ throw new IllegalStateException("Illegal data type for no-dictionary key generator: " + keyType);
}
- return map;
}
@Override
@@ -163,84 +164,201 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
@Override
public Iterator<GroupKey> getUniqueGroupKeys() {
- return new GroupKeyIterator(_groupKeyMap);
+ switch (_dataType) {
+ case INT:
+ return new IntGroupKeyIterator((Int2IntOpenHashMap) _groupKeyMap);
+ case LONG:
+ return new LongGroupKeyIterator((Long2IntOpenHashMap) _groupKeyMap);
+ case FLOAT:
+ return new FloatGroupKeyIterator((Float2IntOpenHashMap) _groupKeyMap);
+ case DOUBLE:
+ return new DoubleGroupKeyIterator((Double2IntOpenHashMap) _groupKeyMap);
+ case STRING:
+ case BYTES:
+ return new ObjectGroupKeyIterator((Object2IntOpenHashMap) _groupKeyMap);
+ default:
+ throw new IllegalStateException();
+ }
}
- @SuppressWarnings("unchecked")
private int getKeyForValue(int value) {
Int2IntMap map = (Int2IntMap) _groupKeyMap;
int groupId = map.get(value);
- if (groupId == INVALID_ID) {
- if (_numGroups < _globalGroupIdUpperBound) {
- groupId = _numGroups;
- map.put(value, _numGroups++);
- }
+ if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+ groupId = _numGroups++;
+ map.put(value, groupId);
}
return groupId;
}
- @SuppressWarnings("unchecked")
private int getKeyForValue(long value) {
Long2IntMap map = (Long2IntMap) _groupKeyMap;
int groupId = map.get(value);
- if (groupId == INVALID_ID) {
- if (_numGroups < _globalGroupIdUpperBound) {
- groupId = _numGroups;
- map.put(value, _numGroups++);
- }
+ if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+ groupId = _numGroups++;
+ map.put(value, groupId);
}
return groupId;
}
- @SuppressWarnings("unchecked")
private int getKeyForValue(float value) {
Float2IntMap map = (Float2IntMap) _groupKeyMap;
int groupId = map.get(value);
- if (groupId == INVALID_ID) {
- if (_numGroups < _globalGroupIdUpperBound) {
- groupId = _numGroups;
- map.put(value, _numGroups++);
- }
+ if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+ groupId = _numGroups++;
+ map.put(value, groupId);
}
return groupId;
}
- @SuppressWarnings("unchecked")
private int getKeyForValue(double value) {
Double2IntMap map = (Double2IntMap) _groupKeyMap;
int groupId = map.get(value);
- if (groupId == INVALID_ID) {
- if (_numGroups < _globalGroupIdUpperBound) {
- groupId = _numGroups;
- map.put(value, _numGroups++);
- }
+ if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+ groupId = _numGroups++;
+ map.put(value, groupId);
}
return groupId;
}
- @SuppressWarnings("unchecked")
private int getKeyForValue(String value) {
Object2IntMap<String> map = (Object2IntMap<String>) _groupKeyMap;
int groupId = map.getInt(value);
- if (groupId == INVALID_ID) {
- if (_numGroups < _globalGroupIdUpperBound) {
- groupId = _numGroups;
- map.put(value, _numGroups++);
- }
+ if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+ groupId = _numGroups++;
+ map.put(value, groupId);
}
return groupId;
}
- /**
- * Iterator for {Group-Key, Group-id) pair.
- */
- class GroupKeyIterator implements Iterator<GroupKey> {
- Iterator<Map.Entry<Object, Integer>> _iterator;
- GroupKey _groupKey;
+ private int getKeyForValue(ByteArray value) {
+ Object2IntMap<ByteArray> map = (Object2IntMap<ByteArray>) _groupKeyMap;
+ int groupId = map.getInt(value);
+ if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+ groupId = _numGroups++;
+ map.put(value, groupId);
+ }
+ return groupId;
+ }
+
+ private static class IntGroupKeyIterator implements Iterator<GroupKey> {
+ final Iterator<Int2IntMap.Entry> _iterator;
+ final GroupKey _groupKey;
+
+ IntGroupKeyIterator(Int2IntOpenHashMap intMap) {
+ _iterator = intMap.int2IntEntrySet().fastIterator();
+ _groupKey = new GroupKey();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _iterator.hasNext();
+ }
+
+ @Override
+ public GroupKey next() {
+ Int2IntMap.Entry entry = _iterator.next();
+ _groupKey._groupId = entry.getIntValue();
+ _groupKey._stringKey = Integer.toString(entry.getIntKey());
+ return _groupKey;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class LongGroupKeyIterator implements Iterator<GroupKey> {
+ final Iterator<Long2IntMap.Entry> _iterator;
+ final GroupKey _groupKey;
+
+ LongGroupKeyIterator(Long2IntOpenHashMap longMap) {
+ _iterator = longMap.long2IntEntrySet().fastIterator();
+ _groupKey = new GroupKey();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _iterator.hasNext();
+ }
+
+ @Override
+ public GroupKey next() {
+ Long2IntMap.Entry entry = _iterator.next();
+ _groupKey._groupId = entry.getIntValue();
+ _groupKey._stringKey = Long.toString(entry.getLongKey());
+ return _groupKey;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class FloatGroupKeyIterator implements Iterator<GroupKey> {
+ final Iterator<Float2IntMap.Entry> _iterator;
+ final GroupKey _groupKey;
+
+ FloatGroupKeyIterator(Float2IntOpenHashMap floatMap) {
+ _iterator = floatMap.float2IntEntrySet().fastIterator();
+ _groupKey = new GroupKey();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _iterator.hasNext();
+ }
+
+ @Override
+ public GroupKey next() {
+ Float2IntMap.Entry entry = _iterator.next();
+ _groupKey._groupId = entry.getIntValue();
+ _groupKey._stringKey = Float.toString(entry.getFloatKey());
+ return _groupKey;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class DoubleGroupKeyIterator implements Iterator<GroupKey> {
+ final Iterator<Double2IntMap.Entry> _iterator;
+ final GroupKey _groupKey;
+
+ DoubleGroupKeyIterator(Double2IntOpenHashMap doubleMap) {
+ _iterator = doubleMap.double2IntEntrySet().fastIterator();
+ _groupKey = new GroupKey();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return _iterator.hasNext();
+ }
+
+ @Override
+ public GroupKey next() {
+ Double2IntMap.Entry entry = _iterator.next();
+ _groupKey._groupId = entry.getIntValue();
+ _groupKey._stringKey = Double.toString(entry.getDoubleKey());
+ return _groupKey;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class ObjectGroupKeyIterator implements Iterator<GroupKey> {
+ final ObjectIterator<Object2IntMap.Entry> _iterator;
+ final GroupKey _groupKey;
- @SuppressWarnings("unchecked")
- public GroupKeyIterator(Map map) {
- _iterator = (Iterator<Map.Entry<Object, Integer>>) map.entrySet().iterator();
+ ObjectGroupKeyIterator(Object2IntOpenHashMap objectMap) {
+ _iterator = objectMap.object2IntEntrySet().fastIterator();
_groupKey = new GroupKey();
}
@@ -251,8 +369,8 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
@Override
public GroupKey next() {
- Map.Entry<Object, Integer> entry = _iterator.next();
- _groupKey._groupId = entry.getValue();
+ Object2IntMap.Entry entry = _iterator.next();
+ _groupKey._groupId = entry.getIntValue();
_groupKey._stringKey = entry.getKey().toString();
return _groupKey;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java
index adfe70e..6d889d3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.core.query.aggregation.groupby.utils;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
/**
* Abstract base class for {@link ValueToIdMap} interface.
*/
@@ -48,6 +51,11 @@ public abstract class BaseValueToIdMap implements ValueToIdMap {
}
@Override
+ public int put(ByteArray value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public int getInt(int id) {
throw new UnsupportedOperationException();
}
@@ -71,4 +79,9 @@ public abstract class BaseValueToIdMap implements ValueToIdMap {
public String getString(int id) {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public ByteArray getBytes(int id) {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BytesToIdMap.java
similarity index 61%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BytesToIdMap.java
index c882c3d..a503d79 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BytesToIdMap.java
@@ -18,28 +18,29 @@
*/
package org.apache.pinot.core.query.aggregation.groupby.utils;
-import it.unimi.dsi.fastutil.doubles.Double2IntMap;
-import it.unimi.dsi.fastutil.doubles.Double2IntOpenHashMap;
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
-import it.unimi.dsi.fastutil.doubles.DoubleList;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectList;
+import org.apache.pinot.spi.utils.ByteArray;
/**
- * Implementation of {@link ValueToIdMap} for double.
+ * Implementation of {@link ValueToIdMap} for ByteArray.
*/
-public class DoubleToIdMap extends BaseValueToIdMap {
- Double2IntMap _valueToIdMap;
- DoubleList _idToValueMap;
+public class BytesToIdMap extends BaseValueToIdMap {
+ Object2IntMap<ByteArray> _valueToIdMap;
+ ObjectList<ByteArray> _idToValueMap;
- public DoubleToIdMap() {
- _valueToIdMap = new Double2IntOpenHashMap();
+ public BytesToIdMap() {
+ _valueToIdMap = new Object2IntOpenHashMap<>();
_valueToIdMap.defaultReturnValue(INVALID_KEY);
- _idToValueMap = new DoubleArrayList();
+ _idToValueMap = new ObjectArrayList<>();
}
@Override
- public int put(double value) {
- int id = _valueToIdMap.get(value);
+ public int put(ByteArray value) {
+ int id = _valueToIdMap.getInt(value);
if (id == INVALID_KEY) {
id = _idToValueMap.size();
_valueToIdMap.put(value, id);
@@ -49,14 +50,13 @@ public class DoubleToIdMap extends BaseValueToIdMap {
}
@Override
- public double getDouble(int id) {
- assert id < _idToValueMap.size();
- return _idToValueMap.getDouble(id);
+ public String getString(int id) {
+ return getBytes(id).toHexString();
}
@Override
- public String getString(int id) {
+ public ByteArray getBytes(int id) {
assert id < _idToValueMap.size();
- return (Double.valueOf(_idToValueMap.getDouble(id)).toString());
+ return _idToValueMap.get(id);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
index c882c3d..396a136 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
@@ -56,7 +56,6 @@ public class DoubleToIdMap extends BaseValueToIdMap {
@Override
public String getString(int id) {
- assert id < _idToValueMap.size();
- return (Double.valueOf(_idToValueMap.getDouble(id)).toString());
+ return Double.toString(getDouble(id));
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java
index 0f21c2c..390f38d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java
@@ -56,7 +56,6 @@ public class FloatToIdMap extends BaseValueToIdMap {
@Override
public String getString(int id) {
- assert id < _idToValueMap.size();
- return (Float.valueOf(_idToValueMap.getFloat(id)).toString());
+ return Float.toString(getFloat(id));
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java
index 71b4025..0afc61d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java
@@ -56,7 +56,6 @@ public class IntToIdMap extends BaseValueToIdMap {
@Override
public String getString(int id) {
- assert id < _idToValueMap.size();
- return (Integer.valueOf(_idToValueMap.getInt(id)).toString());
+ return Integer.toString(getInt(id));
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java
index a498662..6a68792 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java
@@ -56,7 +56,6 @@ public class LongToIdMap extends BaseValueToIdMap {
@Override
public String getString(int id) {
- assert id < _idToValueMap.size();
- return (Long.valueOf(_idToValueMap.getLong(id)).toString());
+ return Long.toString(getLong(id));
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java
index 79873ae..96d1be0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.core.query.aggregation.groupby.utils;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
/**
* Interface for mapping primitive values to contiguous ids.
*/
@@ -34,6 +37,8 @@ public interface ValueToIdMap {
int put(String value);
+ int put(ByteArray value);
+
int getInt(int id);
long getLong(int id);
@@ -43,4 +48,6 @@ public interface ValueToIdMap {
double getDouble(int id);
String getString(int id);
+
+ ByteArray getBytes(int id);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java
index 803d68a..444899e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java
@@ -33,19 +33,16 @@ public class ValueToIdMapFactory {
switch (dataType) {
case INT:
return new IntToIdMap();
-
case LONG:
return new LongToIdMap();
-
case FLOAT:
return new FloatToIdMap();
-
case DOUBLE:
return new DoubleToIdMap();
-
case STRING:
return new StringToIdMap();
-
+ case BYTES:
+ return new BytesToIdMap();
default:
throw new IllegalArgumentException("Illegal data type for ValueToIdMapFactory: " + dataType);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
index 9ef2c07..0a9c105 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
@@ -257,7 +257,7 @@ public class DistinctCountQueriesTest extends BaseQueriesTest {
@AfterClass
public void tearDown()
throws IOException {
- FileUtils.deleteDirectory(INDEX_DIR);
_indexSegment.destroy();
+ FileUtils.deleteDirectory(INDEX_DIR);
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
index 16b7046..7b9df60 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
@@ -19,16 +19,16 @@
package org.apache.pinot.query.aggregation.groupby;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
@@ -49,231 +49,207 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
/**
* Unit test for {@link NoDictionaryMultiColumnGroupKeyGenerator}
*/
public class NoDictionaryGroupKeyGeneratorTest {
- private static final String SEGMENT_NAME = "testSegment";
- private static final String INDEX_DIR_PATH = FileUtils.getTempDirectoryPath() + File.separator + SEGMENT_NAME;
+ private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "NoDictionaryGroupKeyGeneratorTest");
+ private static final Random RANDOM = new Random();
- private static final String STRING_DICT_COLUMN = "string_dict_column";
- private static final String[] COLUMN_NAMES =
- {"int_column", "long_column", "float_column", "double_column", "string_column", STRING_DICT_COLUMN};
- private static final String[] NO_DICT_COLUMN_NAMES =
- {"int_column", "long_column", "float_column", "double_column", "string_column"};
- private static final FieldSpec.DataType[] DATA_TYPES =
- {FieldSpec.DataType.INT, FieldSpec.DataType.LONG, FieldSpec.DataType.FLOAT, FieldSpec.DataType.DOUBLE, FieldSpec.DataType.STRING, FieldSpec.DataType.STRING};
- private static final int NUM_COLUMNS = COLUMN_NAMES.length;
- private static final int NUM_ROWS = 1000;
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME = "testSegment";
- private RecordReader _recordReader;
+ private static final String INT_COLUMN = "intColumn";
+ private static final String LONG_COLUMN = "longColumn";
+ private static final String FLOAT_COLUMN = "floatColumn";
+ private static final String DOUBLE_COLUMN = "doubleColumn";
+ private static final String STRING_COLUMN = "stringColumn";
+ private static final String BYTES_COLUMN = "bytesColumn";
+ private static final String BYTES_DICT_COLUMN = "bytesDictColumn";
+ private static final List<String> COLUMNS = Arrays
+ .asList(INT_COLUMN, LONG_COLUMN, FLOAT_COLUMN, DOUBLE_COLUMN, STRING_COLUMN, BYTES_COLUMN, BYTES_DICT_COLUMN);
+ private static final int NUM_COLUMNS = COLUMNS.size();
+ private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .setNoDictionaryColumns(COLUMNS.subList(0, NUM_COLUMNS - 1)).build();
+ private static final Schema SCHEMA =
+ new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
+ .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(FLOAT_COLUMN, FieldSpec.DataType.FLOAT)
+ .addSingleValueDimension(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE)
+ .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(BYTES_COLUMN, FieldSpec.DataType.BYTES)
+ .addSingleValueDimension(BYTES_DICT_COLUMN, FieldSpec.DataType.BYTES).build();
+
+ private static final int NUM_RECORDS = 1000;
+ private static final int NUM_UNIQUE_RECORDS = 100;
+
+ private final String[][] _stringValues = new String[NUM_UNIQUE_RECORDS][NUM_COLUMNS];
+ private IndexSegment _indexSegment;
private TransformOperator _transformOperator;
private TransformBlock _transformBlock;
@BeforeClass
- public void setup()
+ public void setUp()
throws Exception {
- FileUtils.deleteQuietly(new File(INDEX_DIR_PATH));
+ FileUtils.deleteDirectory(TEMP_DIR);
+
+ List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+ for (int i = 0; i < NUM_UNIQUE_RECORDS; i++) {
+ GenericRow record = new GenericRow();
+ String[] values = _stringValues[i];
+ int intValue = RANDOM.nextInt();
+ record.putValue(INT_COLUMN, intValue);
+ values[0] = Integer.toString(intValue);
+ long longValue = RANDOM.nextLong();
+ record.putValue(LONG_COLUMN, longValue);
+ values[1] = Long.toString(longValue);
+ float floatValue = RANDOM.nextFloat();
+ record.putValue(FLOAT_COLUMN, floatValue);
+ values[2] = Float.toString(floatValue);
+ double doubleValue = RANDOM.nextDouble();
+ record.putValue(DOUBLE_COLUMN, doubleValue);
+ values[3] = Double.toString(doubleValue);
+ String stringValue = RandomStringUtils.randomAlphabetic(10);
+ record.putValue(STRING_COLUMN, stringValue);
+ values[4] = stringValue;
+ // NOTE: Create fixed-length bytes so that dictionary can be generated.
+ byte[] bytesValue = new byte[10];
+ RANDOM.nextBytes(bytesValue);
+ record.putValue(BYTES_COLUMN, bytesValue);
+ record.putValue(BYTES_DICT_COLUMN, bytesValue);
+ values[5] = BytesUtils.toHexString(bytesValue);
+ values[6] = values[5];
+ for (int j = 0; j < NUM_RECORDS / NUM_UNIQUE_RECORDS; j++) {
+ records.add(record);
+ }
+ }
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+ segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+ segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+ segmentGeneratorConfig.setOutDir(TEMP_DIR.getPath());
- _recordReader = buildSegment();
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+ driver.build();
- // Load the segment.
- IndexSegment indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR_PATH, SEGMENT_NAME), ReadMode.heap);
+ _indexSegment = ImmutableSegmentLoader.load(new File(TEMP_DIR, SEGMENT_NAME), ReadMode.mmap);
// Create transform operator and block
// NOTE: put all columns into group-by so that transform operator has expressions for all columns
- String query = String.format("SELECT COUNT(*) FROM table GROUP BY %s", StringUtils.join(COLUMN_NAMES, ", "));
+ String query = "SELECT COUNT(*) FROM table GROUP BY " + StringUtils.join(COLUMNS, ", ");
QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromPQL(query);
-
List<ExpressionContext> expressions = new ArrayList<>();
- for (String column : COLUMN_NAMES) {
+ for (String column : COLUMNS) {
expressions.add(ExpressionContext.forIdentifier(column));
}
TransformPlanNode transformPlanNode =
- new TransformPlanNode(indexSegment, queryContext, expressions, DocIdSetPlanNode.MAX_DOC_PER_CALL);
+ new TransformPlanNode(_indexSegment, queryContext, expressions, DocIdSetPlanNode.MAX_DOC_PER_CALL);
_transformOperator = transformPlanNode.run();
_transformBlock = _transformOperator.nextBlock();
}
/**
- * Unit test for {@link org.apache.pinot.core.query.aggregation.groupby.NoDictionarySingleColumnGroupKeyGenerator}
- * @throws Exception
+ * Unit test for {@link NoDictionarySingleColumnGroupKeyGenerator}
*/
@Test
- public void testSingleColumnGroupKeyGenerator()
- throws Exception {
- for (String column : COLUMN_NAMES) {
- testGroupKeyGenerator(new String[]{column});
+ public void testSingleColumnGroupKeyGenerator() {
+ for (int i = 0; i < NUM_COLUMNS - 1; i++) {
+ testGroupKeyGenerator(new int[]{i});
}
}
/**
* Unit test for {@link NoDictionaryMultiColumnGroupKeyGenerator}
- * @throws Exception
*/
@Test
- public void testMultiColumnGroupKeyGenerator()
- throws Exception {
- testGroupKeyGenerator(COLUMN_NAMES);
+ public void testMultiColumnGroupKeyGenerator() {
+ testGroupKeyGenerator(new int[]{0, 1});
+ testGroupKeyGenerator(new int[]{2, 3});
+ testGroupKeyGenerator(new int[]{4, 5});
+ testGroupKeyGenerator(new int[]{1, 2, 3});
+ testGroupKeyGenerator(new int[]{4, 5, 0});
+ testGroupKeyGenerator(new int[]{5, 4, 3, 2, 1, 0});
}
/**
* Tests multi-column group key generator when at least one column has a dictionary, and others don't.
*/
@Test
- public void testMultiColumnHybridGroupKeyGenerator()
- throws Exception {
- for (String noDictColumn : NO_DICT_COLUMN_NAMES) {
- testGroupKeyGenerator(new String[]{noDictColumn, STRING_DICT_COLUMN});
+ public void testMultiColumnHybridGroupKeyGenerator() {
+ for (int i = 0; i < NUM_COLUMNS - 1; i++) {
+ testGroupKeyGenerator(new int[]{i, NUM_COLUMNS - 1});
}
}
- private void testGroupKeyGenerator(String[] groupByColumns)
- throws Exception {
- int numGroupByColumns = groupByColumns.length;
- ExpressionContext[] groupByExpressions = new ExpressionContext[numGroupByColumns];
- for (int i = 0; i < numGroupByColumns; i++) {
- groupByExpressions[i] = ExpressionContext.forIdentifier(groupByColumns[i]);
- }
-
+ private void testGroupKeyGenerator(int[] groupByColumnIndexes) {
+ int numGroupByColumns = groupByColumnIndexes.length;
GroupKeyGenerator groupKeyGenerator;
if (numGroupByColumns == 1) {
- groupKeyGenerator = new NoDictionarySingleColumnGroupKeyGenerator(_transformOperator, groupByExpressions[0],
+ groupKeyGenerator = new NoDictionarySingleColumnGroupKeyGenerator(_transformOperator,
+ ExpressionContext.forIdentifier(COLUMNS.get(groupByColumnIndexes[0])),
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
} else {
+ ExpressionContext[] groupByExpressions = new ExpressionContext[numGroupByColumns];
+ for (int i = 0; i < numGroupByColumns; i++) {
+ groupByExpressions[i] = ExpressionContext.forIdentifier(COLUMNS.get(groupByColumnIndexes[i]));
+ }
groupKeyGenerator = new NoDictionaryMultiColumnGroupKeyGenerator(_transformOperator, groupByExpressions,
InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
}
- groupKeyGenerator.generateKeysForBlock(_transformBlock, new int[NUM_ROWS]);
+ groupKeyGenerator.generateKeysForBlock(_transformBlock, new int[NUM_RECORDS]);
// Assert total number of group keys is as expected
- Set<String> expectedGroupKeys = getExpectedGroupKeys(_recordReader, groupByColumns);
- Assert.assertEquals(groupKeyGenerator.getCurrentGroupKeyUpperBound(), expectedGroupKeys.size(),
+ Set<String> expectedGroupKeys = getExpectedGroupKeys(groupByColumnIndexes);
+ assertEquals(groupKeyGenerator.getCurrentGroupKeyUpperBound(), expectedGroupKeys.size(),
"Number of group keys mis-match.");
// Assert all group key values are as expected
Iterator<GroupKeyGenerator.GroupKey> uniqueGroupKeys = groupKeyGenerator.getUniqueGroupKeys();
while (uniqueGroupKeys.hasNext()) {
GroupKeyGenerator.GroupKey groupKey = uniqueGroupKeys.next();
- String actual = groupKey._stringKey;
- Assert.assertTrue(expectedGroupKeys.contains(actual), "Unexpected group key: " + actual);
+ if (!expectedGroupKeys.contains(groupKey._stringKey)) {
+ System.out.println(expectedGroupKeys);
+ }
+ assertTrue(expectedGroupKeys.contains(groupKey._stringKey), "Unexpected group key: " + groupKey._stringKey);
}
}
- /**
- * Helper method to build group keys for a given array of group-by columns.
- *
- * @param groupByColumns Group-by columns for which to generate the group-keys.
- * @return Set of unique group keys.
- * @throws Exception
- */
- private Set<String> getExpectedGroupKeys(RecordReader recordReader, String[] groupByColumns)
- throws Exception {
+ private Set<String> getExpectedGroupKeys(int[] groupByColumnIndexes) {
+ int numGroupByColumns = groupByColumnIndexes.length;
Set<String> groupKeys = new HashSet<>();
StringBuilder stringBuilder = new StringBuilder();
-
- recordReader.rewind();
- while (recordReader.hasNext()) {
- GenericRow row = recordReader.next();
-
+ for (int i = 0; i < NUM_UNIQUE_RECORDS; i++) {
stringBuilder.setLength(0);
- for (int i = 0; i < groupByColumns.length; i++) {
- stringBuilder.append(row.getValue(groupByColumns[i]));
- if (i < groupByColumns.length - 1) {
+ String[] values = _stringValues[i];
+ for (int j = 0; j < numGroupByColumns; j++) {
+ if (j > 0) {
stringBuilder.append(GroupKeyGenerator.DELIMITER);
}
+ stringBuilder.append(values[groupByColumnIndexes[j]]);
}
groupKeys.add(stringBuilder.toString());
}
return groupKeys;
}
- /**
- * Helper method to build a segment as follows:
- * <ul>
- * <li> One string column without dictionary. </li>
- * <li> One integer column with dictionary. </li>
- * </ul>
- *
- * It also computes the unique group keys while it generates the index.
- *
- * @return Set containing unique group keys from the created segment.
- *
- * @throws Exception
- */
- private static RecordReader buildSegment()
- throws Exception {
- Schema schema = new Schema();
-
- for (int i = 0; i < COLUMN_NAMES.length; i++) {
- DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec(COLUMN_NAMES[i], DATA_TYPES[i], true);
- schema.addField(dimensionFieldSpec);
- }
-
- TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
-
- SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
- config.setRawIndexCreationColumns(Arrays.asList(NO_DICT_COLUMN_NAMES));
-
- config.setOutDir(INDEX_DIR_PATH);
- config.setSegmentName(SEGMENT_NAME);
-
- Random random = new Random();
- List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
- for (int i = 0; i < NUM_ROWS; i++) {
- Map<String, Object> map = new HashMap<>(NUM_COLUMNS);
-
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- String column = fieldSpec.getName();
-
- FieldSpec.DataType dataType = fieldSpec.getDataType();
- switch (dataType) {
- case INT:
- map.put(column, random.nextInt());
- break;
-
- case LONG:
- map.put(column, random.nextLong());
- break;
-
- case FLOAT:
- map.put(column, random.nextFloat());
- break;
-
- case DOUBLE:
- map.put(column, random.nextDouble());
- break;
-
- case STRING:
- map.put(column, "value_" + i);
- break;
-
- default:
- throw new IllegalArgumentException("Illegal data type specified: " + dataType);
- }
- }
-
- GenericRow genericRow = new GenericRow();
- genericRow.init(map);
- rows.add(genericRow);
- }
-
- RecordReader recordReader = new GenericRowRecordReader(rows);
- SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
- driver.init(config, recordReader);
- driver.build();
-
- return recordReader;
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ _indexSegment.destroy();
+ FileUtils.deleteDirectory(TEMP_DIR);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org