You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2024/02/13 07:25:48 UTC

(pinot) branch master updated: [Multi-stage] Optimize group key generation (#12394)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new dc427fb13d [Multi-stage] Optimize group key generation (#12394)
dc427fb13d is described below

commit dc427fb13dbe29c23dc2bd96dc8fffa3a30a067f
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon Feb 12 23:25:42 2024 -0800

    [Multi-stage] Optimize group key generation (#12394)
---
 .../NoDictionaryMultiColumnGroupKeyGenerator.java  | 189 ++++++++++++++-------
 .../groupby/utils/BaseValueToIdMap.java            |  87 ----------
 .../aggregation/groupby/utils/BytesToIdMap.java    |  67 --------
 .../aggregation/groupby/utils/DoubleToIdMap.java   |  33 ++--
 .../aggregation/groupby/utils/FloatToIdMap.java    |  33 ++--
 .../aggregation/groupby/utils/IntToIdMap.java      |  33 ++--
 .../aggregation/groupby/utils/LongToIdMap.java     |  33 ++--
 .../{StringToIdMap.java => ObjectToIdMap.java}     |  32 ++--
 .../aggregation/groupby/utils/ValueToIdMap.java    |  44 +++--
 .../groupby/utils/ValueToIdMapFactory.java         |  11 +-
 .../pinot/core/util/DataBlockExtractUtils.java     |  14 +-
 .../operator/MultistageGroupByExecutor.java        |  92 ++++------
 .../runtime/operator/groupby/GroupIdGenerator.java |  50 ++++++
 .../operator/groupby/GroupIdGeneratorFactory.java  |  48 ++++++
 .../groupby/MultiKeysGroupIdGenerator.java         | 106 ++++++++++++
 .../groupby/OneDoubleKeyGroupIdGenerator.java      |  92 ++++++++++
 .../groupby/OneFloatKeyGroupIdGenerator.java       |  90 ++++++++++
 .../groupby/OneIntKeyGroupIdGenerator.java         |  91 ++++++++++
 .../groupby/OneLongKeyGroupIdGenerator.java        |  91 ++++++++++
 .../groupby/OneObjectKeyGroupIdGenerator.java      |  72 ++++++++
 .../operator/groupby/TwoKeysGroupIdGenerator.java  | 111 ++++++++++++
 .../runtime/operator/AggregateOperatorTest.java    |   2 +-
 22 files changed, 1033 insertions(+), 388 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
index 91becf3bbe..9c7cf193a6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
@@ -57,11 +57,9 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
   private final ValueToIdMap[] _onTheFlyDictionaries;
   private final Object2IntOpenHashMap<FixedIntArray> _groupKeyMap;
   private final boolean[] _isSingleValueExpressions;
-  private final int _globalGroupIdUpperBound;
+  private final int _numGroupsLimit;
   private final boolean _nullHandlingEnabled;
 
-  private int _numGroups = 0;
-
   public NoDictionaryMultiColumnGroupKeyGenerator(BaseProjectOperator<?> projectOperator,
       ExpressionContext[] groupByExpressions, int numGroupsLimit, boolean nullHandlingEnabled) {
     _groupByExpressions = groupByExpressions;
@@ -87,12 +85,12 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
 
     _groupKeyMap = new Object2IntOpenHashMap<>();
     _groupKeyMap.defaultReturnValue(INVALID_ID);
-    _globalGroupIdUpperBound = numGroupsLimit;
+    _numGroupsLimit = numGroupsLimit;
   }
 
   @Override
   public int getGlobalGroupKeyUpperBound() {
-    return _globalGroupIdUpperBound;
+    return _numGroupsLimit;
   }
 
   @Override
@@ -117,6 +115,9 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
           case DOUBLE:
             values[i] = blockValSet.getDoubleValuesSV();
             break;
+          case BIG_DECIMAL:
+            values[i] = blockValSet.getBigDecimalValuesSV();
+            break;
           case STRING:
             values[i] = blockValSet.getStringValuesSV();
             break;
@@ -137,53 +138,134 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
         nullBitmaps[i] = valueBlock.getBlockValueSet(_groupByExpressions[i]).getNullBitmap();
       }
       for (int row = 0; row < numDocs; row++) {
-        for (int col = 0; col < _numGroupByExpressions; col++) {
-          if (nullBitmaps[col] != null && nullBitmaps[col].contains(row)) {
-            keyValues[col] = ID_FOR_NULL;
-          } else {
+        int numGroups = _groupKeyMap.size();
+        boolean hasInvalidKeyValue = false;
+        if (numGroups < _numGroupsLimit) {
+          for (int col = 0; col < _numGroupByExpressions; col++) {
+            if (nullBitmaps[col] != null && nullBitmaps[col].contains(row)) {
+              keyValues[col] = ID_FOR_NULL;
+            } else {
+              Object columnValues = values[col];
+              ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[col];
+              int keyValue;
+              if (columnValues instanceof int[]) {
+                keyValue = onTheFlyDictionary.put(((int[]) columnValues)[row]);
+              } else if (columnValues instanceof long[]) {
+                keyValue = onTheFlyDictionary.put(((long[]) columnValues)[row]);
+              } else if (columnValues instanceof float[]) {
+                keyValue = onTheFlyDictionary.put(((float[]) columnValues)[row]);
+              } else if (columnValues instanceof double[]) {
+                keyValue = onTheFlyDictionary.put(((double[]) columnValues)[row]);
+              } else if (columnValues instanceof byte[][]) {
+                keyValue = onTheFlyDictionary.put(new ByteArray(((byte[][]) columnValues)[row]));
+              } else {
+                keyValue = onTheFlyDictionary.put(((Object[]) columnValues)[row]);
+              }
+              keyValues[col] = keyValue;
+            }
+          }
+        } else {
+          for (int col = 0; col < _numGroupByExpressions; col++) {
+            if (nullBitmaps[col] != null && nullBitmaps[col].contains(row)) {
+              keyValues[col] = ID_FOR_NULL;
+            } else {
+              Object columnValues = values[col];
+              ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[col];
+              int keyValue;
+              if (columnValues instanceof int[]) {
+                keyValue = onTheFlyDictionary.getId(((int[]) columnValues)[row]);
+              } else if (columnValues instanceof long[]) {
+                keyValue = onTheFlyDictionary.getId(((long[]) columnValues)[row]);
+              } else if (columnValues instanceof float[]) {
+                keyValue = onTheFlyDictionary.getId(((float[]) columnValues)[row]);
+              } else if (columnValues instanceof double[]) {
+                keyValue = onTheFlyDictionary.getId(((double[]) columnValues)[row]);
+              } else if (columnValues instanceof byte[][]) {
+                keyValue = onTheFlyDictionary.getId(new ByteArray(((byte[][]) columnValues)[row]));
+              } else {
+                keyValue = onTheFlyDictionary.getId(((Object[]) columnValues)[row]);
+              }
+              if (keyValue == INVALID_ID) {
+                hasInvalidKeyValue = true;
+                break;
+              }
+            }
+          }
+        }
+        if (hasInvalidKeyValue) {
+          groupKeys[row] = INVALID_ID;
+        } else {
+          int groupId = getGroupIdForKey(flyweightKey);
+          if (groupId == numGroups) {
+            // When a new group is added, create a new FixedIntArray
+            keyValues = new int[_numGroupByExpressions];
+            flyweightKey = new FixedIntArray(keyValues);
+          }
+          groupKeys[row] = groupId;
+        }
+      }
+    } else {
+      for (int row = 0; row < numDocs; row++) {
+        int numGroups = _groupKeyMap.size();
+        boolean hasInvalidKeyValue = false;
+        if (numGroups < _numGroupsLimit) {
+          for (int col = 0; col < _numGroupByExpressions; col++) {
             Object columnValues = values[col];
             ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[col];
+            int keyValue;
             if (columnValues instanceof int[]) {
-              keyValues[col] = onTheFlyDictionary.put(((int[]) columnValues)[row]);
+              int columnValue = ((int[]) columnValues)[row];
+              keyValue = onTheFlyDictionary != null ? onTheFlyDictionary.put(columnValue) : columnValue;
             } else if (columnValues instanceof long[]) {
-              keyValues[col] = onTheFlyDictionary.put(((long[]) columnValues)[row]);
+              keyValue = onTheFlyDictionary.put(((long[]) columnValues)[row]);
             } else if (columnValues instanceof float[]) {
-              keyValues[col] = onTheFlyDictionary.put(((float[]) columnValues)[row]);
+              keyValue = onTheFlyDictionary.put(((float[]) columnValues)[row]);
             } else if (columnValues instanceof double[]) {
-              keyValues[col] = onTheFlyDictionary.put(((double[]) columnValues)[row]);
-            } else if (columnValues instanceof String[]) {
-              keyValues[col] = onTheFlyDictionary.put(((String[]) columnValues)[row]);
+              keyValue = onTheFlyDictionary.put(((double[]) columnValues)[row]);
             } else if (columnValues instanceof byte[][]) {
-              keyValues[col] = onTheFlyDictionary.put(new ByteArray(((byte[][]) columnValues)[row]));
+              keyValue = onTheFlyDictionary.put(new ByteArray(((byte[][]) columnValues)[row]));
+            } else {
+              keyValue = onTheFlyDictionary.put(((Object[]) columnValues)[row]);
             }
+            keyValues[col] = keyValue;
           }
-        }
-        groupKeys[row] = getGroupIdForFlyweightKey(flyweightKey);
-      }
-    } else {
-      for (int row = 0; row < numDocs; row++) {
-        for (int col = 0; col < _numGroupByExpressions; col++) {
-          Object columnValues = values[col];
-          ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[col];
-          if (columnValues instanceof int[]) {
-            if (onTheFlyDictionary == null) {
-              keyValues[col] = ((int[]) columnValues)[row];
+        } else {
+          for (int col = 0; col < _numGroupByExpressions; col++) {
+            Object columnValues = values[col];
+            ValueToIdMap onTheFlyDictionary = _onTheFlyDictionaries[col];
+            int keyValue;
+            if (columnValues instanceof int[]) {
+              int columnValue = ((int[]) columnValues)[row];
+              keyValue = onTheFlyDictionary != null ? onTheFlyDictionary.getId(columnValue) : columnValue;
+            } else if (columnValues instanceof long[]) {
+              keyValue = onTheFlyDictionary.getId(((long[]) columnValues)[row]);
+            } else if (columnValues instanceof float[]) {
+              keyValue = onTheFlyDictionary.getId(((float[]) columnValues)[row]);
+            } else if (columnValues instanceof double[]) {
+              keyValue = onTheFlyDictionary.getId(((double[]) columnValues)[row]);
+            } else if (columnValues instanceof byte[][]) {
+              keyValue = onTheFlyDictionary.getId(new ByteArray(((byte[][]) columnValues)[row]));
             } else {
-              keyValues[col] = onTheFlyDictionary.put(((int[]) columnValues)[row]);
+              keyValue = onTheFlyDictionary.getId(((Object[]) columnValues)[row]);
+            }
+            if (keyValue == INVALID_ID) {
+              hasInvalidKeyValue = true;
+              break;
             }
-          } else if (columnValues instanceof long[]) {
-            keyValues[col] = onTheFlyDictionary.put(((long[]) columnValues)[row]);
-          } else if (columnValues instanceof float[]) {
-            keyValues[col] = onTheFlyDictionary.put(((float[]) columnValues)[row]);
-          } else if (columnValues instanceof double[]) {
-            keyValues[col] = onTheFlyDictionary.put(((double[]) columnValues)[row]);
-          } else if (columnValues instanceof String[]) {
-            keyValues[col] = onTheFlyDictionary.put(((String[]) columnValues)[row]);
-          } else if (columnValues instanceof byte[][]) {
-            keyValues[col] = onTheFlyDictionary.put(new ByteArray(((byte[][]) columnValues)[row]));
+            keyValues[col] = keyValue;
+          }
+        }
+        if (hasInvalidKeyValue) {
+          groupKeys[row] = INVALID_ID;
+        } else {
+          int groupId = getGroupIdForKey(flyweightKey);
+          if (groupId == numGroups) {
+            // When a new group is added, create a new FixedIntArray
+            keyValues = new int[_numGroupByExpressions];
+            flyweightKey = new FixedIntArray(keyValues);
           }
+          groupKeys[row] = groupId;
         }
-        groupKeys[row] = getGroupIdForFlyweightKey(flyweightKey);
       }
     }
   }
@@ -329,23 +411,6 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
     return new GroupKeyIterator();
   }
 
-  /**
-   * Helper method to get or create group-id for a group key.
-   *
-   * @param flyweight Group key, that is a list of objects to be grouped, will be cloned on first occurrence
-   * @return Group id
-   */
-  private int getGroupIdForFlyweightKey(FixedIntArray flyweight) {
-    int groupId = _groupKeyMap.getInt(flyweight);
-    if (groupId == INVALID_ID) {
-      if (_numGroups < _globalGroupIdUpperBound) {
-        groupId = _numGroups;
-        _groupKeyMap.put(flyweight.clone(), _numGroups++);
-      }
-    }
-    return groupId;
-  }
-
   /**
    * Helper method to get or create group-id for a group key.
    *
@@ -353,14 +418,12 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
    * @return Group id
    */
   private int getGroupIdForKey(FixedIntArray keyList) {
-    int groupId = _groupKeyMap.getInt(keyList);
-    if (groupId == INVALID_ID) {
-      if (_numGroups < _globalGroupIdUpperBound) {
-        groupId = _numGroups;
-        _groupKeyMap.put(keyList, _numGroups++);
-      }
+    int numGroups = _groupKeyMap.size();
+    if (numGroups < _numGroupsLimit) {
+      return _groupKeyMap.computeIfAbsent(keyList, k -> numGroups);
+    } else {
+      return _groupKeyMap.getInt(keyList);
     }
-    return groupId;
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java
deleted file mode 100644
index 6d889d3442..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BaseValueToIdMap.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation.groupby.utils;
-
-import org.apache.pinot.spi.utils.ByteArray;
-
-
-/**
- * Abstract base class for {@link ValueToIdMap} interface.
- */
-public abstract class BaseValueToIdMap implements ValueToIdMap {
-  @Override
-  public int put(int value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int put(long value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int put(float value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int put(double value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int put(String value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int put(ByteArray value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int getInt(int id) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public long getLong(int id) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public float getFloat(int id) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public double getDouble(int id) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String getString(int id) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public ByteArray getBytes(int id) {
-    throw new UnsupportedOperationException();
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BytesToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BytesToIdMap.java
deleted file mode 100644
index 2f2fe6ec2e..0000000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/BytesToIdMap.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation.groupby.utils;
-
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
-import it.unimi.dsi.fastutil.objects.ObjectArrayList;
-import it.unimi.dsi.fastutil.objects.ObjectList;
-import org.apache.pinot.spi.utils.ByteArray;
-
-
-/**
- * Implementation of {@link ValueToIdMap} for ByteArray.
- */
-public class BytesToIdMap extends BaseValueToIdMap {
-  Object2IntMap<ByteArray> _valueToIdMap;
-  ObjectList<ByteArray> _idToValueMap;
-
-  public BytesToIdMap() {
-    _valueToIdMap = new Object2IntOpenHashMap<>();
-    _valueToIdMap.defaultReturnValue(INVALID_KEY);
-    _idToValueMap = new ObjectArrayList<>();
-  }
-
-  @Override
-  public int put(ByteArray value) {
-    int id = _valueToIdMap.getInt(value);
-    if (id == INVALID_KEY) {
-      id = _idToValueMap.size();
-      _valueToIdMap.put(value, id);
-      _idToValueMap.add(value);
-    }
-    return id;
-  }
-
-  @Override
-  public String getString(int id) {
-    return getBytes(id).toHexString();
-  }
-
-  @Override
-  public ByteArray getBytes(int id) {
-    assert id < _idToValueMap.size();
-    return _idToValueMap.get(id);
-  }
-
-  @Override
-  public Object get(int id) {
-    return getBytes(id);
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
index 4bc5a1d8d1..0b3754ee4f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/DoubleToIdMap.java
@@ -18,18 +18,16 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby.utils;
 
-import it.unimi.dsi.fastutil.doubles.Double2IntMap;
 import it.unimi.dsi.fastutil.doubles.Double2IntOpenHashMap;
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
-import it.unimi.dsi.fastutil.doubles.DoubleList;
 
 
 /**
  * Implementation of {@link ValueToIdMap} for double.
  */
-public class DoubleToIdMap extends BaseValueToIdMap {
-  Double2IntMap _valueToIdMap;
-  DoubleList _idToValueMap;
+public class DoubleToIdMap implements ValueToIdMap {
+  private final Double2IntOpenHashMap _valueToIdMap;
+  private final DoubleArrayList _idToValueMap;
 
   public DoubleToIdMap() {
     _valueToIdMap = new Double2IntOpenHashMap();
@@ -39,28 +37,31 @@ public class DoubleToIdMap extends BaseValueToIdMap {
 
   @Override
   public int put(double value) {
-    int id = _valueToIdMap.get(value);
-    if (id == INVALID_KEY) {
-      id = _idToValueMap.size();
-      _valueToIdMap.put(value, id);
+    int numValues = _valueToIdMap.size();
+    int id = _valueToIdMap.computeIfAbsent(value, k -> numValues);
+    if (id == numValues) {
       _idToValueMap.add(value);
     }
     return id;
   }
 
   @Override
-  public double getDouble(int id) {
-    assert id < _idToValueMap.size();
-    return _idToValueMap.getDouble(id);
+  public int put(Object value) {
+    return put((double) value);
   }
 
   @Override
-  public String getString(int id) {
-    return Double.toString(getDouble(id));
+  public int getId(double value) {
+    return _valueToIdMap.get(value);
   }
 
   @Override
-  public Object get(int id) {
-    return getDouble(id);
+  public int getId(Object value) {
+    return getId((double) value);
+  }
+
+  @Override
+  public Double get(int id) {
+    return _idToValueMap.getDouble(id);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java
index a928c2de0b..8b4e41aba3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/FloatToIdMap.java
@@ -18,18 +18,16 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby.utils;
 
-import it.unimi.dsi.fastutil.floats.Float2IntMap;
 import it.unimi.dsi.fastutil.floats.Float2IntOpenHashMap;
 import it.unimi.dsi.fastutil.floats.FloatArrayList;
-import it.unimi.dsi.fastutil.floats.FloatList;
 
 
 /**
  * Implementation of {@link ValueToIdMap} for float.
  */
-public class FloatToIdMap extends BaseValueToIdMap {
-  Float2IntMap _valueToIdMap;
-  FloatList _idToValueMap;
+public class FloatToIdMap implements ValueToIdMap {
+  private final Float2IntOpenHashMap _valueToIdMap;
+  private final FloatArrayList _idToValueMap;
 
   public FloatToIdMap() {
     _valueToIdMap = new Float2IntOpenHashMap();
@@ -39,28 +37,31 @@ public class FloatToIdMap extends BaseValueToIdMap {
 
   @Override
   public int put(float value) {
-    int id = _valueToIdMap.get(value);
-    if (id == INVALID_KEY) {
-      id = _idToValueMap.size();
-      _valueToIdMap.put(value, id);
+    int numValues = _valueToIdMap.size();
+    int id = _valueToIdMap.computeIfAbsent(value, k -> numValues);
+    if (id == numValues) {
       _idToValueMap.add(value);
     }
     return id;
   }
 
   @Override
-  public float getFloat(int id) {
-    assert id < _idToValueMap.size();
-    return _idToValueMap.getFloat(id);
+  public int put(Object value) {
+    return put((float) value);
   }
 
   @Override
-  public String getString(int id) {
-    return Float.toString(getFloat(id));
+  public int getId(float value) {
+    return _valueToIdMap.get(value);
   }
 
   @Override
-  public Object get(int id) {
-    return getFloat(id);
+  public int getId(Object value) {
+    return getId((float) value);
+  }
+
+  @Override
+  public Float get(int id) {
+    return _idToValueMap.getFloat(id);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java
index d07e4d62db..2a05d8a2a5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/IntToIdMap.java
@@ -18,18 +18,16 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby.utils;
 
-import it.unimi.dsi.fastutil.ints.Int2IntMap;
 import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
 import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.ints.IntList;
 
 
 /**
  * Implementation of {@link ValueToIdMap} for int.
  */
-public class IntToIdMap extends BaseValueToIdMap {
-  Int2IntMap _valueToIdMap;
-  IntList _idToValueMap;
+public class IntToIdMap implements ValueToIdMap {
+  private final Int2IntOpenHashMap _valueToIdMap;
+  private final IntArrayList _idToValueMap;
 
   public IntToIdMap() {
     _valueToIdMap = new Int2IntOpenHashMap();
@@ -39,28 +37,31 @@ public class IntToIdMap extends BaseValueToIdMap {
 
   @Override
   public int put(int value) {
-    int id = _valueToIdMap.get(value);
-    if (id == INVALID_KEY) {
-      id = _idToValueMap.size();
-      _valueToIdMap.put(value, id);
+    int numValues = _valueToIdMap.size();
+    int id = _valueToIdMap.computeIfAbsent(value, k -> numValues);
+    if (id == numValues) {
       _idToValueMap.add(value);
     }
     return id;
   }
 
   @Override
-  public int getInt(int id) {
-    assert id < _idToValueMap.size();
-    return _idToValueMap.getInt(id);
+  public int put(Object value) {
+    return put((int) value);
   }
 
   @Override
-  public String getString(int id) {
-    return Integer.toString(getInt(id));
+  public int getId(int value) {
+    return _valueToIdMap.get(value);
   }
 
   @Override
-  public Object get(int id) {
-    return getInt(id);
+  public int getId(Object value) {
+    return getId((int) value);
+  }
+
+  @Override
+  public Integer get(int id) {
+    return _idToValueMap.getInt(id);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java
index 06df55c2cb..cc2259d278 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/LongToIdMap.java
@@ -18,18 +18,16 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby.utils;
 
-import it.unimi.dsi.fastutil.longs.Long2IntMap;
 import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
-import it.unimi.dsi.fastutil.longs.LongList;
 
 
 /**
  * Implementation of {@link ValueToIdMap} for long.
  */
-public class LongToIdMap extends BaseValueToIdMap {
-  Long2IntMap _valueToIdMap;
-  LongList _idToValueMap;
+public class LongToIdMap implements ValueToIdMap {
+  private final Long2IntOpenHashMap _valueToIdMap;
+  private final LongArrayList _idToValueMap;
 
   public LongToIdMap() {
     _valueToIdMap = new Long2IntOpenHashMap();
@@ -39,28 +37,31 @@ public class LongToIdMap extends BaseValueToIdMap {
 
   @Override
   public int put(long value) {
-    int id = _valueToIdMap.get(value);
-    if (id == INVALID_KEY) {
-      id = _idToValueMap.size();
-      _valueToIdMap.put(value, id);
+    int numValues = _valueToIdMap.size();
+    int id = _valueToIdMap.computeIfAbsent(value, k -> numValues);
+    if (id == numValues) {
       _idToValueMap.add(value);
     }
     return id;
   }
 
   @Override
-  public long getLong(int id) {
-    assert id < _idToValueMap.size();
-    return _idToValueMap.getLong(id);
+  public int put(Object value) {
+    return put((long) value);
   }
 
   @Override
-  public String getString(int id) {
-    return Long.toString(getLong(id));
+  public int getId(long value) {
+    return _valueToIdMap.get(value);
   }
 
   @Override
-  public Object get(int id) {
-    return getLong(id);
+  public int getId(Object value) {
+    return getId((long) value);
+  }
+
+  @Override
+  public Long get(int id) {
+    return _idToValueMap.getLong(id);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/StringToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ObjectToIdMap.java
similarity index 64%
rename from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/StringToIdMap.java
rename to pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ObjectToIdMap.java
index 290eaddaf5..9a0734362d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/StringToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ObjectToIdMap.java
@@ -18,44 +18,40 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby.utils;
 
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
-import it.unimi.dsi.fastutil.objects.ObjectArrayList;
-import it.unimi.dsi.fastutil.objects.ObjectList;
+import java.util.ArrayList;
 
 
 /**
- * Implementation of {@link ValueToIdMap} for String.
+ * Implementation of {@link ValueToIdMap} for Object.
  */
-public class StringToIdMap extends BaseValueToIdMap {
-  Object2IntMap<String> _valueToIdMap;
-  ObjectList<String> _idToValueMap;
+public class ObjectToIdMap implements ValueToIdMap {
+  private final Object2IntOpenHashMap<Object> _valueToIdMap;
+  private final ArrayList<Object> _idToValueMap;
 
-  public StringToIdMap() {
+  public ObjectToIdMap() {
     _valueToIdMap = new Object2IntOpenHashMap<>();
     _valueToIdMap.defaultReturnValue(INVALID_KEY);
-    _idToValueMap = new ObjectArrayList<>();
+    _idToValueMap = new ArrayList<>();
   }
 
   @Override
-  public int put(String value) {
-    int id = _valueToIdMap.getInt(value);
-    if (id == INVALID_KEY) {
-      id = _idToValueMap.size();
-      _valueToIdMap.put(value, id);
+  public int put(Object value) {
+    int numValues = _valueToIdMap.size();
+    int id = _valueToIdMap.computeIntIfAbsent(value, k -> numValues);
+    if (id == numValues) {
       _idToValueMap.add(value);
     }
     return id;
   }
 
   @Override
-  public String getString(int id) {
-    assert id < _idToValueMap.size();
-    return _idToValueMap.get(id);
+  public int getId(Object value) {
+    return _valueToIdMap.getInt(value);
   }
 
   @Override
   public Object get(int id) {
-    return getString(id);
+    return _idToValueMap.get(id);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java
index 858e814c51..93d65383e7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMap.java
@@ -18,38 +18,47 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby.utils;
 
-import org.apache.pinot.spi.utils.ByteArray;
-
-
 /**
  * Interface for mapping primitive values to contiguous id's.
  */
 public interface ValueToIdMap {
   int INVALID_KEY = -1;
 
-  int put(int value);
-
-  int put(long value);
-
-  int put(float value);
+  default int put(int value) {
+    throw new UnsupportedOperationException();
+  }
 
-  int put(double value);
+  default int put(long value) {
+    throw new UnsupportedOperationException();
+  }
 
-  int put(String value);
+  default int put(float value) {
+    throw new UnsupportedOperationException();
+  }
 
-  int put(ByteArray value);
+  default int put(double value) {
+    throw new UnsupportedOperationException();
+  }
 
-  int getInt(int id);
+  int put(Object value);
 
-  long getLong(int id);
+  default int getId(int value) {
+    throw new UnsupportedOperationException();
+  }
 
-  float getFloat(int id);
+  default int getId(long value) {
+    throw new UnsupportedOperationException();
+  }
 
-  double getDouble(int id);
+  default int getId(float value) {
+    throw new UnsupportedOperationException();
+  }
 
-  String getString(int id);
+  default int getId(double value) {
+    throw new UnsupportedOperationException();
+  }
 
-  ByteArray getBytes(int id);
+  int getId(Object value);
 
   /**
    * Returns the value for the given id.
@@ -59,6 +68,7 @@ public interface ValueToIdMap {
    *   <li>LONG -> Long</li>
    *   <li>FLOAT -> Float</li>
    *   <li>DOUBLE -> Double</li>
+   *   <li>BIG_DECIMAL -> BigDecimal</li>
    *   <li>STRING -> String</li>
    *   <li>BYTES -> ByteArray</li>
    * </ul>
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java
index 444899e4b7..ee62225660 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/utils/ValueToIdMapFactory.java
@@ -18,18 +18,17 @@
  */
 package org.apache.pinot.core.query.aggregation.groupby.utils;
 
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
 
 
 /**
  * Factory for various implementations for {@link ValueToIdMap}
  */
 public class ValueToIdMapFactory {
-  // Private constructor to prevent instantiating the class.
   private ValueToIdMapFactory() {
   }
 
-  public static ValueToIdMap get(FieldSpec.DataType dataType) {
+  public static ValueToIdMap get(DataType dataType) {
     switch (dataType) {
       case INT:
         return new IntToIdMap();
@@ -39,12 +38,8 @@ public class ValueToIdMapFactory {
         return new FloatToIdMap();
       case DOUBLE:
         return new DoubleToIdMap();
-      case STRING:
-        return new StringToIdMap();
-      case BYTES:
-        return new BytesToIdMap();
       default:
-        throw new IllegalArgumentException("Illegal data type for ValueToIdMapFactory: " + dataType);
+        return new ObjectToIdMap();
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java
index 823527102f..4458815277 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/DataBlockExtractUtils.java
@@ -28,7 +28,6 @@ import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.CommonConstants.NullValuePlaceHolder;
 import org.roaringbitmap.PeekableIntIterator;
@@ -105,7 +104,7 @@ public final class DataBlockExtractUtils {
     }
   }
 
-  public static Key[] extractKeys(DataBlock dataBlock, int[] keyIds) {
+  public static Object[][] extractKeys(DataBlock dataBlock, int[] keyIds) {
     DataSchema dataSchema = dataBlock.getDataSchema();
     int numKeys = keyIds.length;
     ColumnDataType[] storedTypes = new ColumnDataType[numKeys];
@@ -115,7 +114,7 @@ public final class DataBlockExtractUtils {
       nullBitmaps[colId] = dataBlock.getNullRowIds(keyIds[colId]);
     }
     int numRows = dataBlock.getNumberOfRows();
-    Key[] keys = new Key[numRows];
+    Object[][] keys = new Object[numRows][];
     for (int rowId = 0; rowId < numRows; rowId++) {
       Object[] values = new Object[numKeys];
       for (int colId = 0; colId < numKeys; colId++) {
@@ -124,12 +123,13 @@ public final class DataBlockExtractUtils {
           values[colId] = extractValue(dataBlock, storedTypes[colId], rowId, keyIds[colId]);
         }
       }
-      keys[rowId] = new Key(values);
+      keys[rowId] = values;
     }
     return keys;
   }
 
-  public static Key[] extractKeys(DataBlock dataBlock, int[] keyIds, int numMatchedRows, RoaringBitmap matchedBitmap) {
+  public static Object[][] extractKeys(DataBlock dataBlock, int[] keyIds, int numMatchedRows,
+      RoaringBitmap matchedBitmap) {
     DataSchema dataSchema = dataBlock.getDataSchema();
     int numKeys = keyIds.length;
     ColumnDataType[] storedTypes = new ColumnDataType[numKeys];
@@ -138,7 +138,7 @@ public final class DataBlockExtractUtils {
       storedTypes[colId] = dataSchema.getColumnDataType(keyIds[colId]).getStoredType();
       nullBitmaps[colId] = dataBlock.getNullRowIds(keyIds[colId]);
     }
-    Key[] keys = new Key[numMatchedRows];
+    Object[][] keys = new Object[numMatchedRows][];
     PeekableIntIterator iterator = matchedBitmap.getIntIterator();
     for (int matchedRowId = 0; matchedRowId < numMatchedRows; matchedRowId++) {
       int rowId = iterator.next();
@@ -149,7 +149,7 @@ public final class DataBlockExtractUtils {
           values[colId] = extractValue(dataBlock, storedTypes[colId], rowId, keyIds[colId]);
         }
       }
-      keys[matchedRowId] = new Key(values);
+      keys[matchedRowId] = values;
     }
     return keys;
   }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
index 7b4cb28071..a89125d048 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultistageGroupByExecutor.java
@@ -18,10 +18,9 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
-import it.unimi.dsi.fastutil.objects.Object2IntMap;
-import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -32,7 +31,6 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
@@ -41,6 +39,8 @@ import org.apache.pinot.core.util.DataBlockExtractUtils;
 import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
 import org.apache.pinot.query.planner.plannode.AggregateNode.AggType;
 import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+import org.apache.pinot.query.runtime.operator.groupby.GroupIdGenerator;
+import org.apache.pinot.query.runtime.operator.groupby.GroupIdGeneratorFactory;
 import org.apache.pinot.query.runtime.operator.utils.TypeUtils;
 import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
@@ -65,7 +65,7 @@ public class MultistageGroupByExecutor {
 
   // Mapping from the row-key to a zero based integer index. This is used when we invoke the v1 aggregation functions
   // because they use the zero based integer indexes to store results.
-  private final Object2IntOpenHashMap<Object> _groupKeyToIdMap;
+  private final GroupIdGenerator _groupIdGenerator;
 
   public MultistageGroupByExecutor(int[] groupKeyIds, AggregationFunction[] aggFunctions, int[] filterArgIds,
       int maxFilterArgId, AggType aggType, DataSchema resultSchema, Map<String, String> opChainMetadata,
@@ -92,8 +92,9 @@ public class MultistageGroupByExecutor {
       _aggregateResultHolders = null;
     }
 
-    _groupKeyToIdMap = new Object2IntOpenHashMap<>();
-    _groupKeyToIdMap.defaultReturnValue(GroupKeyGenerator.INVALID_ID);
+    _groupIdGenerator =
+        GroupIdGeneratorFactory.getGroupIdGenerator(_resultSchema.getStoredColumnDataTypes(), groupKeyIds.length,
+            _numGroupsLimit);
   }
 
   private int getNumGroupsLimit(Map<String, String> opChainMetadata, @Nullable AbstractPlanNode.NodeHint nodeHint) {
@@ -146,39 +147,27 @@ public class MultistageGroupByExecutor {
    * Fetches the result.
    */
   public List<Object[]> getResult() {
-    if (_groupKeyToIdMap.isEmpty()) {
+    int numGroups = _groupIdGenerator.getNumGroups();
+    if (numGroups == 0) {
       return Collections.emptyList();
     }
-    List<Object[]> rows = new ArrayList<>(_groupKeyToIdMap.size());
+    List<Object[]> rows = new ArrayList<>(numGroups);
     int numKeys = _groupKeyIds.length;
     int numFunctions = _aggFunctions.length;
-    int numColumns = numKeys + numFunctions;
     ColumnDataType[] resultStoredTypes = _resultSchema.getStoredColumnDataTypes();
-    if (numKeys == 1) {
-      for (Object2IntMap.Entry<Object> entry : _groupKeyToIdMap.object2IntEntrySet()) {
-        Object[] row = new Object[numColumns];
-        row[0] = entry.getKey();
-        int groupId = entry.getIntValue();
-        for (int i = 0; i < numFunctions; i++) {
-          row[i + 1] = getResultValue(i, groupId);
-        }
-        // Convert the results from AggregationFunction to the desired type
-        TypeUtils.convertRow(row, resultStoredTypes);
-        rows.add(row);
-      }
-    } else {
-      for (Object2IntMap.Entry<Object> entry : _groupKeyToIdMap.object2IntEntrySet()) {
-        Object[] row = new Object[numColumns];
-        Object[] keyValues = ((Key) entry.getKey()).getValues();
-        System.arraycopy(keyValues, 0, row, 0, numKeys);
-        int groupId = entry.getIntValue();
-        for (int i = 0; i < numFunctions; i++) {
-          row[numKeys + i] = getResultValue(i, groupId);
-        }
-        // Convert the results from AggregationFunction to the desired type
-        TypeUtils.convertRow(row, resultStoredTypes);
-        rows.add(row);
+    Iterator<GroupIdGenerator.GroupKey> groupKeyIterator =
+        _groupIdGenerator.getGroupKeyIterator(numKeys + numFunctions);
+    while (groupKeyIterator.hasNext()) {
+      GroupIdGenerator.GroupKey groupKey = groupKeyIterator.next();
+      int groupId = groupKey._groupId;
+      Object[] row = groupKey._row;
+      int columnId = numKeys;
+      for (int i = 0; i < numFunctions; i++) {
+        row[columnId++] = getResultValue(i, groupId);
       }
+      // Convert the results from AggregationFunction to the desired type
+      TypeUtils.convertRow(row, resultStoredTypes);
+      rows.add(row);
     }
     return rows;
   }
@@ -201,7 +190,7 @@ public class MultistageGroupByExecutor {
   }
 
   public boolean isNumGroupsLimitReached() {
-    return _groupKeyToIdMap.size() == _numGroupsLimit;
+    return _groupIdGenerator.getNumGroups() == _numGroupsLimit;
   }
 
   private void processAggregate(TransferableBlock block) {
@@ -212,7 +201,7 @@ public class MultistageGroupByExecutor {
         AggregationFunction aggFunction = _aggFunctions[i];
         Map<ExpressionContext, BlockValSet> blockValSetMap = AggregateOperator.getBlockValSetMap(aggFunction, block);
         GroupByResultHolder groupByResultHolder = _aggregateResultHolders[i];
-        groupByResultHolder.ensureCapacity(_groupKeyToIdMap.size());
+        groupByResultHolder.ensureCapacity(_groupIdGenerator.getNumGroups());
         aggFunction.aggregateGroupBySV(block.getNumRows(), intKeys, groupByResultHolder, blockValSetMap);
       }
     } else {
@@ -231,7 +220,7 @@ public class MultistageGroupByExecutor {
           }
           Map<ExpressionContext, BlockValSet> blockValSetMap = AggregateOperator.getBlockValSetMap(aggFunction, block);
           GroupByResultHolder groupByResultHolder = _aggregateResultHolders[i];
-          groupByResultHolder.ensureCapacity(_groupKeyToIdMap.size());
+          groupByResultHolder.ensureCapacity(_groupIdGenerator.getNumGroups());
           aggFunction.aggregateGroupBySV(block.getNumRows(), intKeys, groupByResultHolder, blockValSetMap);
         } else {
           // Need to filter the block before aggregation
@@ -248,7 +237,7 @@ public class MultistageGroupByExecutor {
           Map<ExpressionContext, BlockValSet> blockValSetMap =
               AggregateOperator.getFilteredBlockValSetMap(aggFunction, block, numMatchedRows, matchedBitmap);
           GroupByResultHolder groupByResultHolder = _aggregateResultHolders[i];
-          groupByResultHolder.ensureCapacity(_groupKeyToIdMap.size());
+          groupByResultHolder.ensureCapacity(_groupIdGenerator.getNumGroups());
           aggFunction.aggregateGroupBySV(numMatchedRows, filteredIntKeys, groupByResultHolder, blockValSetMap);
         }
       }
@@ -308,16 +297,16 @@ public class MultistageGroupByExecutor {
     if (numKeys == 1) {
       int groupKeyId = _groupKeyIds[0];
       for (int i = 0; i < numRows; i++) {
-        intKeys[i] = getGroupId(rows.get(i)[groupKeyId]);
+        intKeys[i] = _groupIdGenerator.getGroupId(rows.get(i)[groupKeyId]);
       }
     } else {
+      Object[] key = new Object[numKeys];
       for (int i = 0; i < numRows; i++) {
         Object[] row = rows.get(i);
-        Object[] keyValues = new Object[numKeys];
         for (int j = 0; j < numKeys; j++) {
-          keyValues[j] = row[_groupKeyIds[j]];
+          key[j] = row[_groupKeyIds[j]];
         }
-        intKeys[i] = getGroupId(new Key(keyValues));
+        intKeys[i] = _groupIdGenerator.getGroupId(key);
       }
     }
     return intKeys;
@@ -333,7 +322,7 @@ public class MultistageGroupByExecutor {
     int numRows = keys.length;
     int[] intKeys = new int[numRows];
     for (int i = 0; i < numRows; i++) {
-      intKeys[i] = getGroupId(keys[i]);
+      intKeys[i] = _groupIdGenerator.getGroupId(keys[i]);
     }
     return intKeys;
   }
@@ -354,17 +343,17 @@ public class MultistageGroupByExecutor {
     if (numKeys == 1) {
       int groupKeyId = _groupKeyIds[0];
       for (int i = 0; i < numMatchedRows; i++) {
-        intKeys[i] = getGroupId(rows.get(iterator.next())[groupKeyId]);
+        intKeys[i] = _groupIdGenerator.getGroupId(rows.get(iterator.next())[groupKeyId]);
       }
     } else {
+      Object[] key = new Object[numKeys];
       for (int i = 0; i < numMatchedRows; i++) {
         int rowId = iterator.next();
         Object[] row = rows.get(rowId);
-        Object[] keyValues = new Object[numKeys];
         for (int j = 0; j < numKeys; j++) {
-          keyValues[j] = row[_groupKeyIds[j]];
+          key[j] = row[_groupKeyIds[j]];
         }
-        intKeys[i] = getGroupId(new Key(keyValues));
+        intKeys[i] = _groupIdGenerator.getGroupId(key);
       }
     }
     return intKeys;
@@ -379,17 +368,8 @@ public class MultistageGroupByExecutor {
     }
     int[] intKeys = new int[numMatchedRows];
     for (int i = 0; i < numMatchedRows; i++) {
-      intKeys[i] = getGroupId(keys[i]);
+      intKeys[i] = _groupIdGenerator.getGroupId(keys[i]);
     }
     return intKeys;
   }
-
-  private int getGroupId(Object key) {
-    int numGroups = _groupKeyToIdMap.size();
-    if (numGroups < _numGroupsLimit) {
-      return _groupKeyToIdMap.computeIntIfAbsent(key, k -> numGroups);
-    } else {
-      return _groupKeyToIdMap.getInt(key);
-    }
-  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGenerator.java
new file mode 100644
index 0000000000..de95033ab2
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGenerator.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.groupby;
+
+import java.util.Iterator;
+
+
+public interface GroupIdGenerator {
+  int INVALID_ID = -1;
+  int NULL_ID = -2;
+
+  /**
+   * Returns the group id for the given key. When a new key is encountered, it assigns a new group id to it before
+   * reaching the groups limit, or returns {@link #INVALID_ID} when the limit is reached.
+   * For single key column, the input is a single Object. For multi key columns, the input is an Object[] containing
+   * the values for each key column.
+   */
+  int getGroupId(Object key);
+
+  int getNumGroups();
+
+  Iterator<GroupKey> getGroupKeyIterator(int numColumns);
+
+  class GroupKey {
+    public final int _groupId;
+    // Row is pre-allocated for key and value columns, and is safe to be modified
+    public final Object[] _row;
+
+    public GroupKey(int groupId, Object[] row) {
+      _groupId = groupId;
+      _row = row;
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java
new file mode 100644
index 0000000000..16be037f38
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/GroupIdGeneratorFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.groupby;
+
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+
+
+public class GroupIdGeneratorFactory {
+  private GroupIdGeneratorFactory() {
+  }
+
+  public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns, int numGroupsLimit) {
+    if (numKeyColumns == 1) {
+      switch (keyTypes[0]) {
+        case INT:
+          return new OneIntKeyGroupIdGenerator(numGroupsLimit);
+        case LONG:
+          return new OneLongKeyGroupIdGenerator(numGroupsLimit);
+        case FLOAT:
+          return new OneFloatKeyGroupIdGenerator(numGroupsLimit);
+        case DOUBLE:
+          return new OneDoubleKeyGroupIdGenerator(numGroupsLimit);
+        default:
+          return new OneObjectKeyGroupIdGenerator(numGroupsLimit);
+      }
+    } else if (numKeyColumns == 2) {
+      return new TwoKeysGroupIdGenerator(keyTypes[0], keyTypes[1], numGroupsLimit);
+    } else {
+      return new MultiKeysGroupIdGenerator(keyTypes, numKeyColumns, numGroupsLimit);
+    }
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java
new file mode 100644
index 0000000000..30019746b4
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/MultiKeysGroupIdGenerator.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.groupby;
+
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import java.util.Iterator;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.query.aggregation.groupby.utils.ValueToIdMap;
+import org.apache.pinot.core.query.aggregation.groupby.utils.ValueToIdMapFactory;
+import org.apache.pinot.spi.utils.FixedIntArray;
+
+
+public class MultiKeysGroupIdGenerator implements GroupIdGenerator {
+  private final Object2IntOpenHashMap<FixedIntArray> _groupIdMap;
+  private final ValueToIdMap[] _keyToIdMaps;
+  private final int _numGroupsLimit;
+
+  public MultiKeysGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns, int numGroupsLimit) {
+    _groupIdMap = new Object2IntOpenHashMap<>();
+    _groupIdMap.defaultReturnValue(INVALID_ID);
+    _keyToIdMaps = new ValueToIdMap[numKeyColumns];
+    for (int i = 0; i < numKeyColumns; i++) {
+      _keyToIdMaps[i] = ValueToIdMapFactory.get(keyTypes[i].toDataType());
+    }
+    _numGroupsLimit = numGroupsLimit;
+  }
+
+  @Override
+  public int getGroupId(Object key) {
+    Object[] keyValues = (Object[]) key;
+    int numKeyColumns = keyValues.length;
+    int[] keyIds = new int[numKeyColumns];
+    int numGroups = _groupIdMap.size();
+    if (numGroups < _numGroupsLimit) {
+      for (int i = 0; i < numKeyColumns; i++) {
+        Object keyValue = keyValues[i];
+        keyIds[i] = keyValue != null ? _keyToIdMaps[i].put(keyValue) : NULL_ID;
+      }
+      return _groupIdMap.computeIntIfAbsent(new FixedIntArray(keyIds), k -> numGroups);
+    } else {
+      for (int i = 0; i < numKeyColumns; i++) {
+        Object keyValue = keyValues[i];
+        if (keyValue == null) {
+          keyIds[i] = NULL_ID;
+        } else {
+          int keyId = _keyToIdMaps[i].getId(keyValue);
+          if (keyId == INVALID_ID) {
+            return INVALID_ID;
+          }
+          keyIds[i] = keyId;
+        }
+      }
+      return _groupIdMap.getInt(new FixedIntArray(keyIds));
+    }
+  }
+
+  @Override
+  public int getNumGroups() {
+    return _groupIdMap.size();
+  }
+
+  @Override
+  public Iterator<GroupKey> getGroupKeyIterator(int numColumns) {
+    return new Iterator<GroupKey>() {
+      final ObjectIterator<Object2IntOpenHashMap.Entry<FixedIntArray>> _entryIterator =
+          _groupIdMap.object2IntEntrySet().fastIterator();
+
+      @Override
+      public boolean hasNext() {
+        return _entryIterator.hasNext();
+      }
+
+      @Override
+      public GroupKey next() {
+        Object2IntOpenHashMap.Entry<FixedIntArray> entry = _entryIterator.next();
+        int[] keyIds = entry.getKey().elements();
+        Object[] row = new Object[numColumns];
+        int numKeyColumns = keyIds.length;
+        for (int i = 0; i < numKeyColumns; i++) {
+          int keyId = keyIds[i];
+          if (keyId != NULL_ID) {
+            row[i] = _keyToIdMaps[i].get(keyId);
+          }
+        }
+        return new GroupKey(entry.getIntValue(), row);
+      }
+    };
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java
new file mode 100644
index 0000000000..cf3f920d22
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneDoubleKeyGroupIdGenerator.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.groupby;
+
+import it.unimi.dsi.fastutil.doubles.Double2IntMap;
+import it.unimi.dsi.fastutil.doubles.Double2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import java.util.Iterator;
+
+
+public class OneDoubleKeyGroupIdGenerator implements GroupIdGenerator {
+  private final Double2IntOpenHashMap _groupIdMap;
+  private final int _numGroupsLimit;
+
+  private int _numGroups = 0;
+  private int _nullGroupId = INVALID_ID;
+
+  public OneDoubleKeyGroupIdGenerator(int numGroupsLimit) {
+    _groupIdMap = new Double2IntOpenHashMap();
+    _groupIdMap.defaultReturnValue(INVALID_ID);
+    _numGroupsLimit = numGroupsLimit;
+  }
+
+  @Override
+  public int getGroupId(Object key) {
+    if (_numGroups < _numGroupsLimit) {
+      if (key == null) {
+        if (_nullGroupId == INVALID_ID) {
+          _nullGroupId = _numGroups++;
+        }
+        return _nullGroupId;
+      }
+      int groupId = _groupIdMap.computeIfAbsent((double) key, k -> _numGroups);
+      if (groupId == _numGroups) {
+        _numGroups++;
+      }
+      return groupId;
+    } else {
+      if (key == null) {
+        return _nullGroupId;
+      }
+      return _groupIdMap.get((double) key);
+    }
+  }
+
+  @Override
+  public int getNumGroups() {
+    return _numGroups;
+  }
+
+  @Override
+  public Iterator<GroupKey> getGroupKeyIterator(int numColumns) {
+    return new Iterator<GroupKey>() {
+      final ObjectIterator<Double2IntOpenHashMap.Entry> _entryIterator =
+          _groupIdMap.double2IntEntrySet().fastIterator();
+      boolean _returnNull = _nullGroupId != INVALID_ID;
+
+      @Override
+      public boolean hasNext() {
+        return _returnNull || _entryIterator.hasNext();
+      }
+
+      @Override
+      public GroupKey next() {
+        Object[] row = new Object[numColumns];
+        if (_returnNull) {
+          _returnNull = false;
+          return new GroupKey(_nullGroupId, row);
+        }
+        Double2IntMap.Entry entry = _entryIterator.next();
+        row[0] = entry.getDoubleKey();
+        return new GroupKey(entry.getIntValue(), row);
+      }
+    };
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java
new file mode 100644
index 0000000000..5d3005dc6b
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneFloatKeyGroupIdGenerator.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.groupby;
+
+import it.unimi.dsi.fastutil.floats.Float2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import java.util.Iterator;
+
+
+public class OneFloatKeyGroupIdGenerator implements GroupIdGenerator {
+  private final Float2IntOpenHashMap _groupIdMap;
+  private final int _numGroupsLimit;
+
+  private int _numGroups = 0;
+  private int _nullGroupId = INVALID_ID;
+
+  public OneFloatKeyGroupIdGenerator(int numGroupsLimit) {
+    _groupIdMap = new Float2IntOpenHashMap();
+    _groupIdMap.defaultReturnValue(INVALID_ID);
+    _numGroupsLimit = numGroupsLimit;
+  }
+
+  @Override
+  public int getGroupId(Object key) {
+    if (_numGroups < _numGroupsLimit) {
+      if (key == null) {
+        if (_nullGroupId == INVALID_ID) {
+          _nullGroupId = _numGroups++;
+        }
+        return _nullGroupId;
+      }
+      int groupId = _groupIdMap.computeIfAbsent((float) key, k -> _numGroups);
+      if (groupId == _numGroups) {
+        _numGroups++;
+      }
+      return groupId;
+    } else {
+      if (key == null) {
+        return _nullGroupId;
+      }
+      return _groupIdMap.get((float) key);
+    }
+  }
+
+  @Override
+  public int getNumGroups() {
+    return _numGroups;
+  }
+
+  @Override
+  public Iterator<GroupKey> getGroupKeyIterator(int numColumns) {
+    return new Iterator<GroupKey>() {
+      final ObjectIterator<Float2IntOpenHashMap.Entry> _entryIterator = _groupIdMap.float2IntEntrySet().fastIterator();
+      boolean _returnNull = _nullGroupId != INVALID_ID;
+
+      @Override
+      public boolean hasNext() {
+        return _returnNull || _entryIterator.hasNext();
+      }
+
+      @Override
+      public GroupKey next() {
+        Object[] row = new Object[numColumns];
+        if (_returnNull) {
+          _returnNull = false;
+          return new GroupKey(_nullGroupId, row);
+        }
+        Float2IntOpenHashMap.Entry entry = _entryIterator.next();
+        row[0] = entry.getFloatKey();
+        return new GroupKey(entry.getIntValue(), row);
+      }
+    };
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java
new file mode 100644
index 0000000000..77064f8b3e
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneIntKeyGroupIdGenerator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.groupby;
+
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import java.util.Iterator;
+
+
+public class OneIntKeyGroupIdGenerator implements GroupIdGenerator {
+  private final Int2IntOpenHashMap _groupIdMap;
+  private final int _numGroupsLimit;
+
+  private int _numGroups = 0;
+  private int _nullGroupId = INVALID_ID;
+
+  public OneIntKeyGroupIdGenerator(int numGroupsLimit) {
+    _groupIdMap = new Int2IntOpenHashMap();
+    _groupIdMap.defaultReturnValue(INVALID_ID);
+    _numGroupsLimit = numGroupsLimit;
+  }
+
+  @Override
+  public int getGroupId(Object key) {
+    if (_numGroups < _numGroupsLimit) {
+      if (key == null) {
+        if (_nullGroupId == INVALID_ID) {
+          _nullGroupId = _numGroups++;
+        }
+        return _nullGroupId;
+      }
+      int groupId = _groupIdMap.computeIfAbsent((int) key, k -> _numGroups);
+      if (groupId == _numGroups) {
+        _numGroups++;
+      }
+      return groupId;
+    } else {
+      if (key == null) {
+        return _nullGroupId;
+      }
+      return _groupIdMap.get((int) key);
+    }
+  }
+
+  @Override
+  public int getNumGroups() {
+    return _numGroups;
+  }
+
+  @Override
+  public Iterator<GroupKey> getGroupKeyIterator(int numColumns) {
+    return new Iterator<GroupKey>() {
+      final ObjectIterator<Int2IntMap.Entry> _entryIterator = _groupIdMap.int2IntEntrySet().fastIterator();
+      boolean _returnNull = _nullGroupId != INVALID_ID;
+
+      @Override
+      public boolean hasNext() {
+        return _returnNull || _entryIterator.hasNext();
+      }
+
+      @Override
+      public GroupKey next() {
+        Object[] row = new Object[numColumns];
+        if (_returnNull) {
+          _returnNull = false;
+          return new GroupKey(_nullGroupId, row);
+        }
+        Int2IntMap.Entry entry = _entryIterator.next();
+        row[0] = entry.getIntKey();
+        return new GroupKey(entry.getIntValue(), row);
+      }
+    };
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java
new file mode 100644
index 0000000000..5862df3def
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneLongKeyGroupIdGenerator.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.groupby;
+
+import it.unimi.dsi.fastutil.longs.Long2IntMap;
+import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import java.util.Iterator;
+
+
+public class OneLongKeyGroupIdGenerator implements GroupIdGenerator {
+  private final Long2IntOpenHashMap _groupIdMap;
+  private final int _numGroupsLimit;
+
+  private int _numGroups = 0;
+  private int _nullGroupId = INVALID_ID;
+
+  public OneLongKeyGroupIdGenerator(int numGroupsLimit) {
+    _groupIdMap = new Long2IntOpenHashMap();
+    _groupIdMap.defaultReturnValue(INVALID_ID);
+    _numGroupsLimit = numGroupsLimit;
+  }
+
+  @Override
+  public int getGroupId(Object key) {
+    if (_numGroups < _numGroupsLimit) {
+      if (key == null) {
+        if (_nullGroupId == INVALID_ID) {
+          _nullGroupId = _numGroups++;
+        }
+        return _nullGroupId;
+      }
+      int groupId = _groupIdMap.computeIfAbsent((long) key, k -> _numGroups);
+      if (groupId == _numGroups) {
+        _numGroups++;
+      }
+      return groupId;
+    } else {
+      if (key == null) {
+        return _nullGroupId;
+      }
+      return _groupIdMap.get((long) key);
+    }
+  }
+
+  @Override
+  public int getNumGroups() {
+    return _numGroups;
+  }
+
+  @Override
+  public Iterator<GroupKey> getGroupKeyIterator(int numColumns) {
+    return new Iterator<GroupKey>() {
+      final ObjectIterator<Long2IntMap.Entry> _entryIterator = _groupIdMap.long2IntEntrySet().fastIterator();
+      boolean _returnNull = _nullGroupId != INVALID_ID;
+
+      @Override
+      public boolean hasNext() {
+        return _returnNull || _entryIterator.hasNext();
+      }
+
+      @Override
+      public GroupKey next() {
+        Object[] row = new Object[numColumns];
+        if (_returnNull) {
+          _returnNull = false;
+          return new GroupKey(_nullGroupId, row);
+        }
+        Long2IntMap.Entry entry = _entryIterator.next();
+        row[0] = entry.getLongKey();
+        return new GroupKey(entry.getIntValue(), row);
+      }
+    };
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java
new file mode 100644
index 0000000000..e7d7bc3815
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/OneObjectKeyGroupIdGenerator.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.groupby;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import java.util.Iterator;
+
+
+public class OneObjectKeyGroupIdGenerator implements GroupIdGenerator {
+  private final Object2IntOpenHashMap<Object> _groupIdMap;
+  private final int _numGroupsLimit;
+
+  public OneObjectKeyGroupIdGenerator(int numGroupsLimit) {
+    _groupIdMap = new Object2IntOpenHashMap<>();
+    _groupIdMap.defaultReturnValue(INVALID_ID);
+    _numGroupsLimit = numGroupsLimit;
+  }
+
+  @Override
+  public int getGroupId(Object key) {
+    int numGroups = _groupIdMap.size();
+    if (numGroups < _numGroupsLimit) {
+      return _groupIdMap.computeIntIfAbsent(key, k -> numGroups);
+    } else {
+      return _groupIdMap.getInt(key);
+    }
+  }
+
+  @Override
+  public int getNumGroups() {
+    return _groupIdMap.size();
+  }
+
+  @Override
+  public Iterator<GroupKey> getGroupKeyIterator(int numColumns) {
+    return new Iterator<GroupKey>() {
+      final ObjectIterator<Object2IntOpenHashMap.Entry<Object>> _entryIterator =
+          _groupIdMap.object2IntEntrySet().fastIterator();
+
+      @Override
+      public boolean hasNext() {
+        return _entryIterator.hasNext();
+      }
+
+      @Override
+      public GroupKey next() {
+        Object2IntMap.Entry<Object> entry = _entryIterator.next();
+        Object[] row = new Object[numColumns];
+        row[0] = entry.getKey();
+        return new GroupKey(entry.getIntValue(), row);
+      }
+    };
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java
new file mode 100644
index 0000000000..21e8fcf144
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/groupby/TwoKeysGroupIdGenerator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.operator.groupby;
+
+import it.unimi.dsi.fastutil.longs.Long2IntMap;
+import it.unimi.dsi.fastutil.longs.Long2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import java.util.Iterator;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.query.aggregation.groupby.utils.ValueToIdMap;
+import org.apache.pinot.core.query.aggregation.groupby.utils.ValueToIdMapFactory;
+
+
+public class TwoKeysGroupIdGenerator implements GroupIdGenerator {
+  private final Long2IntOpenHashMap _groupIdMap;
+  private final ValueToIdMap _firstKeyToIdMap;
+  private final ValueToIdMap _secondKeyToIdMap;
+  private final int _numGroupsLimit;
+
+  public TwoKeysGroupIdGenerator(ColumnDataType firstKeyType, ColumnDataType secondKeyType, int numGroupsLimit) {
+    _groupIdMap = new Long2IntOpenHashMap();
+    _groupIdMap.defaultReturnValue(INVALID_ID);
+    _firstKeyToIdMap = ValueToIdMapFactory.get(firstKeyType.toDataType());
+    _secondKeyToIdMap = ValueToIdMapFactory.get(secondKeyType.toDataType());
+    _numGroupsLimit = numGroupsLimit;
+  }
+
+  @Override
+  public int getGroupId(Object key) {
+    Object[] keyValues = (Object[]) key;
+    Object firstKey = keyValues[0];
+    Object secondKey = keyValues[1];
+    int numGroups = _groupIdMap.size();
+    if (numGroups < _numGroupsLimit) {
+      int firstKeyId = firstKey != null ? _firstKeyToIdMap.put(firstKey) : NULL_ID;
+      int secondKeyId = secondKey != null ? _secondKeyToIdMap.put(secondKey) : NULL_ID;
+      long longKey = ((long) firstKeyId << 32) | (secondKeyId & 0xFFFFFFFFL);
+      return _groupIdMap.computeIfAbsent(longKey, k -> numGroups);
+    } else {
+      int firstKeyId;
+      if (firstKey != null) {
+        firstKeyId = _firstKeyToIdMap.getId(firstKey);
+        if (firstKeyId == INVALID_ID) {
+          return INVALID_ID;
+        }
+      } else {
+        firstKeyId = NULL_ID;
+      }
+      int secondKeyId;
+      if (secondKey != null) {
+        secondKeyId = _secondKeyToIdMap.getId(secondKey);
+        if (secondKeyId == INVALID_ID) {
+          return INVALID_ID;
+        }
+      } else {
+        secondKeyId = NULL_ID;
+      }
+      long longKey = ((long) firstKeyId << 32) | (secondKeyId & 0xFFFFFFFFL);
+      return _groupIdMap.get(longKey);
+    }
+  }
+
+  @Override
+  public int getNumGroups() {
+    return _groupIdMap.size();
+  }
+
+  @Override
+  public Iterator<GroupKey> getGroupKeyIterator(int numColumns) {
+    return new Iterator<GroupKey>() {
+      final ObjectIterator<Long2IntOpenHashMap.Entry> _entryIterator = _groupIdMap.long2IntEntrySet().fastIterator();
+
+      @Override
+      public boolean hasNext() {
+        return _entryIterator.hasNext();
+      }
+
+      @Override
+      public GroupKey next() {
+        Long2IntMap.Entry entry = _entryIterator.next();
+        long longKey = entry.getLongKey();
+        Object[] row = new Object[numColumns];
+        int firstKeyId = (int) (longKey >>> 32);
+        int secondKeyId = (int) longKey;
+        if (firstKeyId != NULL_ID) {
+          row[0] = _firstKeyToIdMap.get(firstKeyId);
+        }
+        if (secondKeyId != NULL_ID) {
+          row[1] = _secondKeyToIdMap.get(secondKeyId);
+        }
+        return new GroupKey(entry.getIntValue(), row);
+      }
+    };
+  }
+}
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
index 93b65dad7f..c1e5255f85 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java
@@ -266,7 +266,7 @@ public class AggregateOperatorTest {
 
     // Then:
     Assert.assertTrue(block.isErrorBlock(), "expected ERROR block from invalid computation");
-    Assert.assertTrue(block.getExceptions().get(1000).contains("String cannot be cast to class"),
+    Assert.assertTrue(block.getExceptions().get(1000).contains("cannot be cast to class"),
         "expected it to fail with class cast exception");
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org