You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/07/29 07:34:25 UTC

[incubator-pinot] branch master updated: Optimize DistinctCount to store dictIds within segment (#5765)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a8fbdae  Optimize DistinctCount to store dictIds within segment (#5765)
a8fbdae is described below

commit a8fbdaeffaf99c1df4258f60ecbbab1f553d7696
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Jul 29 00:34:11 2020 -0700

    Optimize DistinctCount to store dictIds within segment (#5765)
    
    For the DistinctCount aggregation function, we can store dictIds within the segment without fetching the values, and read the values from the dictionary before returning the result for the segment.
---
 .../org/apache/pinot/core/common/BlockValSet.java  |   8 +
 .../apache/pinot/core/common/DataBlockCache.java   |  11 +-
 .../pinot/core/operator/ProjectionOperator.java    |   9 +-
 .../core/operator/blocks/ProjectionBlock.java      |  39 ++---
 .../pinot/core/operator/blocks/TransformBlock.java |   4 -
 .../operator/docvalsets/ProjectionBlockValSet.java |  28 ++--
 .../operator/docvalsets/TransformBlockValSet.java  |   8 +
 .../function/DistinctCountAggregationFunction.java | 165 +++++++++++++++------
 .../DistinctCountMVAggregationFunction.java        |  53 ++++++-
 9 files changed, 230 insertions(+), 95 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
index 2a0f875..e7574a7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.common;
 
+import javax.annotation.Nullable;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
@@ -38,6 +40,12 @@ public interface BlockValSet {
   boolean isSingleValue();
 
   /**
+   * Returns the dictionary for the column, or {@code null} if the column is not dictionary-encoded.
+   */
+  @Nullable
+  Dictionary getDictionary();
+
+  /**
    * SINGLE-VALUED COLUMN APIs
    */
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
index fbaeeed..141ecfb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java
@@ -23,9 +23,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nonnull;
+import org.apache.pinot.core.plan.DocIdSetPlanNode;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.utils.EqualityUtils;
-import org.apache.pinot.core.plan.DocIdSetPlanNode;
 
 
 /**
@@ -71,6 +71,15 @@ public class DataBlockCache {
   }
 
   /**
+   * Returns the number of documents within the current block.
+   *
+   * @return Number of documents within the current block
+   */
+  public int getNumDocs() {
+    return _length;
+  }
+
+  /**
    * SINGLE-VALUED COLUMN API
    */
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
index 32ef7c8..d525ce7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java
@@ -18,13 +18,11 @@
  */
 package org.apache.pinot.core.operator;
 
-import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.common.DataBlockCache;
 import org.apache.pinot.core.common.DataFetcher;
 import org.apache.pinot.core.common.DataSource;
-import org.apache.pinot.core.common.DataSourceMetadata;
 import org.apache.pinot.core.operator.blocks.DocIdSetBlock;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 
@@ -33,17 +31,12 @@ public class ProjectionOperator extends BaseOperator<ProjectionBlock> {
   private static final String OPERATOR_NAME = "ProjectionOperator";
 
   private final Map<String, DataSource> _dataSourceMap;
-  private final Map<String, DataSourceMetadata> _dataSourceMetadataMap;
   private final BaseOperator<DocIdSetBlock> _docIdSetOperator;
   private final DataBlockCache _dataBlockCache;
 
   public ProjectionOperator(Map<String, DataSource> dataSourceMap,
       @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator) {
     _dataSourceMap = dataSourceMap;
-    _dataSourceMetadataMap = new HashMap<>(dataSourceMap.size());
-    for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) {
-      _dataSourceMetadataMap.put(entry.getKey(), entry.getValue().getDataSourceMetadata());
-    }
     _docIdSetOperator = docIdSetOperator;
     _dataBlockCache = new DataBlockCache(new DataFetcher(dataSourceMap));
   }
@@ -66,7 +59,7 @@ public class ProjectionOperator extends BaseOperator<ProjectionBlock> {
       return null;
     } else {
       _dataBlockCache.initNewBlock(docIdSetBlock.getDocIdSet(), docIdSetBlock.getSearchableLength());
-      return new ProjectionBlock(_dataSourceMetadataMap, _dataBlockCache, docIdSetBlock);
+      return new ProjectionBlock(_dataSourceMap, _dataBlockCache);
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
index c3b8774..989a870 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java
@@ -25,9 +25,8 @@ import org.apache.pinot.core.common.BlockDocIdValueSet;
 import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.DataBlockCache;
-import org.apache.pinot.core.common.DataSourceMetadata;
+import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.operator.docvalsets.ProjectionBlockValSet;
-import org.apache.pinot.spi.data.FieldSpec;
 
 
 /**
@@ -35,24 +34,20 @@ import org.apache.pinot.spi.data.FieldSpec;
  * It provides DocIdSetBlock for a given column.
  */
 public class ProjectionBlock implements Block {
-  private final Map<String, DataSourceMetadata> _dataSourceMetadataMap;
-  private final DocIdSetBlock _docIdSetBlock;
+  private final Map<String, DataSource> _dataSourceMap;
   private final DataBlockCache _dataBlockCache;
 
-  public ProjectionBlock(Map<String, DataSourceMetadata> dataSourceMetadataMap, DataBlockCache dataBlockCache, DocIdSetBlock docIdSetBlock) {
-    _dataSourceMetadataMap = dataSourceMetadataMap;
-    _docIdSetBlock = docIdSetBlock;
+  public ProjectionBlock(Map<String, DataSource> dataSourceMap, DataBlockCache dataBlockCache) {
+    _dataSourceMap = dataSourceMap;
     _dataBlockCache = dataBlockCache;
   }
 
-  @Override
-  public BlockValSet getBlockValueSet() {
-    throw new UnsupportedOperationException();
+  public int getNumDocs() {
+    return _dataBlockCache.getNumDocs();
   }
 
-  @Override
-  public BlockDocIdValueSet getBlockDocIdValueSet() {
-    throw new UnsupportedOperationException();
+  public BlockValSet getBlockValueSet(String column) {
+    return new ProjectionBlockValSet(_dataBlockCache, column, _dataSourceMap.get(column));
   }
 
   @Override
@@ -61,21 +56,17 @@ public class ProjectionBlock implements Block {
   }
 
   @Override
-  public BlockMetadata getMetadata() {
+  public BlockValSet getBlockValueSet() {
     throw new UnsupportedOperationException();
   }
 
-  public BlockValSet getBlockValueSet(String column) {
-    FieldSpec fieldSpec = _dataSourceMetadataMap.get(column).getFieldSpec();
-    return new ProjectionBlockValSet(_dataBlockCache, column, fieldSpec.getDataType(),
-        fieldSpec.isSingleValueField());
-  }
-
-  public DocIdSetBlock getDocIdSetBlock() {
-    return _docIdSetBlock;
+  @Override
+  public BlockDocIdValueSet getBlockDocIdValueSet() {
+    throw new UnsupportedOperationException();
   }
 
-  public int getNumDocs() {
-    return _docIdSetBlock.getSearchableLength();
+  @Override
+  public BlockMetadata getMetadata() {
+    throw new UnsupportedOperationException();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java
index 7d78e07..63f57b6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java
@@ -78,8 +78,4 @@ public class TransformBlock implements Block {
   public BlockMetadata getMetadata() {
     throw new UnsupportedOperationException();
   }
-
-  public DocIdSetBlock getDocIdSetBlock() {
-    return _projectionBlock.getDocIdSetBlock();
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
index efdb41b..c355131 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java
@@ -18,9 +18,12 @@
  */
 package org.apache.pinot.core.operator.docvalsets;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.DataBlockCache;
+import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.operator.ProjectionOperator;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
@@ -32,32 +35,33 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 public class ProjectionBlockValSet implements BlockValSet {
   private final DataBlockCache _dataBlockCache;
   private final String _column;
-  private final DataType _dataType;
-  private final boolean _singleValue;
+  private final DataSource _dataSource;
 
   /**
    * Constructor for the class.
-   * The dataBlockCache argument is initialized in {@link ProjectionOperator},
-   * so that it can be reused across multiple calls to {@link ProjectionOperator#nextBlock()}.
-   *
-   * @param dataBlockCache data block cache
-   * @param column Projection column.
+   * The dataBlockCache is initialized in {@link ProjectionOperator} so that it can be reused across multiple calls to
+   * {@link ProjectionOperator#nextBlock()}.
    */
-  public ProjectionBlockValSet(DataBlockCache dataBlockCache, String column, DataType dataType, boolean singleValue) {
+  public ProjectionBlockValSet(DataBlockCache dataBlockCache, String column, DataSource dataSource) {
     _dataBlockCache = dataBlockCache;
     _column = column;
-    _dataType = dataType;
-    _singleValue = singleValue;
+    _dataSource = dataSource;
   }
 
   @Override
   public DataType getValueType() {
-    return _dataType;
+    return _dataSource.getDataSourceMetadata().getDataType();
   }
 
   @Override
   public boolean isSingleValue() {
-    return _singleValue;
+    return _dataSource.getDataSourceMetadata().isSingleValue();
+  }
+
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    return _dataSource.getDictionary();
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java
index 78fa38a..fd82adb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.core.operator.docvalsets;
 
+import javax.annotation.Nullable;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.blocks.ProjectionBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.core.operator.transform.function.TransformFunction;
 import org.apache.pinot.core.plan.DocIdSetPlanNode;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -52,6 +54,12 @@ public class TransformBlockValSet implements BlockValSet {
     return _transformFunction.getResultMetadata().isSingleValue();
   }
 
+  @Nullable
+  @Override
+  public Dictionary getDictionary() {
+    return _transformFunction.getDictionary();
+  }
+
   @Override
   public int[] getDictionaryIdsSV() {
     return _transformFunction.transformToDictIdsSV(_projectionBlock);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
index 7e7ba4b..c13c8c0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import it.unimi.dsi.fastutil.ints.IntIterator;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import java.util.Arrays;
 import java.util.Map;
@@ -29,10 +30,12 @@ import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<IntOpenHashSet, Integer> {
+  protected Dictionary _dictionary;
 
   public DistinctCountAggregationFunction(ExpressionContext expression) {
     super(expression);
@@ -64,7 +67,19 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
     BlockValSet blockValSet = blockValSetMap.get(_expression);
     IntOpenHashSet valueSet = getValueSet(aggregationResultHolder);
 
-    FieldSpec.DataType valueType = blockValSet.getValueType();
+    // For dictionary-encoded expression, store dictionary ids into the value set
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        valueSet.add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the value set
+    DataType valueType = blockValSet.getValueType();
     switch (valueType) {
       case INT:
         int[] intValues = blockValSet.getIntValuesSV();
@@ -111,43 +126,55 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
   public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
-    FieldSpec.DataType valueType = blockValSet.getValueType();
 
+    // For dictionary-encoded expression, store dictionary ids into the value set
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        getValueSet(groupByResultHolder, groupKeyArray[i]).add(dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the value set
+    DataType valueType = blockValSet.getValueType();
     switch (valueType) {
       case INT:
         int[] intValues = blockValSet.getIntValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKey(groupByResultHolder, groupKeyArray[i], intValues[i]);
+          getValueSet(groupByResultHolder, groupKeyArray[i]).add(intValues[i]);
         }
         break;
       case LONG:
         long[] longValues = blockValSet.getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Long.hashCode(longValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i]).add(Long.hashCode(longValues[i]));
         }
         break;
       case FLOAT:
         float[] floatValues = blockValSet.getFloatValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Float.hashCode(floatValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i]).add(Float.hashCode(floatValues[i]));
         }
         break;
       case DOUBLE:
         double[] doubleValues = blockValSet.getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Double.hashCode(doubleValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i]).add(Double.hashCode(doubleValues[i]));
         }
         break;
       case STRING:
         String[] stringValues = blockValSet.getStringValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKey(groupByResultHolder, groupKeyArray[i], stringValues[i].hashCode());
+          getValueSet(groupByResultHolder, groupKeyArray[i]).add(stringValues[i].hashCode());
         }
         break;
       case BYTES:
         byte[][] bytesValues = blockValSet.getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Arrays.hashCode(bytesValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i]).add(Arrays.hashCode(bytesValues[i]));
         }
         break;
       default:
@@ -160,7 +187,19 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
 
-    FieldSpec.DataType valueType = blockValSet.getValueType();
+    // For dictionary-encoded expression, store dictionary ids into the value set
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[] dictIds = blockValSet.getDictionaryIdsSV();
+      for (int i = 0; i < length; i++) {
+        setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], dictIds[i]);
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the value set
+    DataType valueType = blockValSet.getValueType();
     switch (valueType) {
       case INT:
         int[] intValues = blockValSet.getIntValuesSV();
@@ -192,6 +231,12 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
           setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], stringValues[i].hashCode());
         }
         break;
+      case BYTES:
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        for (int i = 0; i < length; i++) {
+          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Arrays.hashCode(bytesValues[i]));
+        }
+        break;
       default:
         throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType);
     }
@@ -202,7 +247,13 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
     IntOpenHashSet valueSet = aggregationResultHolder.getResult();
     if (valueSet == null) {
       return new IntOpenHashSet();
+    }
+
+    if (_dictionary != null) {
+      // For dictionary-encoded expression, convert dictionary ids to hash code of the values
+      return convertToValueSet(valueSet, _dictionary);
     } else {
+      // For non-dictionary-encoded expression, directly return the value set
       return valueSet;
     }
   }
@@ -212,7 +263,13 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
     IntOpenHashSet valueSet = groupByResultHolder.getResult(groupKey);
     if (valueSet == null) {
       return new IntOpenHashSet();
+    }
+
+    if (_dictionary != null) {
+      // For dictionary-encoded expression, convert dictionary ids to hash code of the values
+      return convertToValueSet(valueSet, _dictionary);
     } else {
+      // For non-dictionary-encoded expression, directly return the value set
       return valueSet;
     }
   }
@@ -244,35 +301,7 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
   }
 
   /**
-   * Helper method to set value for a groupKey into the result holder.
-   *
-   * @param groupByResultHolder Result holder
-   * @param groupKey Group-key for which to set the value
-   * @param value Value for the group key
-   */
-  private void setValueForGroupKey(GroupByResultHolder groupByResultHolder, int groupKey, int value) {
-    IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey);
-    valueSet.add(value);
-  }
-
-  /**
-   * Helper method to set values for a given array of groupKeys, into the result holder.
-   *
-   * @param groupByResultHolder Result holder
-   * @param groupKeys Array of group keys for which to set the value
-   * @param value Value to be set
-   */
-  private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, int value) {
-    for (int groupKey : groupKeys) {
-      setValueForGroupKey(groupByResultHolder, groupKey, value);
-    }
-  }
-
-  /**
    * Returns the value set from the result holder or creates a new one if it does not exist.
-   *
-   * @param aggregationResultHolder Result holder
-   * @return Value set from the result holder
    */
   protected static IntOpenHashSet getValueSet(AggregationResultHolder aggregationResultHolder) {
     IntOpenHashSet valueSet = aggregationResultHolder.getResult();
@@ -284,11 +313,7 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
   }
 
   /**
-   * Returns the value set for the given group key. If one does not exist, creates a new one and returns that.
-   *
-   * @param groupByResultHolder Result holder
-   * @param groupKey Group key for which to return the value set
-   * @return Value set for the group key
+   * Returns the value set for the given group key or creates a new one if it does not exist.
    */
   protected static IntOpenHashSet getValueSet(GroupByResultHolder groupByResultHolder, int groupKey) {
     IntOpenHashSet valueSet = groupByResultHolder.getResult(groupKey);
@@ -298,4 +323,58 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
     }
     return valueSet;
   }
