You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/07/29 07:34:25 UTC
[incubator-pinot] branch master updated: Optimize DistinctCount to store dictIds within segment (#5765)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a8fbdae Optimize DistinctCount to store dictIds within segment (#5765)
a8fbdae is described below
commit a8fbdaeffaf99c1df4258f60ecbbab1f553d7696
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Jul 29 00:34:11 2020 -0700
Optimize DistinctCount to store dictIds within segment (#5765)
For the DistinctCount aggregation function, we can store dictionary ids (dictIds) within a segment without fetching the actual values, and then read the values from the dictionary only once, before returning the result for the segment.
---
.../org/apache/pinot/core/common/BlockValSet.java | 8 +
.../apache/pinot/core/common/DataBlockCache.java | 11 +-
.../pinot/core/operator/ProjectionOperator.java | 9 +-
.../core/operator/blocks/ProjectionBlock.java | 39 ++---
.../pinot/core/operator/blocks/TransformBlock.java | 4 -
.../operator/docvalsets/ProjectionBlockValSet.java | 28 ++--
.../operator/docvalsets/TransformBlockValSet.java | 8 +
.../function/DistinctCountAggregationFunction.java | 165 +++++++++++++++------
.../DistinctCountMVAggregationFunction.java | 53 ++++++-
9 files changed, 230 insertions(+), 95 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
index 2a0f875..e7574a7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.core.common;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -38,6 +40,12 @@ public interface BlockValSet {
boolean isSingleValue();
/**
+ * Returns the dictionary for the column, or {@code null} if the column is not dictionary-encoded.
+ */
+ @Nullable
+ Dictionary getDictionary();
+
+ /**
* SINGLE-VALUED COLUMN APIs
*/
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
index fbaeeed..141ecfb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
@@ -23,9 +23,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.EqualityUtils;
-import org.apache.pinot.core.plan.DocIdSetPlanNode;
/**
@@ -71,6 +71,15 @@ public class DataBlockCache {
}
/**
+ * Returns the number of documents within the current block.
+ *
+ * @return Number of documents within the current block
+ */
+ public int getNumDocs() {
+ return _length;
+ }
+
+ /**
* SINGLE-VALUED COLUMN API
*/
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
index 32ef7c8..d525ce7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
@@ -18,13 +18,11 @@
*/
package org.apache.pinot.core.operator;
-import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.core.common.DataBlockCache;
import org.apache.pinot.core.common.DataFetcher;
import org.apache.pinot.core.common.DataSource;
-import org.apache.pinot.core.common.DataSourceMetadata;
import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
@@ -33,17 +31,12 @@ public class ProjectionOperator extends BaseOperator<ProjectionBlock> {
private static final String OPERATOR_NAME = "ProjectionOperator";
private final Map<String, DataSource> _dataSourceMap;
- private final Map<String, DataSourceMetadata> _dataSourceMetadataMap;
private final BaseOperator<DocIdSetBlock> _docIdSetOperator;
private final DataBlockCache _dataBlockCache;
public ProjectionOperator(Map<String, DataSource> dataSourceMap,
@Nullable BaseOperator<DocIdSetBlock> docIdSetOperator) {
_dataSourceMap = dataSourceMap;
- _dataSourceMetadataMap = new HashMap<>(dataSourceMap.size());
- for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
- _dataSourceMetadataMap.put(entry.getKey(), entry.getValue().getDataSourceMetadata());
- }
_docIdSetOperator = docIdSetOperator;
_dataBlockCache = new DataBlockCache(new DataFetcher(dataSourceMap));
}
@@ -66,7 +59,7 @@ public class ProjectionOperator extends BaseOperator<ProjectionBlock> {
return null;
} else {
_dataBlockCache.initNewBlock(docIdSetBlock.getDocIdSet(), docIdSetBlock.getSearchableLength());
- return new ProjectionBlock(_dataSourceMetadataMap, _dataBlockCache, docIdSetBlock);
+ return new ProjectionBlock(_dataSourceMap, _dataBlockCache);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
index c3b8774..989a870 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
@@ -25,9 +25,8 @@ import org.apache.pinot.core.common.BlockDocIdValueSet;
import org.apache.pinot.core.common.BlockMetadata;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.DataBlockCache;
-import org.apache.pinot.core.common.DataSourceMetadata;
+import org.apache.pinot.core.common.DataSource;
import org.apache.pinot.core.operator.docvalsets.ProjectionBlockValSet;
-import org.apache.pinot.spi.data.FieldSpec;
/**
@@ -35,24 +34,20 @@ import org.apache.pinot.spi.data.FieldSpec;
* It provides DocIdSetBlock for a given column.
*/
public class ProjectionBlock implements Block {
- private final Map<String, DataSourceMetadata> _dataSourceMetadataMap;
- private final DocIdSetBlock _docIdSetBlock;
+ private final Map<String, DataSource> _dataSourceMap;
private final DataBlockCache _dataBlockCache;
- public ProjectionBlock(Map<String, DataSourceMetadata> dataSourceMetadataMap, DataBlockCache dataBlockCache, DocIdSetBlock docIdSetBlock) {
- _dataSourceMetadataMap = dataSourceMetadataMap;
- _docIdSetBlock = docIdSetBlock;
+ public ProjectionBlock(Map<String, DataSource> dataSourceMap, DataBlockCache dataBlockCache) {
+ _dataSourceMap = dataSourceMap;
_dataBlockCache = dataBlockCache;
}
- @Override
- public BlockValSet getBlockValueSet() {
- throw new UnsupportedOperationException();
+ public int getNumDocs() {
+ return _dataBlockCache.getNumDocs();
}
- @Override
- public BlockDocIdValueSet getBlockDocIdValueSet() {
- throw new UnsupportedOperationException();
+ public BlockValSet getBlockValueSet(String column) {
+ return new ProjectionBlockValSet(_dataBlockCache, column, _dataSourceMap.get(column));
}
@Override
@@ -61,21 +56,17 @@ public class ProjectionBlock implements Block {
}
@Override
- public BlockMetadata getMetadata() {
+ public BlockValSet getBlockValueSet() {
throw new UnsupportedOperationException();
}
- public BlockValSet getBlockValueSet(String column) {
- FieldSpec fieldSpec = _dataSourceMetadataMap.get(column).getFieldSpec();
- return new ProjectionBlockValSet(_dataBlockCache, column, fieldSpec.getDataType(),
- fieldSpec.isSingleValueField());
- }
-
- public DocIdSetBlock getDocIdSetBlock() {
- return _docIdSetBlock;
+ @Override
+ public BlockDocIdValueSet getBlockDocIdValueSet() {
+ throw new UnsupportedOperationException();
}
- public int getNumDocs() {
- return _docIdSetBlock.getSearchableLength();
+ @Override
+ public BlockMetadata getMetadata() {
+ throw new UnsupportedOperationException();
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java
index 7d78e07..63f57b6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java
@@ -78,8 +78,4 @@ public class TransformBlock implements Block {
public BlockMetadata getMetadata() {
throw new UnsupportedOperationException();
}
-
- public DocIdSetBlock getDocIdSetBlock() {
- return _projectionBlock.getDocIdSetBlock();
- }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
index efdb41b..c355131 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
@@ -18,9 +18,12 @@
*/
package org.apache.pinot.core.operator.docvalsets;
+import javax.annotation.Nullable;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.common.DataBlockCache;
+import org.apache.pinot.core.common.DataSource;
import org.apache.pinot.core.operator.ProjectionOperator;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -32,32 +35,33 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
public class ProjectionBlockValSet implements BlockValSet {
private final DataBlockCache _dataBlockCache;
private final String _column;
- private final DataType _dataType;
- private final boolean _singleValue;
+ private final DataSource _dataSource;
/**
* Constructor for the class.
- * The dataBlockCache argument is initialized in {@link ProjectionOperator},
- * so that it can be reused across multiple calls to {@link ProjectionOperator#nextBlock()}.
- *
- * @param dataBlockCache data block cache
- * @param column Projection column.
+ * The dataBlockCache is initialized in {@link ProjectionOperator} so that it can be reused across multiple calls to
+ * {@link ProjectionOperator#nextBlock()}.
*/
- public ProjectionBlockValSet(DataBlockCache dataBlockCache, String column, DataType dataType, boolean singleValue) {
+ public ProjectionBlockValSet(DataBlockCache dataBlockCache, String column, DataSource dataSource) {
_dataBlockCache = dataBlockCache;
_column = column;
- _dataType = dataType;
- _singleValue = singleValue;
+ _dataSource = dataSource;
}
@Override
public DataType getValueType() {
- return _dataType;
+ return _dataSource.getDataSourceMetadata().getDataType();
}
@Override
public boolean isSingleValue() {
- return _singleValue;
+ return _dataSource.getDataSourceMetadata().isSingleValue();
+ }
+
+ @Nullable
+ @Override
+ public Dictionary getDictionary() {
+ return _dataSource.getDictionary();
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java
index 78fa38a..fd82adb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java
@@ -18,11 +18,13 @@
*/
package org.apache.pinot.core.operator.docvalsets;
+import javax.annotation.Nullable;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.blocks.ProjectionBlock;
import org.apache.pinot.core.operator.transform.TransformResultMetadata;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.spi.data.FieldSpec;
@@ -52,6 +54,12 @@ public class TransformBlockValSet implements BlockValSet {
return _transformFunction.getResultMetadata().isSingleValue();
}
+ @Nullable
+ @Override
+ public Dictionary getDictionary() {
+ return _transformFunction.getDictionary();
+ }
+
@Override
public int[] getDictionaryIdsSV() {
return _transformFunction.transformToDictIdsSV(_projectionBlock);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
index 7e7ba4b..c13c8c0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.aggregation.function;
+import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.util.Arrays;
import java.util.Map;
@@ -29,10 +30,12 @@ import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<IntOpenHashSet, Integer> {
+ protected Dictionary _dictionary;
public DistinctCountAggregationFunction(ExpressionContext expression) {
super(expression);
@@ -64,7 +67,19 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
BlockValSet blockValSet = blockValSetMap.get(_expression);
IntOpenHashSet valueSet = getValueSet(aggregationResultHolder);
- FieldSpec.DataType valueType = blockValSet.getValueType();
+ // For dictionary-encoded expression, store dictionary ids into the value set
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ _dictionary = dictionary;
+ int[] dictIds = blockValSet.getDictionaryIdsSV();
+ for (int i = 0; i < length; i++) {
+ valueSet.add(dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store hash code of the values into the value set
+ DataType valueType = blockValSet.getValueType();
switch (valueType) {
case INT:
int[] intValues = blockValSet.getIntValuesSV();
@@ -111,43 +126,55 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- FieldSpec.DataType valueType = blockValSet.getValueType();
+ // For dictionary-encoded expression, store dictionary ids into the value set
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ _dictionary = dictionary;
+ int[] dictIds = blockValSet.getDictionaryIdsSV();
+ for (int i = 0; i < length; i++) {
+ getValueSet(groupByResultHolder, groupKeyArray[i]).add(dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store hash code of the values into the value set
+ DataType valueType = blockValSet.getValueType();
switch (valueType) {
case INT:
int[] intValues = blockValSet.getIntValuesSV();
for (int i = 0; i < length; i++) {
- setValueForGroupKey(groupByResultHolder, groupKeyArray[i], intValues[i]);
+ getValueSet(groupByResultHolder, groupKeyArray[i]).add(intValues[i]);
}
break;
case LONG:
long[] longValues = blockValSet.getLongValuesSV();
for (int i = 0; i < length; i++) {
- setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Long.hashCode(longValues[i]));
+ getValueSet(groupByResultHolder, groupKeyArray[i]).add(Long.hashCode(longValues[i]));
}
break;
case FLOAT:
float[] floatValues = blockValSet.getFloatValuesSV();
for (int i = 0; i < length; i++) {
- setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Float.hashCode(floatValues[i]));
+ getValueSet(groupByResultHolder, groupKeyArray[i]).add(Float.hashCode(floatValues[i]));
}
break;
case DOUBLE:
double[] doubleValues = blockValSet.getDoubleValuesSV();
for (int i = 0; i < length; i++) {
- setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Double.hashCode(doubleValues[i]));
+ getValueSet(groupByResultHolder, groupKeyArray[i]).add(Double.hashCode(doubleValues[i]));
}
break;
case STRING:
String[] stringValues = blockValSet.getStringValuesSV();
for (int i = 0; i < length; i++) {
- setValueForGroupKey(groupByResultHolder, groupKeyArray[i], stringValues[i].hashCode());
+ getValueSet(groupByResultHolder, groupKeyArray[i]).add(stringValues[i].hashCode());
}
break;
case BYTES:
byte[][] bytesValues = blockValSet.getBytesValuesSV();
for (int i = 0; i < length; i++) {
- setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Arrays.hashCode(bytesValues[i]));
+ getValueSet(groupByResultHolder, groupKeyArray[i]).add(Arrays.hashCode(bytesValues[i]));
}
break;
default:
@@ -160,7 +187,19 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- FieldSpec.DataType valueType = blockValSet.getValueType();
+ // For dictionary-encoded expression, store dictionary ids into the value set
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ _dictionary = dictionary;
+ int[] dictIds = blockValSet.getDictionaryIdsSV();
+ for (int i = 0; i < length; i++) {
+ setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], dictIds[i]);
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store hash code of the values into the value set
+ DataType valueType = blockValSet.getValueType();
switch (valueType) {
case INT:
int[] intValues = blockValSet.getIntValuesSV();
@@ -192,6 +231,12 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], stringValues[i].hashCode());
}
break;
+ case BYTES:
+ byte[][] bytesValues = blockValSet.getBytesValuesSV();
+ for (int i = 0; i < length; i++) {
+ setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Arrays.hashCode(bytesValues[i]));
+ }
+ break;
default:
throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType);
}
@@ -202,7 +247,13 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
IntOpenHashSet valueSet = aggregationResultHolder.getResult();
if (valueSet == null) {
return new IntOpenHashSet();
+ }
+
+ if (_dictionary != null) {
+ // For dictionary-encoded expression, convert dictionary ids to hash code of the values
+ return convertToValueSet(valueSet, _dictionary);
} else {
+ // For non-dictionary-encoded expression, directly return the value set
return valueSet;
}
}
@@ -212,7 +263,13 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
IntOpenHashSet valueSet = groupByResultHolder.getResult(groupKey);
if (valueSet == null) {
return new IntOpenHashSet();
+ }
+
+ if (_dictionary != null) {
+ // For dictionary-encoded expression, convert dictionary ids to hash code of the values
+ return convertToValueSet(valueSet, _dictionary);
} else {
+ // For non-dictionary-encoded expression, directly return the value set
return valueSet;
}
}
@@ -244,35 +301,7 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
}
/**
- * Helper method to set value for a groupKey into the result holder.
- *
- * @param groupByResultHolder Result holder
- * @param groupKey Group-key for which to set the value
- * @param value Value for the group key
- */
- private void setValueForGroupKey(GroupByResultHolder groupByResultHolder, int groupKey, int value) {
- IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey);
- valueSet.add(value);
- }
-
- /**
- * Helper method to set values for a given array of groupKeys, into the result holder.
- *
- * @param groupByResultHolder Result holder
- * @param groupKeys Array of group keys for which to set the value
- * @param value Value to be set
- */
- private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, int value) {
- for (int groupKey : groupKeys) {
- setValueForGroupKey(groupByResultHolder, groupKey, value);
- }
- }
-
- /**
* Returns the value set from the result holder or creates a new one if it does not exist.
- *
- * @param aggregationResultHolder Result holder
- * @return Value set from the result holder
*/
protected static IntOpenHashSet getValueSet(AggregationResultHolder aggregationResultHolder) {
IntOpenHashSet valueSet = aggregationResultHolder.getResult();
@@ -284,11 +313,7 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
}
/**
- * Returns the value set for the given group key. If one does not exist, creates a new one and returns that.
- *
- * @param groupByResultHolder Result holder
- * @param groupKey Group key for which to return the value set
- * @return Value set for the group key
+ * Returns the value set for the given group key or creates a new one if it does not exist.
*/
protected static IntOpenHashSet getValueSet(GroupByResultHolder groupByResultHolder, int groupKey) {
IntOpenHashSet valueSet = groupByResultHolder.getResult(groupKey);
@@ -298,4 +323,58 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
}
return valueSet;
}
+
+ /**
+ * Helper method to set value for the given group keys into the result holder.
+ */
+ private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, int value) {
+ for (int groupKey : groupKeys) {
+ getValueSet(groupByResultHolder, groupKey).add(value);
+ }
+ }
+
+ /**
+ * Helper method to read dictionary and convert dictionary ids to hash code of the values for dictionary-encoded
+ * expression.
+ */
+ private static IntOpenHashSet convertToValueSet(IntOpenHashSet dictIdSet, Dictionary dictionary) {
+ IntOpenHashSet valueSet = new IntOpenHashSet(dictIdSet.size());
+ IntIterator iterator = dictIdSet.iterator();
+ DataType valueType = dictionary.getValueType();
+ switch (valueType) {
+ case INT:
+ while (iterator.hasNext()) {
+ valueSet.add(dictionary.getIntValue(iterator.nextInt()));
+ }
+ break;
+ case LONG:
+ while (iterator.hasNext()) {
+ valueSet.add(Long.hashCode(dictionary.getLongValue(iterator.nextInt())));
+ }
+ break;
+ case FLOAT:
+ while (iterator.hasNext()) {
+ valueSet.add(Float.hashCode(dictionary.getFloatValue(iterator.nextInt())));
+ }
+ break;
+ case DOUBLE:
+ while (iterator.hasNext()) {
+ valueSet.add(Double.hashCode(dictionary.getDoubleValue(iterator.nextInt())));
+ }
+ break;
+ case STRING:
+ while (iterator.hasNext()) {
+ valueSet.add(dictionary.getStringValue(iterator.nextInt()).hashCode());
+ }
+ break;
+ case BYTES:
+ while (iterator.hasNext()) {
+ valueSet.add(Arrays.hashCode(dictionary.getBytesValue(iterator.nextInt())));
+ }
+ break;
+ default:
+ throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType);
+ }
+ return valueSet;
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
index 4dba11e..7d8d5d4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
@@ -25,6 +25,7 @@ import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.spi.data.FieldSpec;
@@ -47,9 +48,23 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
@Override
public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
+ BlockValSet blockValSet = blockValSetMap.get(_expression);
IntOpenHashSet valueSet = getValueSet(aggregationResultHolder);
- BlockValSet blockValSet = blockValSetMap.get(_expression);
+ // For dictionary-encoded expression, store dictionary ids into the value set
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ _dictionary = dictionary;
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ for (int dictId : dictIds[i]) {
+ valueSet.add(dictId);
+ }
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store hash code of the values into the value set
FieldSpec.DataType valueType = blockValSet.getValueType();
switch (valueType) {
case INT:
@@ -100,8 +115,23 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- FieldSpec.DataType valueType = blockValSet.getValueType();
+ // For dictionary-encoded expression, store dictionary ids into the value set
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ _dictionary = dictionary;
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]);
+ for (int dictId : dictIds[i]) {
+ valueSet.add(dictId);
+ }
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store hash code of the values into the value set
+ FieldSpec.DataType valueType = blockValSet.getValueType();
switch (valueType) {
case INT:
int[][] intValues = blockValSet.getIntValuesMV();
@@ -157,8 +187,25 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
Map<ExpressionContext, BlockValSet> blockValSetMap) {
BlockValSet blockValSet = blockValSetMap.get(_expression);
- FieldSpec.DataType valueType = blockValSet.getValueType();
+ // For dictionary-encoded expression, store dictionary ids into the value set
+ Dictionary dictionary = blockValSet.getDictionary();
+ if (dictionary != null) {
+ _dictionary = dictionary;
+ int[][] dictIds = blockValSet.getDictionaryIdsMV();
+ for (int i = 0; i < length; i++) {
+ for (int groupKey : groupKeysArray[i]) {
+ IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey);
+ for (int dictId : dictIds[i]) {
+ valueSet.add(dictId);
+ }
+ }
+ }
+ return;
+ }
+
+ // For non-dictionary-encoded expression, store hash code of the values into the value set
+ FieldSpec.DataType valueType = blockValSet.getValueType();
switch (valueType) {
case INT:
int[][] intValues = blockValSet.getIntValuesMV();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org