You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the hyperlink is not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/07/21 22:40:44 UTC

[incubator-pinot] branch master updated: Support BYTES in group-by (#5708)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0927e15  Support BYTES in group-by (#5708)
0927e15 is described below

commit 0927e150efe5aa84cfc682be1d142d24ef0ea95f
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Jul 21 15:40:30 2020 -0700

    Support BYTES in group-by (#5708)
    
    - Add ValueToIdMap (on-the-fly dictionary) for BYTES type
    - Re-order the operations to avoid the per-value switch case in `NoDictionaryMultiColumnGroupKeyGenerator.generateKeysForBlock()`
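    - Add type-specific group key iterators in `NoDictionarySingleColumnGroupKeyGenerator`, and use fastutil's `fastIterator()` for the group key iterator in `NoDictionaryMultiColumnGroupKeyGenerator`, to improve performance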
    - Enhance `NoDictionaryGroupKeyGeneratorTest` to test BYTES type
---
 .../groupby/DictionaryBasedGroupKeyGenerator.java  |  39 ++-
 .../NoDictionaryMultiColumnGroupKeyGenerator.java  | 166 +++++--------
 .../NoDictionarySingleColumnGroupKeyGenerator.java | 244 ++++++++++++++-----
 .../groupby/utils/BaseValueToIdMap.java            |  13 +
 .../{DoubleToIdMap.java => BytesToIdMap.java}      |  36 +--
 .../aggregation/groupby/utils/DoubleToIdMap.java   |   3 +-
 .../aggregation/groupby/utils/FloatToIdMap.java    |   3 +-
 .../aggregation/groupby/utils/IntToIdMap.java      |   3 +-
 .../aggregation/groupby/utils/LongToIdMap.java     |   3 +-
 .../aggregation/groupby/utils/ValueToIdMap.java    |   7 +
 .../groupby/utils/ValueToIdMapFactory.java         |   7 +-
 .../pinot/queries/DistinctCountQueriesTest.java    |   2 +-
 .../groupby/NoDictionaryGroupKeyGeneratorTest.java | 270 ++++++++++-----------
 13 files changed, 433 insertions(+), 363 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
index b598a07..5da07d1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
@@ -28,7 +28,6 @@ import it.unimi.dsi.fastutil.objects.ObjectIterator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import javax.annotation.Nonnull;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.TransformOperator;
@@ -151,7 +150,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
   }
 
   @Override
-  public void generateKeysForBlock(@Nonnull TransformBlock transformBlock, @Nonnull int[] groupKeys) {
+  public void generateKeysForBlock(TransformBlock transformBlock, int[] groupKeys) {
     // Fetch dictionary ids in the given block for all group-by columns
     for (int i = 0; i < _numGroupByExpressions; i++) {
       BlockValSet blockValueSet = transformBlock.getBlockValueSet(_groupByExpressions[i]);
@@ -162,7 +161,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
   }
 
   @Override
-  public void generateKeysForBlock(@Nonnull TransformBlock transformBlock, @Nonnull int[][] groupKeys) {
+  public void generateKeysForBlock(TransformBlock transformBlock, int[][] groupKeys) {
     // Fetch dictionary ids in the given block for all group-by columns
     for (int i = 0; i < _numGroupByExpressions; i++) {
       BlockValSet blockValueSet = transformBlock.getBlockValueSet(_groupByExpressions[i]);
@@ -194,7 +193,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
      * @param numDocs Number of documents inside the block
      * @param outGroupIds Buffer for group id results
      */
-    void processSingleValue(int numDocs, @Nonnull int[] outGroupIds);
+    void processSingleValue(int numDocs, int[] outGroupIds);
 
     /**
      * Process a block of documents for case with multi-valued group-by columns.
@@ -202,7 +201,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
      * @param numDocs Number of documents inside the block
      * @param outGroupIds Buffer for group id results
      */
-    void processMultiValue(int numDocs, @Nonnull int[][] outGroupIds);
+    void processMultiValue(int numDocs, int[][] outGroupIds);
 
     /**
      * Get the upper bound of group id (exclusive) inside the holder.
@@ -219,7 +218,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     private final boolean[] _flags = new boolean[_globalGroupIdUpperBound];
 
     @Override
-    public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
+    public void processSingleValue(int numDocs, int[] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
         int groupId = 0;
         for (int j = _numGroupByExpressions - 1; j >= 0; j--) {
@@ -231,7 +230,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     }
 
     @Override
-    public void processMultiValue(int numDocs, @Nonnull int[][] outGroupIds) {
+    public void processMultiValue(int numDocs, int[][] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
         int[] groupIds = getIntRawKeys(i);
         for (int groupId : groupIds) {
@@ -251,7 +250,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
       return _flags;
     }
 
-    @Nonnull
     @Override
     public Iterator<GroupKey> iterator() {
       return new Iterator<GroupKey>() {
@@ -301,7 +299,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     }
 
     @Override
-    public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
+    public void processSingleValue(int numDocs, int[] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
         int rawKey = 0;
         for (int j = _numGroupByExpressions - 1; j >= 0; j--) {
@@ -312,7 +310,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     }
 
     @Override
-    public void processMultiValue(int numDocs, @Nonnull int[][] outGroupIds) {
+    public void processMultiValue(int numDocs, int[][] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
         int[] groupIds = getIntRawKeys(i);
         int length = groupIds.length;
@@ -344,7 +342,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
       return _rawKeyToGroupIdMap;
     }
 
-    @Nonnull
     @Override
     public Iterator<GroupKey> iterator() {
       return new Iterator<GroupKey>() {
@@ -490,7 +487,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     }
 
     @Override
-    public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
+    public void processSingleValue(int numDocs, int[] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
         long rawKey = 0L;
         for (int j = _numGroupByExpressions - 1; j >= 0; j--) {
@@ -501,7 +498,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     }
 
     @Override
-    public void processMultiValue(int numDocs, @Nonnull int[][] outGroupIds) {
+    public void processMultiValue(int numDocs, int[][] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
         long[] rawKeys = getLongRawKeys(i);
         int length = rawKeys.length;
@@ -534,7 +531,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
       return _rawKeyToGroupIdMap;
     }
 
-    @Nonnull
     @Override
     public Iterator<GroupKey> iterator() {
       return new Iterator<GroupKey>() {
@@ -644,12 +640,12 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
    */
   private String getGroupKey(long rawKey) {
     int cardinality = _cardinalities[0];
-    StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].get((int) (rawKey % cardinality)).toString());
+    StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].getStringValue((int) (rawKey % cardinality)));
     rawKey /= cardinality;
     for (int i = 1; i < _numGroupByExpressions; i++) {
       groupKeyBuilder.append(GroupKeyGenerator.DELIMITER);
       cardinality = _cardinalities[i];
-      groupKeyBuilder.append(_dictionaries[i].get((int) (rawKey % cardinality)));
+      groupKeyBuilder.append(_dictionaries[i].getStringValue((int) (rawKey % cardinality)));
       rawKey /= cardinality;
     }
     return groupKeyBuilder.toString();
@@ -671,7 +667,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     }
 
     @Override
-    public void processSingleValue(int numDocs, @Nonnull int[] outGroupIds) {
+    public void processSingleValue(int numDocs, int[] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
         int[] dictIds = new int[_numGroupByExpressions];
         for (int j = 0; j < _numGroupByExpressions; j++) {
@@ -682,7 +678,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     }
 
     @Override
-    public void processMultiValue(int numDocs, @Nonnull int[][] outGroupIds) {
+    public void processMultiValue(int numDocs, int[][] outGroupIds) {
       for (int i = 0; i < numDocs; i++) {
         IntArray[] rawKeys = getIntArrayRawKeys(i);
         int length = rawKeys.length;
@@ -715,7 +711,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
       return _rawKeyToGroupIdMap;
     }
 
-    @Nonnull
     @Override
     public Iterator<GroupKey> iterator() {
       return new Iterator<GroupKey>() {
@@ -828,10 +823,10 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
    * @return String group key
    */
   private String getGroupKey(IntArray rawKey) {
-    StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].get(rawKey._elements[0]).toString());
+    StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].getStringValue(rawKey._elements[0]));
     for (int i = 1; i < _numGroupByExpressions; i++) {
       groupKeyBuilder.append(GroupKeyGenerator.DELIMITER);
-      groupKeyBuilder.append(_dictionaries[i].get(rawKey._elements[i]));
+      groupKeyBuilder.append(_dictionaries[i].getStringValue(rawKey._elements[i]));
     }
     return groupKeyBuilder.toString();
   }
@@ -839,6 +834,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
   /**
    * Drop un-necessary checks for highest performance.
    */
+  @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
   private static class IntArray {
     public int[] _elements;
 
@@ -855,7 +851,6 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
       return result;
     }
 
-    @SuppressWarnings({"Contract", "EqualsWhichDoesntCheckParameterClass"})
     @Override
     public boolean equals(Object obj) {
       int[] that = ((IntArray) obj)._elements;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
index a3e2921..1d292c6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
@@ -18,9 +18,10 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby;
 
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
 import java.util.Iterator;
-import java.util.Map;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.TransformOperator;
@@ -30,7 +31,8 @@ import org.apache.pinot.core.query.aggregation.groupby.utils.ValueToIdMapFactory
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
 import org.apache.pinot.core.util.FixedIntArray;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
 
 
 /**
@@ -45,7 +47,7 @@ import org.apache.pinot.spi.data.FieldSpec;
 public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerator {
   private final ExpressionContext[] _groupByExpressions;
   private final int _numGroupByExpressions;
-  private final FieldSpec.DataType[] _dataTypes;
+  private final DataType[] _dataTypes;
   private final Dictionary[] _dictionaries;
   private final ValueToIdMap[] _onTheFlyDictionaries;
   private final Object2IntOpenHashMap<FixedIntArray> _groupKeyMap;
@@ -57,7 +59,7 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
       ExpressionContext[] groupByExpressions, int numGroupsLimit) {
     _groupByExpressions = groupByExpressions;
     _numGroupByExpressions = groupByExpressions.length;
-    _dataTypes = new FieldSpec.DataType[_numGroupByExpressions];
+    _dataTypes = new DataType[_numGroupByExpressions];
     _dictionaries = new Dictionary[_numGroupByExpressions];
     _onTheFlyDictionaries = new ValueToIdMap[_numGroupByExpressions];
 
@@ -82,55 +84,63 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
     return _globalGroupIdUpperBound;
   }
 
-  @SuppressWarnings("ConstantConditions")
   @Override
   public void generateKeysForBlock(TransformBlock transformBlock, int[] groupKeys) {
     int numDocs = transformBlock.getNumDocs();
-    Object[] values = new Object[_numGroupByExpressions];
+    int[][] keys = new int[numDocs][_numGroupByExpressions];
     for (int i = 0; i < _numGroupByExpressions; i++) {
       BlockValSet blockValSet = transformBlock.getBlockValueSet(_groupByExpressions[i]);
       if (_dictionaries[i] != null) {
-        values[i] = blockValSet.getDictionaryIdsSV();
+        int[] dictIds = blockValSet.getDictionaryIdsSV();
+        for (int j = 0; j < numDocs; j++) {
+          keys[j][i] = dictIds[j];
+        }
       } else {
-        values[i] = getValuesFromBlockValSet(blockValSet, _dataTypes[i]);
+        ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[i];
+        switch (_dataTypes[i]) {
+          case INT:
+            int[] intValues = blockValSet.getIntValuesSV();
+            for (int j = 0; j < numDocs; j++) {
+              keys[j][i] = onTheFlyDictionary.put(intValues[j]);
+            }
+            break;
+          case LONG:
+            long[] longValues = blockValSet.getLongValuesSV();
+            for (int j = 0; j < numDocs; j++) {
+              keys[j][i] = onTheFlyDictionary.put(longValues[j]);
+            }
+            break;
+          case FLOAT:
+            float[] floatValues = blockValSet.getFloatValuesSV();
+            for (int j = 0; j < numDocs; j++) {
+              keys[j][i] = onTheFlyDictionary.put(floatValues[j]);
+            }
+            break;
+          case DOUBLE:
+            double[] doubleValues = blockValSet.getDoubleValuesSV();
+            for (int j = 0; j < numDocs; j++) {
+              keys[j][i] = onTheFlyDictionary.put(doubleValues[j]);
+            }
+            break;
+          case STRING:
+            String[] stringValues = blockValSet.getStringValuesSV();
+            for (int j = 0; j < numDocs; j++) {
+              keys[j][i] = onTheFlyDictionary.put(stringValues[j]);
+            }
+            break;
+          case BYTES:
+            byte[][] bytesValues = blockValSet.getBytesValuesSV();
+            for (int j = 0; j < numDocs; j++) {
+              keys[j][i] = onTheFlyDictionary.put(new ByteArray(bytesValues[j]));
+            }
+            break;
+          default:
+            throw new IllegalArgumentException("Illegal data type for no-dictionary key generator: " + _dataTypes[i]);
+        }
       }
     }
-
     for (int i = 0; i < numDocs; i++) {
-      int[] keys = new int[_numGroupByExpressions];
-      for (int j = 0; j < _numGroupByExpressions; j++) {
-        if (_dictionaries[j] != null) {
-          int[] dictIds = (int[]) values[j];
-          keys[j] = dictIds[i];
-        } else {
-          FieldSpec.DataType dataType = _dataTypes[j];
-          switch (dataType) {
-            case INT:
-              int[] intValues = (int[]) values[j];
-              keys[j] = _onTheFlyDictionaries[j].put(intValues[i]);
-              break;
-            case LONG:
-              long[] longValues = (long[]) values[j];
-              keys[j] = _onTheFlyDictionaries[j].put(longValues[i]);
-              break;
-            case FLOAT:
-              float[] floatValues = (float[]) values[j];
-              keys[j] = _onTheFlyDictionaries[j].put(floatValues[i]);
-              break;
-            case DOUBLE:
-              double[] doubleValues = (double[]) values[j];
-              keys[j] = _onTheFlyDictionaries[j].put(doubleValues[i]);
-              break;
-            case STRING:
-              String[] stringValues = (String[]) values[j];
-              keys[j] = _onTheFlyDictionaries[j].put(stringValues[i]);
-              break;
-            default:
-              throw new IllegalArgumentException("Illegal data type for no-dictionary key generator: " + dataType);
-          }
-        }
-      }
-      groupKeys[i] = getGroupIdForKey(new FixedIntArray(keys));
+      groupKeys[i] = getGroupIdForKey(new FixedIntArray(keys[i]));
     }
   }
 
@@ -147,7 +157,7 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
 
   @Override
   public Iterator<GroupKey> getUniqueGroupKeys() {
-    return new GroupKeyIterator(_groupKeyMap);
+    return new GroupKeyIterator();
   }
 
   /**
@@ -170,12 +180,12 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
   /**
    * Iterator for {Group-Key, Group-id) pair.
    */
-  class GroupKeyIterator implements Iterator<GroupKey> {
-    Iterator<Map.Entry<FixedIntArray, Integer>> _iterator;
-    GroupKey _groupKey;
+  private class GroupKeyIterator implements Iterator<GroupKey> {
+    final ObjectIterator<Object2IntMap.Entry<FixedIntArray>> _iterator;
+    final GroupKey _groupKey;
 
-    public GroupKeyIterator(Map<FixedIntArray, Integer> map) {
-      _iterator = map.entrySet().iterator();
+    public GroupKeyIterator() {
+      _iterator = _groupKeyMap.object2IntEntrySet().fastIterator();
       _groupKey = new GroupKey();
     }
 
@@ -186,8 +196,8 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
 
     @Override
     public GroupKey next() {
-      Map.Entry<FixedIntArray, Integer> entry = _iterator.next();
-      _groupKey._groupId = entry.getValue();
+      Object2IntMap.Entry<FixedIntArray> entry = _iterator.next();
+      _groupKey._groupId = entry.getIntValue();
       _groupKey._stringKey = buildStringKeyFromIds(entry.getKey());
       return _groupKey;
     }
@@ -201,58 +211,16 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
   private String buildStringKeyFromIds(FixedIntArray keyList) {
     StringBuilder builder = new StringBuilder();
     int[] keys = keyList.elements();
-    for (int i = 0; i < keyList.size(); i++) {
-      String key;
-      int dictId = keys[i];
-
-      if (_dictionaries[i] != null) {
-        key = _dictionaries[i].get(dictId).toString();
-      } else {
-        key = _onTheFlyDictionaries[i].getString(dictId);
-      }
-
+    for (int i = 0; i < _numGroupByExpressions; i++) {
       if (i > 0) {
         builder.append(GroupKeyGenerator.DELIMITER);
       }
-      builder.append(key);
+      if (_dictionaries[i] != null) {
+        builder.append(_dictionaries[i].getStringValue(keys[i]));
+      } else {
+        builder.append(_onTheFlyDictionaries[i].getString(keys[i]));
+      }
     }
-
     return builder.toString();
   }
-
-  /**
-   * Helper method to fetch values from BlockValSet
-   * @param dataType Data type
-   * @param blockValSet Block val set
-   * @return Values from block val set
-   */
-  private Object getValuesFromBlockValSet(BlockValSet blockValSet, FieldSpec.DataType dataType) {
-    Object values;
-
-    switch (dataType) {
-      case INT:
-        values = blockValSet.getIntValuesSV();
-        break;
-
-      case LONG:
-        values = blockValSet.getLongValuesSV();
-        break;
-
-      case FLOAT:
-        values = blockValSet.getFloatValuesSV();
-        break;
-
-      case DOUBLE:
-        values = blockValSet.getDoubleValuesSV();
-        break;
-
-      case STRING:
-        values = blockValSet.getStringValuesSV();
-        break;
-
-      default:
-        throw new IllegalArgumentException("Illegal data type for no-dictionary key generator: " + dataType);
-    }
-    return values;
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
index 49a1930..1d9df51 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java
@@ -28,13 +28,15 @@ import it.unimi.dsi.fastutil.longs.Long2IntMap;
 import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.TransformOperator;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
 
 
 /**
@@ -42,9 +44,10 @@ import org.apache.pinot.spi.data.FieldSpec;
  * in absence of dictionary for the group by column.
  *
  */
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenerator {
   private final ExpressionContext _groupByExpression;
-  private final FieldSpec.DataType _dataType;
+  private final DataType _dataType;
   private final Map _groupKeyMap;
   private final int _globalGroupIdUpperBound;
 
@@ -99,6 +102,12 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
           groupKeys[i] = getKeyForValue(stringValues[i]);
         }
         break;
+      case BYTES:
+        byte[][] bytesValues = blockValSet.getBytesValuesSV();
+        for (int i = 0; i < numDocs; i++) {
+          groupKeys[i] = getKeyForValue(new ByteArray(bytesValues[i]));
+        }
+        break;
       default:
         throw new IllegalArgumentException("Illegal data type for no-dictionary key generator: " + _dataType);
     }
@@ -111,43 +120,35 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
    * @param keyType DataType for the key
    * @return Map
    */
-  private Map createGroupKeyMap(FieldSpec.DataType keyType) {
-    Map map;
+  private Map createGroupKeyMap(DataType keyType) {
     switch (keyType) {
       case INT:
         Int2IntMap intMap = new Int2IntOpenHashMap();
         intMap.defaultReturnValue(INVALID_ID);
-        map = intMap;
-        break;
-
+        return intMap;
       case LONG:
         Long2IntOpenHashMap longMap = new Long2IntOpenHashMap();
         longMap.defaultReturnValue(INVALID_ID);
-        map = longMap;
-        break;
-
+        return longMap;
       case FLOAT:
         Float2IntOpenHashMap floatMap = new Float2IntOpenHashMap();
         floatMap.defaultReturnValue(INVALID_ID);
-        map = floatMap;
-        break;
-
+        return floatMap;
       case DOUBLE:
         Double2IntOpenHashMap doubleMap = new Double2IntOpenHashMap();
         doubleMap.defaultReturnValue(INVALID_ID);
-        map = doubleMap;
-        break;
-
+        return doubleMap;
       case STRING:
         Object2IntOpenHashMap<String> stringMap = new Object2IntOpenHashMap<>();
         stringMap.defaultReturnValue(INVALID_ID);
-        map = stringMap;
-        break;
-
+        return stringMap;
+      case BYTES:
+        Object2IntOpenHashMap<ByteArray> bytesMap = new Object2IntOpenHashMap<>();
+        bytesMap.defaultReturnValue(INVALID_ID);
+        return bytesMap;
       default:
-        throw new IllegalArgumentException("Illegal data type for no-dictionary key generator: " + keyType);
+        throw new IllegalStateException("Illegal data type for no-dictionary key generator: " + keyType);
     }
-    return map;
   }
 
   @Override
@@ -163,84 +164,201 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
 
   @Override
   public Iterator<GroupKey> getUniqueGroupKeys() {
-    return new GroupKeyIterator(_groupKeyMap);
+    switch (_dataType) {
+      case INT:
+        return new IntGroupKeyIterator((Int2IntOpenHashMap) _groupKeyMap);
+      case LONG:
+        return new LongGroupKeyIterator((Long2IntOpenHashMap) _groupKeyMap);
+      case FLOAT:
+        return new FloatGroupKeyIterator((Float2IntOpenHashMap) _groupKeyMap);
+      case DOUBLE:
+        return new DoubleGroupKeyIterator((Double2IntOpenHashMap) _groupKeyMap);
+      case STRING:
+      case BYTES:
+        return new ObjectGroupKeyIterator((Object2IntOpenHashMap) _groupKeyMap);
+      default:
+        throw new IllegalStateException();
+    }
   }
 
-  @SuppressWarnings("unchecked")
   private int getKeyForValue(int value) {
     Int2IntMap map = (Int2IntMap) _groupKeyMap;
     int groupId = map.get(value);
-    if (groupId == INVALID_ID) {
-      if (_numGroups < _globalGroupIdUpperBound) {
-        groupId = _numGroups;
-        map.put(value, _numGroups++);
-      }
+    if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+      groupId = _numGroups++;
+      map.put(value, groupId);
     }
     return groupId;
   }
 
-  @SuppressWarnings("unchecked")
   private int getKeyForValue(long value) {
     Long2IntMap map = (Long2IntMap) _groupKeyMap;
     int groupId = map.get(value);
-    if (groupId == INVALID_ID) {
-      if (_numGroups < _globalGroupIdUpperBound) {
-        groupId = _numGroups;
-        map.put(value, _numGroups++);
-      }
+    if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+      groupId = _numGroups++;
+      map.put(value, groupId);
     }
     return groupId;
   }
 
-  @SuppressWarnings("unchecked")
   private int getKeyForValue(float value) {
     Float2IntMap map = (Float2IntMap) _groupKeyMap;
     int groupId = map.get(value);
-    if (groupId == INVALID_ID) {
-      if (_numGroups < _globalGroupIdUpperBound) {
-        groupId = _numGroups;
-        map.put(value, _numGroups++);
-      }
+    if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+      groupId = _numGroups++;
+      map.put(value, groupId);
     }
     return groupId;
   }
 
-  @SuppressWarnings("unchecked")
   private int getKeyForValue(double value) {
     Double2IntMap map = (Double2IntMap) _groupKeyMap;
     int groupId = map.get(value);
-    if (groupId == INVALID_ID) {
-      if (_numGroups < _globalGroupIdUpperBound) {
-        groupId = _numGroups;
-        map.put(value, _numGroups++);
-      }
+    if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+      groupId = _numGroups++;
+      map.put(value, groupId);
     }
     return groupId;
   }
 
-  @SuppressWarnings("unchecked")
   private int getKeyForValue(String value) {
     Object2IntMap<String> map = (Object2IntMap<String>) _groupKeyMap;
     int groupId = map.getInt(value);
-    if (groupId == INVALID_ID) {
-      if (_numGroups < _globalGroupIdUpperBound) {
-        groupId = _numGroups;
-        map.put(value, _numGroups++);
-      }
+    if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+      groupId = _numGroups++;
+      map.put(value, groupId);
     }
     return groupId;
   }
 
-  /**
-   * Iterator for {Group-Key, Group-id) pair.
-   */
-  class GroupKeyIterator implements Iterator<GroupKey> {
-    Iterator<Map.Entry<Object, Integer>> _iterator;
-    GroupKey _groupKey;
+  private int getKeyForValue(ByteArray value) {
+    Object2IntMap<ByteArray> map = (Object2IntMap<ByteArray>) _groupKeyMap;
+    int groupId = map.getInt(value);
+    if (groupId == INVALID_ID && _numGroups < _globalGroupIdUpperBound) {
+      groupId = _numGroups++;
+      map.put(value, groupId);
+    }
+    return groupId;
+  }
+
+  private static class IntGroupKeyIterator implements Iterator<GroupKey> {
+    final Iterator<Int2IntMap.Entry> _iterator;
+    final GroupKey _groupKey;
+
+    IntGroupKeyIterator(Int2IntOpenHashMap intMap) {
+      _iterator = intMap.int2IntEntrySet().fastIterator();
+      _groupKey = new GroupKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return _iterator.hasNext();
+    }
+
+    @Override
+    public GroupKey next() {
+      Int2IntMap.Entry entry = _iterator.next();
+      _groupKey._groupId = entry.getIntValue();
+      _groupKey._stringKey = Integer.toString(entry.getIntKey());
+      return _groupKey;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class LongGroupKeyIterator implements Iterator<GroupKey> {
+    final Iterator<Long2IntMap.Entry> _iterator;
+    final GroupKey _groupKey;
+
+    LongGroupKeyIterator(Long2IntOpenHashMap longMap) {
+      _iterator = longMap.long2IntEntrySet().fastIterator();
+      _groupKey = new GroupKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return _iterator.hasNext();
+    }
+
+    @Override
+    public GroupKey next() {
+      Long2IntMap.Entry entry = _iterator.next();
+      _groupKey._groupId = entry.getIntValue();
+      _groupKey._stringKey = Long.toString(entry.getLongKey());
+      return _groupKey;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class FloatGroupKeyIterator implements Iterator<GroupKey> {
+    final Iterator<Float2IntMap.Entry> _iterator;
+    final GroupKey _groupKey;
+
+    FloatGroupKeyIterator(Float2IntOpenHashMap floatMap) {
+      _iterator = floatMap.float2IntEntrySet().fastIterator();
+      _groupKey = new GroupKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return _iterator.hasNext();
+    }
+
+    @Override
+    public GroupKey next() {
+      Float2IntMap.Entry entry = _iterator.next();
+      _groupKey._groupId = entry.getIntValue();
+      _groupKey._stringKey = Float.toString(entry.getFloatKey());
+      return _groupKey;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class DoubleGroupKeyIterator implements Iterator<GroupKey> {
+    final Iterator<Double2IntMap.Entry> _iterator;
+    final GroupKey _groupKey;
+
+    DoubleGroupKeyIterator(Double2IntOpenHashMap doubleMap) {
+      _iterator = doubleMap.double2IntEntrySet().fastIterator();
+      _groupKey = new GroupKey();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return _iterator.hasNext();
+    }
+
+    @Override
+    public GroupKey next() {
+      Double2IntMap.Entry entry = _iterator.next();
+      _groupKey._groupId = entry.getIntValue();
+      _groupKey._stringKey = Double.toString(entry.getDoubleKey());
+      return _groupKey;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class ObjectGroupKeyIterator implements Iterator<GroupKey> {
+    final ObjectIterator<Object2IntMap.Entry> _iterator;
+    final GroupKey _groupKey;
 
-    @SuppressWarnings("unchecked")
-    public GroupKeyIterator(Map map) {
-      _iterator = (Iterator<Map.Entry<Object, Integer>>) map.entrySet().iterator();
+    ObjectGroupKeyIterator(Object2IntOpenHashMap objectMap) {
+      _iterator = objectMap.object2IntEntrySet().fastIterator();
       _groupKey = new GroupKey();
     }
 
@@ -251,8 +369,8 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera
 
     @Override
     public GroupKey next() {
-      Map.Entry<Object, Integer> entry = _iterator.next();
-      _groupKey._groupId = entry.getValue();
+      Object2IntMap.Entry entry = _iterator.next();
+      _groupKey._groupId = entry.getIntValue();
       _groupKey._stringKey = entry.getKey().toString();
       return _groupKey;
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java
index adfe70e..6d889d3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby.utils;
 
+import org.apache.pinot.spi.utils.ByteArray;
+
+
 /**
  * Abstract base class for {@link ValueToIdMap} interface.
  */
@@ -48,6 +51,11 @@ public abstract class BaseValueToIdMap implements ValueToIdMap {
   }
 
   @Override
+  public int put(ByteArray value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public int getInt(int id) {
     throw new UnsupportedOperationException();
   }
@@ -71,4 +79,9 @@ public abstract class BaseValueToIdMap implements ValueToIdMap {
   public String getString(int id) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public ByteArray getBytes(int id) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BytesToIdMap.java
similarity index 61%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BytesToIdMap.java
index c882c3d..a503d79 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BytesToIdMap.java
@@ -18,28 +18,29 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby.utils;
 
-import it.unimi.dsi.fastutil.doubles.Double2IntMap;
-import it.unimi.dsi.fastutil.doubles.Double2IntOpenHashMap;
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
-import it.unimi.dsi.fastutil.doubles.DoubleList;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectList;
+import org.apache.pinot.spi.utils.ByteArray;
 
 
 /**
- * Implementation of {@link ValueToIdMap} for double.
+ * Implementation of {@link ValueToIdMap} for ByteArray.
  */
-public class DoubleToIdMap extends BaseValueToIdMap {
-  Double2IntMap _valueToIdMap;
-  DoubleList _idToValueMap;
+public class BytesToIdMap extends BaseValueToIdMap {
+  Object2IntMap<ByteArray> _valueToIdMap;
+  ObjectList<ByteArray> _idToValueMap;
 
-  public DoubleToIdMap() {
-    _valueToIdMap = new Double2IntOpenHashMap();
+  public BytesToIdMap() {
+    _valueToIdMap = new Object2IntOpenHashMap<>();
     _valueToIdMap.defaultReturnValue(INVALID_KEY);
-    _idToValueMap = new DoubleArrayList();
+    _idToValueMap = new ObjectArrayList<>();
   }
 
   @Override
-  public int put(double value) {
-    int id = _valueToIdMap.get(value);
+  public int put(ByteArray value) {
+    int id = _valueToIdMap.getInt(value);
     if (id == INVALID_KEY) {
       id = _idToValueMap.size();
       _valueToIdMap.put(value, id);
@@ -49,14 +50,13 @@ public class DoubleToIdMap extends BaseValueToIdMap {
   }
 
   @Override
-  public double getDouble(int id) {
-    assert id < _idToValueMap.size();
-    return _idToValueMap.getDouble(id);
+  public String getString(int id) {
+    return getBytes(id).toHexString();
   }
 
   @Override
-  public String getString(int id) {
+  public ByteArray getBytes(int id) {
     assert id < _idToValueMap.size();
-    return (Double.valueOf(_idToValueMap.getDouble(id)).toString());
+    return _idToValueMap.get(id);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
index c882c3d..396a136 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
@@ -56,7 +56,6 @@ public class DoubleToIdMap extends BaseValueToIdMap {
 
   @Override
   public String getString(int id) {
-    assert id < _idToValueMap.size();
-    return (Double.valueOf(_idToValueMap.getDouble(id)).toString());
+    return Double.toString(getDouble(id));
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java
index 0f21c2c..390f38d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java
@@ -56,7 +56,6 @@ public class FloatToIdMap extends BaseValueToIdMap {
 
   @Override
   public String getString(int id) {
-    assert id < _idToValueMap.size();
-    return (Float.valueOf(_idToValueMap.getFloat(id)).toString());
+    return Float.toString(getFloat(id));
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java
index 71b4025..0afc61d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java
@@ -56,7 +56,6 @@ public class IntToIdMap extends BaseValueToIdMap {
 
   @Override
   public String getString(int id) {
-    assert id < _idToValueMap.size();
-    return (Integer.valueOf(_idToValueMap.getInt(id)).toString());
+    return Integer.toString(getInt(id));
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java
index a498662..6a68792 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java
@@ -56,7 +56,6 @@ public class LongToIdMap extends BaseValueToIdMap {
 
   @Override
   public String getString(int id) {
-    assert id < _idToValueMap.size();
-    return (Long.valueOf(_idToValueMap.getLong(id)).toString());
+    return Long.toString(getLong(id));
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java
index 79873ae..96d1be0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby.utils;
 
+import org.apache.pinot.spi.utils.ByteArray;
+
+
 /**
  * Interface for mapping primitive values to contiguous id's.
  */
@@ -34,6 +37,8 @@ public interface ValueToIdMap {
 
   int put(String value);
 
+  int put(ByteArray value);
+
   int getInt(int id);
 
   long getLong(int id);
@@ -43,4 +48,6 @@ public interface ValueToIdMap {
   double getDouble(int id);
 
   String getString(int id);
+
+  ByteArray getBytes(int id);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java
index 803d68a..444899e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java
@@ -33,19 +33,16 @@ public class ValueToIdMapFactory {
     switch (dataType) {
       case INT:
         return new IntToIdMap();
-
       case LONG:
         return new LongToIdMap();
-
       case FLOAT:
         return new FloatToIdMap();
-
       case DOUBLE:
         return new DoubleToIdMap();
-
       case STRING:
         return new StringToIdMap();
-
+      case BYTES:
+        return new BytesToIdMap();
       default:
         throw new IllegalArgumentException("Illegal data type for ValueToIdMapFactory: " + dataType);
     }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
index 9ef2c07..0a9c105 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountQueriesTest.java
@@ -257,7 +257,7 @@ public class DistinctCountQueriesTest extends BaseQueriesTest {
   @AfterClass
   public void tearDown()
       throws IOException {
-    FileUtils.deleteDirectory(INDEX_DIR);
     _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
index 16b7046..7b9df60 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
@@ -19,16 +19,16 @@
 package org.apache.pinot.query.aggregation.groupby;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.segment.ReadMode;
 import org.apache.pinot.core.data.readers.GenericRowRecordReader;
@@ -49,231 +49,207 @@ import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUt
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.BytesUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 
 /**
  * Unit test for {@link NoDictionaryMultiColumnGroupKeyGenerator}
  */
 public class NoDictionaryGroupKeyGeneratorTest {
-  private static final String SEGMENT_NAME = "testSegment";
-  private static final String INDEX_DIR_PATH = FileUtils.getTempDirectoryPath() + File.separator + SEGMENT_NAME;
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "NoDictionaryGroupKeyGeneratorTest");
+  private static final Random RANDOM = new Random();
 
-  private static final String STRING_DICT_COLUMN = "string_dict_column";
-  private static final String[] COLUMN_NAMES =
-      {"int_column", "long_column", "float_column", "double_column", "string_column", STRING_DICT_COLUMN};
-  private static final String[] NO_DICT_COLUMN_NAMES =
-      {"int_column", "long_column", "float_column", "double_column", "string_column"};
-  private static final FieldSpec.DataType[] DATA_TYPES =
-      {FieldSpec.DataType.INT, FieldSpec.DataType.LONG, FieldSpec.DataType.FLOAT, FieldSpec.DataType.DOUBLE, FieldSpec.DataType.STRING, FieldSpec.DataType.STRING};
-  private static final int NUM_COLUMNS = COLUMN_NAMES.length;
-  private static final int NUM_ROWS = 1000;
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
 
-  private RecordReader _recordReader;
+  private static final String INT_COLUMN = "intColumn";
+  private static final String LONG_COLUMN = "longColumn";
+  private static final String FLOAT_COLUMN = "floatColumn";
+  private static final String DOUBLE_COLUMN = "doubleColumn";
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String BYTES_COLUMN = "bytesColumn";
+  private static final String BYTES_DICT_COLUMN = "bytesDictColumn";
+  private static final List<String> COLUMNS = Arrays
+      .asList(INT_COLUMN, LONG_COLUMN, FLOAT_COLUMN, DOUBLE_COLUMN, STRING_COLUMN, BYTES_COLUMN, BYTES_DICT_COLUMN);
+  private static final int NUM_COLUMNS = COLUMNS.size();
+  private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+      .setNoDictionaryColumns(COLUMNS.subList(0, NUM_COLUMNS - 1)).build();
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
+          .addSingleValueDimension(LONG_COLUMN, FieldSpec.DataType.LONG)
+          .addSingleValueDimension(FLOAT_COLUMN, FieldSpec.DataType.FLOAT)
+          .addSingleValueDimension(DOUBLE_COLUMN, FieldSpec.DataType.DOUBLE)
+          .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
+          .addSingleValueDimension(BYTES_COLUMN, FieldSpec.DataType.BYTES)
+          .addSingleValueDimension(BYTES_DICT_COLUMN, FieldSpec.DataType.BYTES).build();
+
+  private static final int NUM_RECORDS = 1000;
+  private static final int NUM_UNIQUE_RECORDS = 100;
+
+  private final String[][] _stringValues = new String[NUM_UNIQUE_RECORDS][NUM_COLUMNS];
+  private IndexSegment _indexSegment;
   private TransformOperator _transformOperator;
   private TransformBlock _transformBlock;
 
   @BeforeClass
-  public void setup()
+  public void setUp()
       throws Exception {
-    FileUtils.deleteQuietly(new File(INDEX_DIR_PATH));
+    FileUtils.deleteDirectory(TEMP_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    for (int i = 0; i < NUM_UNIQUE_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+      String[] values = _stringValues[i];
+      int intValue = RANDOM.nextInt();
+      record.putValue(INT_COLUMN, intValue);
+      values[0] = Integer.toString(intValue);
+      long longValue = RANDOM.nextLong();
+      record.putValue(LONG_COLUMN, longValue);
+      values[1] = Long.toString(longValue);
+      float floatValue = RANDOM.nextFloat();
+      record.putValue(FLOAT_COLUMN, floatValue);
+      values[2] = Float.toString(floatValue);
+      double doubleValue = RANDOM.nextDouble();
+      record.putValue(DOUBLE_COLUMN, doubleValue);
+      values[3] = Double.toString(doubleValue);
+      String stringValue = RandomStringUtils.randomAlphabetic(10);
+      record.putValue(STRING_COLUMN, stringValue);
+      values[4] = stringValue;
+      // NOTE: Create fixed-length bytes so that dictionary can be generated.
+      byte[] bytesValue = new byte[10];
+      RANDOM.nextBytes(bytesValue);
+      record.putValue(BYTES_COLUMN, bytesValue);
+      record.putValue(BYTES_DICT_COLUMN, bytesValue);
+      values[5] = BytesUtils.toHexString(bytesValue);
+      values[6] = values[5];
+      for (int j = 0; j < NUM_RECORDS / NUM_UNIQUE_RECORDS; j++) {
+        records.add(record);
+      }
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(TEMP_DIR.getPath());
 
-    _recordReader = buildSegment();
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
 
-    // Load the segment.
-    IndexSegment indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR_PATH, SEGMENT_NAME), ReadMode.heap);
+    _indexSegment = ImmutableSegmentLoader.load(new File(TEMP_DIR, SEGMENT_NAME), ReadMode.mmap);
 
     // Create transform operator and block
     // NOTE: put all columns into group-by so that transform operator has expressions for all columns
-    String query = String.format("SELECT COUNT(*) FROM table GROUP BY %s", StringUtils.join(COLUMN_NAMES, ", "));
+    String query = "SELECT COUNT(*) FROM table GROUP BY " + StringUtils.join(COLUMNS, ", ");
     QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromPQL(query);
-
     List<ExpressionContext> expressions = new ArrayList<>();
-    for (String column : COLUMN_NAMES) {
+    for (String column : COLUMNS) {
       expressions.add(ExpressionContext.forIdentifier(column));
     }
     TransformPlanNode transformPlanNode =
-        new TransformPlanNode(indexSegment, queryContext, expressions, DocIdSetPlanNode.MAX_DOC_PER_CALL);
+        new TransformPlanNode(_indexSegment, queryContext, expressions, DocIdSetPlanNode.MAX_DOC_PER_CALL);
     _transformOperator = transformPlanNode.run();
     _transformBlock = _transformOperator.nextBlock();
   }
 
   /**
-   * Unit test for {@link org.apache.pinot.core.query.aggregation.groupby.NoDictionarySingleColumnGroupKeyGenerator}
-   * @throws Exception
+   * Unit test for {@link NoDictionarySingleColumnGroupKeyGenerator}
    */
   @Test
-  public void testSingleColumnGroupKeyGenerator()
-      throws Exception {
-    for (String column : COLUMN_NAMES) {
-      testGroupKeyGenerator(new String[]{column});
+  public void testSingleColumnGroupKeyGenerator() {
+    for (int i = 0; i < NUM_COLUMNS - 1; i++) {
+      testGroupKeyGenerator(new int[]{i});
     }
   }
 
   /**
    * Unit test for {@link NoDictionaryMultiColumnGroupKeyGenerator}
-   * @throws Exception
    */
   @Test
-  public void testMultiColumnGroupKeyGenerator()
-      throws Exception {
-    testGroupKeyGenerator(COLUMN_NAMES);
+  public void testMultiColumnGroupKeyGenerator() {
+    testGroupKeyGenerator(new int[]{0, 1});
+    testGroupKeyGenerator(new int[]{2, 3});
+    testGroupKeyGenerator(new int[]{4, 5});
+    testGroupKeyGenerator(new int[]{1, 2, 3});
+    testGroupKeyGenerator(new int[]{4, 5, 0});
+    testGroupKeyGenerator(new int[]{5, 4, 3, 2, 1, 0});
   }
 
   /**
    * Tests multi-column group key generator when at least one column as dictionary, and others don't.
    */
   @Test
-  public void testMultiColumnHybridGroupKeyGenerator()
-      throws Exception {
-    for (String noDictColumn : NO_DICT_COLUMN_NAMES) {
-      testGroupKeyGenerator(new String[]{noDictColumn, STRING_DICT_COLUMN});
+  public void testMultiColumnHybridGroupKeyGenerator() {
+    for (int i = 0; i < NUM_COLUMNS - 1; i++) {
+      testGroupKeyGenerator(new int[]{i, NUM_COLUMNS - 1});
     }
   }
 
-  private void testGroupKeyGenerator(String[] groupByColumns)
-      throws Exception {
-    int numGroupByColumns = groupByColumns.length;
-    ExpressionContext[] groupByExpressions = new ExpressionContext[numGroupByColumns];
-    for (int i = 0; i < numGroupByColumns; i++) {
-      groupByExpressions[i] = ExpressionContext.forIdentifier(groupByColumns[i]);
-    }
-
+  private void testGroupKeyGenerator(int[] groupByColumnIndexes) {
+    int numGroupByColumns = groupByColumnIndexes.length;
     GroupKeyGenerator groupKeyGenerator;
     if (numGroupByColumns == 1) {
-      groupKeyGenerator = new NoDictionarySingleColumnGroupKeyGenerator(_transformOperator, groupByExpressions[0],
+      groupKeyGenerator = new NoDictionarySingleColumnGroupKeyGenerator(_transformOperator,
+          ExpressionContext.forIdentifier(COLUMNS.get(groupByColumnIndexes[0])),
           InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
     } else {
+      ExpressionContext[] groupByExpressions = new ExpressionContext[numGroupByColumns];
+      for (int i = 0; i < numGroupByColumns; i++) {
+        groupByExpressions[i] = ExpressionContext.forIdentifier(COLUMNS.get(groupByColumnIndexes[i]));
+      }
       groupKeyGenerator = new NoDictionaryMultiColumnGroupKeyGenerator(_transformOperator, groupByExpressions,
           InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT);
     }
-    groupKeyGenerator.generateKeysForBlock(_transformBlock, new int[NUM_ROWS]);
+    groupKeyGenerator.generateKeysForBlock(_transformBlock, new int[NUM_RECORDS]);
 
     // Assert total number of group keys is as expected
-    Set<String> expectedGroupKeys = getExpectedGroupKeys(_recordReader, groupByColumns);
-    Assert.assertEquals(groupKeyGenerator.getCurrentGroupKeyUpperBound(), expectedGroupKeys.size(),
+    Set<String> expectedGroupKeys = getExpectedGroupKeys(groupByColumnIndexes);
+    assertEquals(groupKeyGenerator.getCurrentGroupKeyUpperBound(), expectedGroupKeys.size(),
         "Number of group keys mis-match.");
 
     // Assert all group key values are as expected
     Iterator<GroupKeyGenerator.GroupKey> uniqueGroupKeys = groupKeyGenerator.getUniqueGroupKeys();
     while (uniqueGroupKeys.hasNext()) {
       GroupKeyGenerator.GroupKey groupKey = uniqueGroupKeys.next();
-      String actual = groupKey._stringKey;
-      Assert.assertTrue(expectedGroupKeys.contains(actual), "Unexpected group key: " + actual);
+      if (!expectedGroupKeys.contains(groupKey._stringKey)) {
+        System.out.println(expectedGroupKeys);
+      }
+      assertTrue(expectedGroupKeys.contains(groupKey._stringKey), "Unexpected group key: " + groupKey._stringKey);
     }
   }
 
-  /**
-   * Helper method to build group keys for a given array of group-by columns.
-   *
-   * @param groupByColumns Group-by columns for which to generate the group-keys.
-   * @return Set of unique group keys.
-   * @throws Exception
-   */
-  private Set<String> getExpectedGroupKeys(RecordReader recordReader, String[] groupByColumns)
-      throws Exception {
+  private Set<String> getExpectedGroupKeys(int[] groupByColumnIndexes) {
+    int numGroupByColumns = groupByColumnIndexes.length;
     Set<String> groupKeys = new HashSet<>();
     StringBuilder stringBuilder = new StringBuilder();
-
-    recordReader.rewind();
-    while (recordReader.hasNext()) {
-      GenericRow row = recordReader.next();
-
+    for (int i = 0; i < NUM_UNIQUE_RECORDS; i++) {
       stringBuilder.setLength(0);
-      for (int i = 0; i < groupByColumns.length; i++) {
-        stringBuilder.append(row.getValue(groupByColumns[i]));
-        if (i < groupByColumns.length - 1) {
+      String[] values = _stringValues[i];
+      for (int j = 0; j < numGroupByColumns; j++) {
+        if (j > 0) {
           stringBuilder.append(GroupKeyGenerator.DELIMITER);
         }
+        stringBuilder.append(values[groupByColumnIndexes[j]]);
       }
       groupKeys.add(stringBuilder.toString());
     }
     return groupKeys;
   }
 
-  /**
-   * Helper method to build a segment as follows:
-   * <ul>
-   *   <li> One string column without dictionary. </li>
-   *   <li> One integer column with dictionary. </li>
-   * </ul>
-   *
-   * It also computes the unique group keys while it generates the index.
-   *
-   * @return Set containing unique group keys from the created segment.
-   *
-   * @throws Exception
-   */
-  private static RecordReader buildSegment()
-      throws Exception {
-    Schema schema = new Schema();
-
-    for (int i = 0; i < COLUMN_NAMES.length; i++) {
-      DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec(COLUMN_NAMES[i], DATA_TYPES[i], true);
-      schema.addField(dimensionFieldSpec);
-    }
-
-    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
-
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
-    config.setRawIndexCreationColumns(Arrays.asList(NO_DICT_COLUMN_NAMES));
-
-    config.setOutDir(INDEX_DIR_PATH);
-    config.setSegmentName(SEGMENT_NAME);
-
-    Random random = new Random();
-    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Map<String, Object> map = new HashMap<>(NUM_COLUMNS);
-
-      for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
-        String column = fieldSpec.getName();
-
-        FieldSpec.DataType dataType = fieldSpec.getDataType();
-        switch (dataType) {
-          case INT:
-            map.put(column, random.nextInt());
-            break;
-
-          case LONG:
-            map.put(column, random.nextLong());
-            break;
-
-          case FLOAT:
-            map.put(column, random.nextFloat());
-            break;
-
-          case DOUBLE:
-            map.put(column, random.nextDouble());
-            break;
-
-          case STRING:
-            map.put(column, "value_" + i);
-            break;
-
-          default:
-            throw new IllegalArgumentException("Illegal data type specified: " + dataType);
-        }
-      }
-
-      GenericRow genericRow = new GenericRow();
-      genericRow.init(map);
-      rows.add(genericRow);
-    }
-
-    RecordReader recordReader = new GenericRowRecordReader(rows);
-    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    driver.init(config, recordReader);
-    driver.build();
-
-    return recordReader;
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(TEMP_DIR);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org