+
+  /**
+   * Helper method to set value for the given group keys into the result holder.
+   */
+  private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, int value) {
+    for (int groupKey : groupKeys) {
+      getValueSet(groupByResultHolder, groupKey).add(value);
+    }
+  }
+
+  /**
+   * Helper method to read dictionary and convert dictionary ids to hash code of the values for dictionary-encoded
+   * expression.
+   */
+  private static IntOpenHashSet convertToValueSet(IntOpenHashSet dictIdSet, Dictionary dictionary) {
+    IntOpenHashSet valueSet = new IntOpenHashSet(dictIdSet.size());
+    IntIterator iterator = dictIdSet.iterator();
+    DataType valueType = dictionary.getValueType();
+    switch (valueType) {
+      case INT:
+        while (iterator.hasNext()) {
+          valueSet.add(dictionary.getIntValue(iterator.nextInt()));
+        }
+        break;
+      case LONG:
+        while (iterator.hasNext()) {
+          valueSet.add(Long.hashCode(dictionary.getLongValue(iterator.nextInt())));
+        }
+        break;
+      case FLOAT:
+        while (iterator.hasNext()) {
+          valueSet.add(Float.hashCode(dictionary.getFloatValue(iterator.nextInt())));
+        }
+        break;
+      case DOUBLE:
+        while (iterator.hasNext()) {
+          valueSet.add(Double.hashCode(dictionary.getDoubleValue(iterator.nextInt())));
+        }
+        break;
+      case STRING:
+        while (iterator.hasNext()) {
+          valueSet.add(dictionary.getStringValue(iterator.nextInt()).hashCode());
+        }
+        break;
+      case BYTES:
+        while (iterator.hasNext()) {
+          valueSet.add(Arrays.hashCode(dictionary.getBytesValue(iterator.nextInt())));
+        }
+        break;
+      default:
+        throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType);
+    }
+    return valueSet;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
index 4dba11e..7d8d5d4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
@@ -25,6 +25,7 @@ import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -47,9 +48,23 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
   @Override
   public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    BlockValSet blockValSet = blockValSetMap.get(_expression);
     IntOpenHashSet valueSet = getValueSet(aggregationResultHolder);
 
-    BlockValSet blockValSet = blockValSetMap.get(_expression);
+    // For dictionary-encoded expression, store dictionary ids into the value set
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        for (int dictId : dictIds[i]) {
+          valueSet.add(dictId);
+        }
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the value set
     FieldSpec.DataType valueType = blockValSet.getValueType();
     switch (valueType) {
       case INT:
@@ -100,8 +115,23 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
   public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
-    FieldSpec.DataType valueType = blockValSet.getValueType();
 
+    // For dictionary-encoded expression, store dictionary ids into the value set
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]);
+        for (int dictId : dictIds[i]) {
+          valueSet.add(dictId);
+        }
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the value set
+    FieldSpec.DataType valueType = blockValSet.getValueType();
     switch (valueType) {
       case INT:
         int[][] intValues = blockValSet.getIntValuesMV();
@@ -157,8 +187,25 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
   public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
       Map<ExpressionContext, BlockValSet> blockValSetMap) {
     BlockValSet blockValSet = blockValSetMap.get(_expression);
-    FieldSpec.DataType valueType = blockValSet.getValueType();
 
+    // For dictionary-encoded expression, store dictionary ids into the value set
+    Dictionary dictionary = blockValSet.getDictionary();
+    if (dictionary != null) {
+      _dictionary = dictionary;
+      int[][] dictIds = blockValSet.getDictionaryIdsMV();
+      for (int i = 0; i < length; i++) {
+        for (int groupKey : groupKeysArray[i]) {
+          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey);
+          for (int dictId : dictIds[i]) {
+            valueSet.add(dictId);
+          }
+        }
+      }
+      return;
+    }
+
+    // For non-dictionary-encoded expression, store hash code of the values into the value set
+    FieldSpec.DataType valueType = blockValSet.getValueType();
     switch (valueType) {
       case INT:
         int[][] intValues = blockValSet.getIntValuesMV();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org