Posted to commits@pinot.apache.org by ki...@apache.org on 2020/08/16 09:29:19 UTC

[incubator-pinot] branch exact-distinct-count created (now a7ab0fd)

This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a change to branch exact-distinct-count
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at a7ab0fd  Support for exact distinct count for non int data types

This branch includes the following new commits:

     new a7ab0fd  Support for exact distinct count for non int data types

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Support for exact distinct count for non int data types

Posted by ki...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kishoreg pushed a commit to branch exact-distinct-count
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit a7ab0fd56978ca4412f86d6c09d86e1d3cb35baf
Author: kishoreg <g....@gmail.com>
AuthorDate: Sun Aug 16 02:28:55 2020 -0700

    Support for exact distinct count for non int data types
---
 .../common/function/AggregationFunctionType.java   |   3 +-
 .../apache/pinot/core/common/ObjectSerDeUtils.java | 176 +++++++++++++--
 .../query/DictionaryBasedAggregationOperator.java  |  24 +-
 .../function/DistinctCountAggregationFunction.java | 243 ++++++++++++++++-----
 .../DistinctCountMVAggregationFunction.java        |  31 +--
 .../DistinctRawBloomFilterAggregationFunction.java | 226 +++++++++++++++++++
 6 files changed, 609 insertions(+), 94 deletions(-)
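
The change, in short: instead of reducing every value to an int hash code and counting hash codes (which undercounts whenever two values collide), each data type now gets its own exact set (LongOpenHashSet, FloatOpenHashSet, DoubleOpenHashSet, or an ObjectOpenHashSet of byte[] for strings and bytes). Below is a minimal standalone sketch of the difference; the class name and sample values are illustrative only and not part of the commit, though the fastutil set types are the ones the patch uses.

    import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
    import it.unimi.dsi.fastutil.longs.LongOpenHashSet;

    public class ExactDistinctCountSketch {
      public static void main(String[] args) {
        // 1L and 4294967296L (2^32) are distinct longs, but Long.hashCode() maps both to 1
        long[] values = {1L, 4294967296L};

        IntOpenHashSet hashed = new IntOpenHashSet();   // old approach: store int hash codes
        LongOpenHashSet exact = new LongOpenHashSet();  // new approach: store the long values themselves
        for (long v : values) {
          hashed.add(Long.hashCode(v));
          exact.add(v);
        }
        // Prints "hashed=1 exact=2": the hash-based count collapses the collision, the typed set does not
        System.out.println("hashed=" + hashed.size() + " exact=" + exact.size());
      }
    }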

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index fc60ea6..b0db043 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -61,7 +61,8 @@ public enum AggregationFunctionType {
   PERCENTILEMV("percentileMV"),
   PERCENTILEESTMV("percentileEstMV"),
   PERCENTILETDIGESTMV("percentileTDigestMV"),
-  DISTINCT("distinct");
+  DISTINCT("distinct"),
+  DISTINCTRAWBLOOMFILTER("distinctRawBloomFilter");
 
   private final String _name;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 9c87921..8995952 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -19,17 +19,31 @@
 package org.apache.pinot.core.common;
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import com.google.common.base.Charsets;
 import com.google.common.primitives.Longs;
 import com.tdunning.math.stats.MergingDigest;
 import com.tdunning.math.stats.TDigest;
 import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.doubles.DoubleIterator;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.doubles.DoubleSet;
+import it.unimi.dsi.fastutil.floats.FloatIterator;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatSet;
 import it.unimi.dsi.fastutil.ints.IntIterator;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -68,7 +82,11 @@ public class ObjectSerDeUtils {
     DistinctTable(11),
     DataSketch(12),
     Geometry(13),
-    RoaringBitmap(14);
+    RoaringBitmap(14),
+    LongSet(15),
+    FloatSet(16),
+    DoubleSet(17),
+    BytesSet(18);
 
     private final int _value;
 
@@ -111,6 +129,14 @@ public class ObjectSerDeUtils {
         return ObjectType.Geometry;
       } else if (value instanceof RoaringBitmap) {
         return ObjectType.RoaringBitmap;
+      } else if (value instanceof LongSet) {
+        return ObjectType.LongSet;
+      } else if (value instanceof FloatSet) {
+        return ObjectType.FloatSet;
+      } else if (value instanceof DoubleSet) {
+        return ObjectType.DoubleSet;
+      } else if (value instanceof ObjectSet) {
+        return ObjectType.BytesSet;
       } else {
         throw new IllegalArgumentException("Unsupported type of value: " + value.getClass().getSimpleName());
       }
@@ -452,6 +478,135 @@ public class ObjectSerDeUtils {
     }
   };
 
+  public static final ObjectSerDe<LongSet> LONG_SET_SER_DE = new ObjectSerDe<LongSet>() {
+
+    @Override
+    public byte[] serialize(LongSet longSet) {
+      int size = longSet.size();
+      byte[] bytes = new byte[Integer.BYTES + size * Long.BYTES];
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      byteBuffer.putInt(size);
+      LongIterator iterator = longSet.iterator();
+      while (iterator.hasNext()) {
+        byteBuffer.putLong(iterator.nextLong());
+      }
+      return bytes;
+    }
+
+    @Override
+    public LongSet deserialize(byte[] bytes) {
+      return deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    @Override
+    public LongSet deserialize(ByteBuffer byteBuffer) {
+      int size = byteBuffer.getInt();
+      LongSet longSet = new LongOpenHashSet(size);
+      for (int i = 0; i < size; i++) {
+        longSet.add(byteBuffer.getLong());
+      }
+      return longSet;
+    }
+  };
+
+  public static final ObjectSerDe<FloatSet> FLOAT_SET_SER_DE = new ObjectSerDe<FloatSet>() {
+
+    @Override
+    public byte[] serialize(FloatSet floatSet) {
+      int size = floatSet.size();
+      byte[] bytes = new byte[Integer.BYTES + size * Float.BYTES];
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      byteBuffer.putInt(size);
+      FloatIterator iterator = floatSet.iterator();
+      while (iterator.hasNext()) {
+        byteBuffer.putFloat(iterator.nextFloat());
+      }
+      return bytes;
+    }
+
+    @Override
+    public FloatSet deserialize(byte[] bytes) {
+      return deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    @Override
+    public FloatSet deserialize(ByteBuffer byteBuffer) {
+      int size = byteBuffer.getInt();
+      FloatSet floatSet = new FloatOpenHashSet(size);
+      for (int i = 0; i < size; i++) {
+        floatSet.add(byteBuffer.getFloat());
+      }
+      return floatSet;
+    }
+  };
+
+  public static final ObjectSerDe<DoubleSet> DOUBLE_SET_SER_DE = new ObjectSerDe<DoubleSet>() {
+
+    @Override
+    public byte[] serialize(DoubleSet doubleSet) {
+      int size = doubleSet.size();
+      byte[] bytes = new byte[Integer.BYTES + size * Double.BYTES];
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      byteBuffer.putInt(size);
+      DoubleIterator iterator = doubleSet.iterator();
+      while (iterator.hasNext()) {
+        byteBuffer.putDouble(iterator.nextDouble());
+      }
+      return bytes;
+    }
+
+    @Override
+    public DoubleSet deserialize(byte[] bytes) {
+      return deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    @Override
+    public DoubleSet deserialize(ByteBuffer byteBuffer) {
+      int size = byteBuffer.getInt();
+      DoubleSet doubleSet = new DoubleOpenHashSet(size);
+      for (int i = 0; i < size; i++) {
+        doubleSet.add(byteBuffer.getDouble());
+      }
+      return doubleSet;
+    }
+  };
+
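+  // Serialized layout for the byte[] set: [int size], then [int length][length raw bytes] per element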
+  public static final ObjectSerDe<ObjectSet<byte[]>> BYTES_SET_SER_DE = new ObjectSerDe<ObjectSet<byte[]>>() {
+
+    @Override
+    public byte[] serialize(ObjectSet<byte[]> bytesSet) {
+      // Size the buffer exactly: a 4-byte set size, then a 4-byte length prefix plus the payload per element
+      int bufferSize = Integer.BYTES;
+      for (byte[] val : bytesSet) {
+        bufferSize += Integer.BYTES + val.length;
+      }
+      byte[] bytes = new byte[bufferSize];
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      byteBuffer.putInt(bytesSet.size());
+      ObjectIterator<byte[]> iterator = bytesSet.iterator();
+      while (iterator.hasNext()) {
+        byte[] val = iterator.next();
+        byteBuffer.putInt(val.length);
+        byteBuffer.put(val);
+      }
+      return bytes;
+    }
+
+    @Override
+    public ObjectSet<byte[]> deserialize(byte[] bytes) {
+      return deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    @Override
+    public ObjectSet<byte[]> deserialize(ByteBuffer byteBuffer) {
+      int size = byteBuffer.getInt();
+      ObjectOpenHashSet<byte[]> bytesSet = new ObjectOpenHashSet<>(size);
+      for (int i = 0; i < size; i++) {
+        int length = byteBuffer.getInt();
+        byte[] val = new byte[length];
+        byteBuffer.get(val);
+        bytesSet.add(val);
+      }
+      return bytesSet;
+    }
+  };
+
   public static final ObjectSerDe<TDigest> TDIGEST_SER_DE = new ObjectSerDe<TDigest>() {
 
     @Override
@@ -538,23 +693,8 @@ public class ObjectSerDeUtils {
 
   // NOTE: DO NOT change the order, it has to be the same order as the ObjectType
   //@formatter:off
-  private static final ObjectSerDe[] SER_DES = {
-      STRING_SER_DE,
-      LONG_SER_DE,
-      DOUBLE_SER_DE,
-      DOUBLE_ARRAY_LIST_SER_DE,
-      AVG_PAIR_SER_DE,
-      MIN_MAX_RANGE_PAIR_SER_DE,
-      HYPER_LOG_LOG_SER_DE,
-      QUANTILE_DIGEST_SER_DE,
-      MAP_SER_DE,
-      INT_SET_SER_DE,
-      TDIGEST_SER_DE,
-      DISTINCT_TABLE_SER_DE,
-      DATA_SKETCH_SER_DE,
-      GEOMETRY_SER_DE,
-      ROARING_BITMAP_SER_DE
-  };
+  private static final ObjectSerDe[] SER_DES = {
+      STRING_SER_DE,
+      LONG_SER_DE,
+      DOUBLE_SER_DE,
+      DOUBLE_ARRAY_LIST_SER_DE,
+      AVG_PAIR_SER_DE,
+      MIN_MAX_RANGE_PAIR_SER_DE,
+      HYPER_LOG_LOG_SER_DE,
+      QUANTILE_DIGEST_SER_DE,
+      MAP_SER_DE,
+      INT_SET_SER_DE,
+      TDIGEST_SER_DE,
+      DISTINCT_TABLE_SER_DE,
+      DATA_SKETCH_SER_DE,
+      GEOMETRY_SER_DE,
+      ROARING_BITMAP_SER_DE,
+      LONG_SET_SER_DE,
+      FLOAT_SET_SER_DE,
+      DOUBLE_SET_SER_DE,
+      BYTES_SET_SER_DE
+  };
   //@formatter:on
 
   public static byte[] serialize(Object value) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
index 7fa6798..b83a202 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DictionaryBasedAggregationOperator.java
@@ -18,7 +18,13 @@
  */
 package org.apache.pinot.core.operator.query;
 
+import com.google.common.base.Charsets;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.AbstractCollection;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -77,36 +83,42 @@ public class DictionaryBasedAggregationOperator extends BaseOperator<Intermediat
               .add(new MinMaxRangePair(dictionary.getDoubleValue(0), dictionary.getDoubleValue(dictionarySize - 1)));
           break;
         case DISTINCTCOUNT:
-          IntOpenHashSet set = new IntOpenHashSet(dictionarySize);
+          AbstractCollection set;
           switch (dictionary.getValueType()) {
             case INT:
+              set = new IntOpenHashSet(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
                 set.add(dictionary.getIntValue(dictId));
               }
               break;
             case LONG:
+              set = new LongOpenHashSet(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
-                set.add(Long.hashCode(dictionary.getLongValue(dictId)));
+                set.add(dictionary.getLongValue(dictId));
               }
               break;
             case FLOAT:
+              set = new FloatOpenHashSet(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
-                set.add(Float.hashCode(dictionary.getFloatValue(dictId)));
+                set.add(dictionary.getFloatValue(dictId));
               }
               break;
             case DOUBLE:
+              set = new DoubleOpenHashSet(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
-                set.add(Double.hashCode(dictionary.getDoubleValue(dictId)));
+                set.add(dictionary.getDoubleValue(dictId));
               }
               break;
             case STRING:
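+              // STRING values are stored as UTF-8 byte[], so STRING and BYTES columns share the same set type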
+              set = new ObjectOpenHashSet<byte[]>(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
-                set.add(dictionary.getStringValue(dictId).hashCode());
+                set.add(dictionary.getStringValue(dictId).getBytes(Charsets.UTF_8));
               }
               break;
             case BYTES:
+              set = new ObjectOpenHashSet<byte[]>(dictionarySize);
               for (int dictId = 0; dictId < dictionarySize; dictId++) {
-                set.add(Arrays.hashCode(dictionary.getBytesValue(dictId)));
+                set.add(dictionary.getBytesValue(dictId));
               }
               break;
             default:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
index e8e7e97..59a812f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java
@@ -18,8 +18,14 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
+import com.google.common.base.Charsets;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
-import java.util.Arrays;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.AbstractCollection;
+import java.util.Iterator;
 import java.util.Map;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -35,7 +41,7 @@ import org.roaringbitmap.PeekableIntIterator;
 import org.roaringbitmap.RoaringBitmap;
 
 
-public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<IntOpenHashSet, Integer> {
+public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<AbstractCollection, Integer> {
 
   public DistinctCountAggregationFunction(ExpressionContext expression) {
     super(expression);
@@ -69,9 +75,10 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
       return;
     }
 
-    // For non-dictionary-encoded expression, store hash code of the values into the value set
-    IntOpenHashSet valueSet = getValueSet(aggregationResultHolder);
+    // For non-dictionary-encoded expression, store the values themselves (strings as UTF-8 bytes) into a
+    // type-specific value set
     DataType valueType = blockValSet.getValueType();
+
+    AbstractCollection valueSet = getValueSet(aggregationResultHolder, valueType);
     switch (valueType) {
       case INT:
         int[] intValues = blockValSet.getIntValuesSV();
@@ -82,31 +89,31 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
       case LONG:
         long[] longValues = blockValSet.getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          valueSet.add(Long.hashCode(longValues[i]));
+          valueSet.add(longValues[i]);
         }
         break;
       case FLOAT:
         float[] floatValues = blockValSet.getFloatValuesSV();
         for (int i = 0; i < length; i++) {
-          valueSet.add(Float.hashCode(floatValues[i]));
+          valueSet.add(floatValues[i]);
         }
         break;
       case DOUBLE:
         double[] doubleValues = blockValSet.getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          valueSet.add(Double.hashCode(doubleValues[i]));
+          valueSet.add(doubleValues[i]);
         }
         break;
       case STRING:
         String[] stringValues = blockValSet.getStringValuesSV();
         for (int i = 0; i < length; i++) {
-          valueSet.add(stringValues[i].hashCode());
+          valueSet.add(stringValues[i].getBytes(Charsets.UTF_8));
         }
         break;
       case BYTES:
         byte[][] bytesValues = blockValSet.getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          valueSet.add(Arrays.hashCode(bytesValues[i]));
+          valueSet.add(bytesValues[i]);
         }
         break;
       default:
@@ -135,37 +142,37 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
       case INT:
         int[] intValues = blockValSet.getIntValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, groupKeyArray[i]).add(intValues[i]);
+          getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(intValues[i]);
         }
         break;
       case LONG:
         long[] longValues = blockValSet.getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, groupKeyArray[i]).add(Long.hashCode(longValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(longValues[i]);
         }
         break;
       case FLOAT:
         float[] floatValues = blockValSet.getFloatValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, groupKeyArray[i]).add(Float.hashCode(floatValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(floatValues[i]);
         }
         break;
       case DOUBLE:
         double[] doubleValues = blockValSet.getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, groupKeyArray[i]).add(Double.hashCode(doubleValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(doubleValues[i]);
         }
         break;
       case STRING:
         String[] stringValues = blockValSet.getStringValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, groupKeyArray[i]).add(stringValues[i].hashCode());
+          getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(stringValues[i].getBytes(Charsets.UTF_8));
         }
         break;
       case BYTES:
         byte[][] bytesValues = blockValSet.getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          getValueSet(groupByResultHolder, groupKeyArray[i]).add(Arrays.hashCode(bytesValues[i]));
+          getValueSet(groupByResultHolder, groupKeyArray[i], valueType).add(bytesValues[i]);
         }
         break;
       default:
@@ -194,37 +201,37 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
       case INT:
         int[] intValues = blockValSet.getIntValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], intValues[i]);
+          setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], intValues[i]);
         }
         break;
       case LONG:
         long[] longValues = blockValSet.getLongValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Long.hashCode(longValues[i]));
+          setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], longValues[i]);
         }
         break;
       case FLOAT:
         float[] floatValues = blockValSet.getFloatValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Float.hashCode(floatValues[i]));
+          setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], floatValues[i]);
         }
         break;
       case DOUBLE:
         double[] doubleValues = blockValSet.getDoubleValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Double.hashCode(doubleValues[i]));
+          setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], doubleValues[i]);
         }
         break;
       case STRING:
         String[] stringValues = blockValSet.getStringValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], stringValues[i].hashCode());
+          setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], stringValues[i].getBytes(Charsets.UTF_8));
         }
         break;
       case BYTES:
         byte[][] bytesValues = blockValSet.getBytesValuesSV();
         for (int i = 0; i < length; i++) {
-          setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Arrays.hashCode(bytesValues[i]));
+          setValueForGroupKeys(groupByResultHolder, valueType, groupKeysArray[i], bytesValues[i]);
         }
         break;
       default:
@@ -233,10 +240,10 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
   }
 
   @Override
-  public IntOpenHashSet extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+  public AbstractCollection extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
     Object result = aggregationResultHolder.getResult();
     if (result == null) {
-      return new IntOpenHashSet();
+      return emptyCollection();
     }
 
     if (result instanceof DictIdsWrapper) {
@@ -244,15 +251,39 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
       return convertToValueSet((DictIdsWrapper) result);
     } else {
       // For non-dictionary-encoded expression, directly return the value set
-      return (IntOpenHashSet) result;
+      return (AbstractCollection) result;
     }
   }
 
+  private AbstractCollection emptyCollection() {
+    return new AbstractCollection() {
+      @Override
+      public Iterator iterator() {
+        return new Iterator() {
+          @Override
+          public boolean hasNext() {
+            return false;
+          }
+
+          @Override
+          public Object next() {
+            return null;
+          }
+        };
+      }
+
+      @Override
+      public int size() {
+        return 0;
+      }
+    };
+  }
+
   @Override
-  public IntOpenHashSet extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+  public AbstractCollection extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
     Object result = groupByResultHolder.getResult(groupKey);
     if (result == null) {
-      return new IntOpenHashSet();
+      return emptyCollection();
     }
 
     if (result instanceof DictIdsWrapper) {
@@ -260,14 +291,52 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
       return convertToValueSet((DictIdsWrapper) result);
     } else {
       // For non-dictionary-encoded expression, directly return the value set
-      return (IntOpenHashSet) result;
+      return (AbstractCollection) result;
     }
   }
 
   @Override
-  public IntOpenHashSet merge(IntOpenHashSet intermediateResult1, IntOpenHashSet intermediateResult2) {
-    intermediateResult1.addAll(intermediateResult2);
-    return intermediateResult1;
+  public AbstractCollection merge(AbstractCollection intermediateResult1, AbstractCollection intermediateResult2) {
+    // Short-circuit on empty intermediate results so that the placeholder returned by emptyCollection()
+    // is never cast to a concrete set type below
+    if (intermediateResult1.isEmpty()) {
+      return intermediateResult2;
+    }
+    if (intermediateResult2.isEmpty()) {
+      return intermediateResult1;
+    }
+    if (intermediateResult1.getClass().isAssignableFrom(intermediateResult2.getClass())) {
+      intermediateResult1.addAll(intermediateResult2);
+      return intermediateResult1;
+    } else {
+      // Handle backwards compatibility: earlier versions used an IntOpenHashSet of hash codes for all data
+      // types, so here we convert the other set types into int hash codes.
+      // Note: this code path is only executed while brokers and servers are being upgraded. Once both are on
+      // the same version, the intermediateResult1.getClass().isAssignableFrom(intermediateResult2.getClass())
+      // condition above is satisfied.
+      IntOpenHashSet intOpenHashSet;
+      AbstractCollection toMerge;
+      if (intermediateResult1 instanceof IntOpenHashSet) {
+        intOpenHashSet = (IntOpenHashSet) intermediateResult1;
+        toMerge = intermediateResult2;
+      } else {
+        intOpenHashSet = (IntOpenHashSet) intermediateResult2;
+        toMerge = intermediateResult1;
+      }
+      if (toMerge instanceof LongOpenHashSet) {
+        LongOpenHashSet longOpenHashSet = (LongOpenHashSet) toMerge;
+        for (long e : longOpenHashSet) {
+          intOpenHashSet.add(Long.hashCode(e));
+        }
+      } else if (toMerge instanceof FloatOpenHashSet) {
+        FloatOpenHashSet floatOpenHashSet = (FloatOpenHashSet) toMerge;
+        for (float e : floatOpenHashSet) {
+          intOpenHashSet.add(Float.hashCode(e));
+        }
+      } else if (toMerge instanceof DoubleOpenHashSet) {
+        DoubleOpenHashSet doubleOpenHashSet = (DoubleOpenHashSet) toMerge;
+        for (double e : doubleOpenHashSet) {
+          intOpenHashSet.add(Double.hashCode(e));
+        }
+      } else if (toMerge instanceof ObjectOpenHashSet) {
+        ObjectOpenHashSet objectOpenHashSet = (ObjectOpenHashSet) toMerge;
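+        // NOTE: the elements here are byte[] (STRING values are stored as UTF-8 bytes), and byte[] does not
+        // override hashCode(), so this best-effort conversion will not reproduce the String.hashCode() /
+        // Arrays.hashCode() values produced by pre-upgrade servers.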
+        for (Object e : objectOpenHashSet) {
+          intOpenHashSet.add(e.hashCode());
+        }
+      }
+      return intOpenHashSet;
+    }
   }
 
   @Override
@@ -286,7 +355,7 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
   }
 
   @Override
-  public Integer extractFinalResult(IntOpenHashSet intermediateResult) {
+  public Integer extractFinalResult(AbstractCollection intermediateResult) {
     return intermediateResult.size();
   }
 
@@ -306,15 +375,42 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
   /**
    * Returns the value set from the result holder or creates a new one if it does not exist.
    */
-  protected static IntOpenHashSet getValueSet(AggregationResultHolder aggregationResultHolder) {
-    IntOpenHashSet valueSet = aggregationResultHolder.getResult();
+  protected static AbstractCollection getValueSet(AggregationResultHolder aggregationResultHolder, DataType valueType) {
+    AbstractCollection valueSet = aggregationResultHolder.getResult();
     if (valueSet == null) {
-      valueSet = new IntOpenHashSet();
+      valueSet = getAbstractCollection(valueType);
       aggregationResultHolder.setValue(valueSet);
     }
     return valueSet;
   }
 
+  private static AbstractCollection getAbstractCollection(DataType valueType) {
+    AbstractCollection valueSet;
+    switch (valueType) {
+      case INT:
+        valueSet = new IntOpenHashSet();
+        break;
+      case LONG:
+        valueSet = new LongOpenHashSet();
+        break;
+      case FLOAT:
+        valueSet = new FloatOpenHashSet();
+        break;
+      case DOUBLE:
+        valueSet = new DoubleOpenHashSet();
+        break;
+      case STRING:
+        valueSet = new ObjectOpenHashSet<byte[]>();
+        break;
+      case BYTES:
+        valueSet = new ObjectOpenHashSet<byte[]>();
+        break;
+      default:
+        throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType);
+    }
+    return valueSet;
+  }
+
   /**
    * Returns the dictionary id bitmap for the given group key or creates a new one if it does not exist.
    */
@@ -331,10 +427,11 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
   /**
    * Returns the value set for the given group key or creates a new one if it does not exist.
    */
-  protected static IntOpenHashSet getValueSet(GroupByResultHolder groupByResultHolder, int groupKey) {
-    IntOpenHashSet valueSet = groupByResultHolder.getResult(groupKey);
+  protected static AbstractCollection getValueSet(GroupByResultHolder groupByResultHolder, int groupKey,
+      DataType valueType) {
+    AbstractCollection valueSet = groupByResultHolder.getResult(groupKey);
     if (valueSet == null) {
-      valueSet = new IntOpenHashSet();
+      valueSet = getAbstractCollection(valueType);
       groupByResultHolder.setValueForKey(groupKey, valueSet);
     }
     return valueSet;
@@ -353,9 +450,38 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
   /**
    * Helper method to set value for the given group keys into the result holder.
    */
-  private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, int value) {
+  private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, DataType valueType, int[] groupKeys,
+      int value) {
+    for (int groupKey : groupKeys) {
+      getValueSet(groupByResultHolder, groupKey, valueType).add(value);
+    }
+  }
+
+  private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, DataType valueType, int[] groupKeys,
+      long value) {
+    for (int groupKey : groupKeys) {
+      getValueSet(groupByResultHolder, groupKey, valueType).add(value);
+    }
+  }
+
+  private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, DataType valueType, int[] groupKeys,
+      float value) {
+    for (int groupKey : groupKeys) {
+      getValueSet(groupByResultHolder, groupKey, valueType).add(value);
+    }
+  }
+
+  private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, DataType valueType, int[] groupKeys,
+      double value) {
+    for (int groupKey : groupKeys) {
+      getValueSet(groupByResultHolder, groupKey, valueType).add(value);
+    }
+  }
+
+  private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, DataType valueType, int[] groupKeys,
+      byte[] value) {
     for (int groupKey : groupKeys) {
-      getValueSet(groupByResultHolder, groupKey).add(value);
+      getValueSet(groupByResultHolder, groupKey, valueType).add(value);
     }
   }
 
@@ -363,47 +489,56 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation
    * Helper method to read dictionary and convert dictionary ids to hash code of the values for dictionary-encoded
    * expression.
    */
-  private static IntOpenHashSet convertToValueSet(DictIdsWrapper dictIdsWrapper) {
+  private static AbstractCollection convertToValueSet(DictIdsWrapper dictIdsWrapper) {
     Dictionary dictionary = dictIdsWrapper._dictionary;
     RoaringBitmap dictIdBitmap = dictIdsWrapper._dictIdBitmap;
-    IntOpenHashSet valueSet = new IntOpenHashSet(dictIdBitmap.getCardinality());
     PeekableIntIterator iterator = dictIdBitmap.getIntIterator();
     DataType valueType = dictionary.getValueType();
     switch (valueType) {
       case INT:
+        IntOpenHashSet intOpenHashSet = new IntOpenHashSet(dictIdBitmap.getCardinality());
         while (iterator.hasNext()) {
-          valueSet.add(dictionary.getIntValue(iterator.next()));
+          intOpenHashSet.add(dictionary.getIntValue(iterator.next()));
         }
-        break;
+        return intOpenHashSet;
       case LONG:
+        LongOpenHashSet longOpenHashSet = new LongOpenHashSet(dictIdBitmap.getCardinality());
         while (iterator.hasNext()) {
-          valueSet.add(Long.hashCode(dictionary.getLongValue(iterator.next())));
+          longOpenHashSet.add(dictionary.getLongValue(iterator.next()));
         }
-        break;
+        return longOpenHashSet;
       case FLOAT:
+        FloatOpenHashSet floatOpenHashSet = new FloatOpenHashSet(dictIdBitmap.getCardinality());
         while (iterator.hasNext()) {
-          valueSet.add(Float.hashCode(dictionary.getFloatValue(iterator.next())));
+          floatOpenHashSet.add(dictionary.getFloatValue(iterator.next()));
         }
-        break;
+        return floatOpenHashSet;
+
       case DOUBLE:
+        DoubleOpenHashSet doubleOpenHashSet = new DoubleOpenHashSet(dictIdBitmap.getCardinality());
         while (iterator.hasNext()) {
-          valueSet.add(Double.hashCode(dictionary.getDoubleValue(iterator.next())));
+          doubleOpenHashSet.add(dictionary.getDoubleValue(iterator.next()));
         }
-        break;
+        return doubleOpenHashSet;
       case STRING:
+        ObjectOpenHashSet<byte[]> stringObjectOpenHashSet =
+            new ObjectOpenHashSet<byte[]>(dictIdBitmap.getCardinality());
         while (iterator.hasNext()) {
-          valueSet.add(dictionary.getStringValue(iterator.next()).hashCode());
+          stringObjectOpenHashSet.add(dictionary.getStringValue(iterator.next()).getBytes(Charsets.UTF_8));
         }
-        break;
+        return stringObjectOpenHashSet;
+
       case BYTES:
+        ObjectOpenHashSet<byte[]> bytesObjectOpenHashSet =
+            new ObjectOpenHashSet<byte[]>(dictIdBitmap.getCardinality());
+
         while (iterator.hasNext()) {
-          valueSet.add(Arrays.hashCode(dictionary.getBytesValue(iterator.next())));
+          bytesObjectOpenHashSet.add(dictionary.getBytesValue(iterator.next()));
         }
-        break;
+        return bytesObjectOpenHashSet;
       default:
         throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType);
     }
-    return valueSet;
   }
 
   private static final class DictIdsWrapper {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
index fb6b2e3..4c93181 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.query.aggregation.function;
 
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import com.google.common.base.Charsets;
+import java.util.AbstractCollection;
 import java.util.Map;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.core.common.BlockValSet;
@@ -58,8 +59,8 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
     }
 
     // For non-dictionary-encoded expression, store hash code of the values into the value set
-    IntOpenHashSet valueSet = getValueSet(aggregationResultHolder);
     FieldSpec.DataType valueType = blockValSet.getValueType();
+    AbstractCollection valueSet = getValueSet(aggregationResultHolder, valueType);
     switch (valueType) {
       case INT:
         int[][] intValues = blockValSet.getIntValuesMV();
@@ -126,7 +127,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
       case INT:
         int[][] intValues = blockValSet.getIntValuesMV();
         for (int i = 0; i < length; i++) {
-          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]);
+          AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKeyArray[i], valueType);
           for (int value : intValues[i]) {
             valueSet.add(value);
           }
@@ -135,36 +136,36 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
       case LONG:
         long[][] longValues = blockValSet.getLongValuesMV();
         for (int i = 0; i < length; i++) {
-          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]);
+          AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKeyArray[i], valueType);
           for (long value : longValues[i]) {
-            valueSet.add(Long.hashCode(value));
+            valueSet.add(value);
           }
         }
         break;
       case FLOAT:
         float[][] floatValues = blockValSet.getFloatValuesMV();
         for (int i = 0; i < length; i++) {
-          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]);
+          AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKeyArray[i], valueType);
           for (float value : floatValues[i]) {
-            valueSet.add(Float.hashCode(value));
+            valueSet.add(value);
           }
         }
         break;
       case DOUBLE:
         double[][] doubleValues = blockValSet.getDoubleValuesMV();
         for (int i = 0; i < length; i++) {
-          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]);
+          AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKeyArray[i], valueType);
           for (double value : doubleValues[i]) {
-            valueSet.add(Double.hashCode(value));
+            valueSet.add(value);
           }
         }
         break;
       case STRING:
         String[][] stringValues = blockValSet.getStringValuesMV();
         for (int i = 0; i < length; i++) {
-          IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]);
+          AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKeyArray[i], valueType);
           for (String value : stringValues[i]) {
-            valueSet.add(value.hashCode());
+            valueSet.add(value.getBytes(Charsets.UTF_8));
           }
         }
         break;
@@ -197,7 +198,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
         int[][] intValues = blockValSet.getIntValuesMV();
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey);
+            AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKey, valueType);
             for (int value : intValues[i]) {
               valueSet.add(value);
             }
@@ -208,7 +209,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
         long[][] longValues = blockValSet.getLongValuesMV();
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey);
+            AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKey, valueType);
             for (long value : longValues[i]) {
               valueSet.add(Long.hashCode(value));
             }
@@ -219,7 +220,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
         float[][] floatValues = blockValSet.getFloatValuesMV();
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey);
+            AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKey, valueType);
             for (float value : floatValues[i]) {
               valueSet.add(Float.hashCode(value));
             }
@@ -230,7 +231,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
         double[][] doubleValues = blockValSet.getDoubleValuesMV();
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey);
+            AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKey, valueType);
             for (double value : doubleValues[i]) {
               valueSet.add(Double.hashCode(value));
             }
@@ -241,7 +242,7 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation
         String[][] stringValues = blockValSet.getStringValuesMV();
         for (int i = 0; i < length; i++) {
           for (int groupKey : groupKeysArray[i]) {
-            IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey);
+            AbstractCollection valueSet = getValueSet(groupByResultHolder, groupKey, valueType);
             for (String value : stringValues[i]) {
               valueSet.add(value.hashCode());
             }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctRawBloomFilterAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctRawBloomFilterAggregationFunction.java
new file mode 100644
index 0000000..72e7edc
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctRawBloomFilterAggregationFunction.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+
+
+/**
+ * The DISTINCT clause in SQL is executed as the DISTINCT aggregation function.
+ * TODO: Support group-by
+ */
+@SuppressWarnings("rawtypes")
+public class DistinctRawBloomFilterAggregationFunction implements AggregationFunction<DistinctTable, Comparable> {
+  private final List<ExpressionContext> _expressions;
+  private final String[] _columns;
+  private final List<OrderByExpressionContext> _orderByExpressions;
+  private final int _limit;
+
+  /**
+   * Constructor for the class.
+   *
+   * @param expressions Distinct columns to return
+   * @param orderByExpressions Order By clause
+   * @param limit Limit clause
+   */
+  public DistinctRawBloomFilterAggregationFunction(List<ExpressionContext> expressions,
+      @Nullable List<OrderByExpressionContext> orderByExpressions, int limit) {
+    _expressions = expressions;
+    int numExpressions = expressions.size();
+    _columns = new String[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      _columns[i] = expressions.get(i).toString();
+    }
+    _orderByExpressions = orderByExpressions;
+    _limit = limit;
+  }
+
+  public String[] getColumns() {
+    return _columns;
+  }
+
+  public List<OrderByExpressionContext> getOrderByExpressions() {
+    return _orderByExpressions;
+  }
+
+  public int getLimit() {
+    return _limit;
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.DISTINCT;
+  }
+
+  @Override
+  public String getColumnName() {
+    return AggregationFunctionType.DISTINCT.getName() + "_" + AggregationFunctionUtils.concatArgs(_columns);
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return AggregationFunctionType.DISTINCT.getName().toLowerCase() + "(" + AggregationFunctionUtils
+        .concatArgs(_columns) + ")";
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    return _expressions;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    int numBlockValSets = blockValSetMap.size();
+    int numExpressions = _expressions.size();
+    Preconditions
+        .checkState(numBlockValSets == numExpressions, "Size mismatch: numBlockValSets = %s, numExpressions = %s",
+            numBlockValSets, numExpressions);
+
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      blockValSets[i] = blockValSetMap.get(_expressions.get(i));
+    }
+
+    DistinctTable distinctTable = aggregationResultHolder.getResult();
+    if (distinctTable == null) {
+      ColumnDataType[] columnDataTypes = new ColumnDataType[numExpressions];
+      for (int i = 0; i < numExpressions; i++) {
+        columnDataTypes[i] = ColumnDataType.fromDataTypeSV(blockValSetMap.get(_expressions.get(i)).getValueType());
+      }
+      DataSchema dataSchema = new DataSchema(_columns, columnDataTypes);
+      distinctTable = new DistinctTable(dataSchema, _orderByExpressions, _limit);
+      aggregationResultHolder.setValue(distinctTable);
+    }
+
+    // TODO: Follow up PR will make few changes to start using DictionaryBasedAggregationOperator for DISTINCT queries
+    //       without filter.
+
+    if (distinctTable.hasOrderBy()) {
+      // With order-by, no need to check whether the DistinctTable is already satisfied
+      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
+      for (int i = 0; i < length; i++) {
+        distinctTable.addWithOrderBy(new Record(blockValueFetcher.getRow(i)));
+      }
+    } else {
+      // Without order-by, early-terminate when the DistinctTable is already satisfied
+      if (distinctTable.isSatisfied()) {
+        return;
+      }
+      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
+      for (int i = 0; i < length; i++) {
+        if (distinctTable.addWithoutOrderBy(new Record(blockValueFetcher.getRow(i)))) {
+          return;
+        }
+      }
+    }
+  }
+
+  @Override
+  public DistinctTable extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    DistinctTable distinctTable = aggregationResultHolder.getResult();
+    if (distinctTable != null) {
+      return distinctTable;
+    } else {
+      ColumnDataType[] columnDataTypes = new ColumnDataType[_columns.length];
+      // NOTE: Use STRING for unknown type
+      Arrays.fill(columnDataTypes, ColumnDataType.STRING);
+      return new DistinctTable(new DataSchema(_columns, columnDataTypes), _orderByExpressions, _limit);
+    }
+  }
+
+  /**
+   * NOTE: This method only handles merging of 2 main DistinctTables. It should not be used on Broker-side because it
+   *       does not support merging deserialized DistinctTables.
+   * <p>{@inheritDoc}
+   */
+  @Override
+  public DistinctTable merge(DistinctTable intermediateResult1, DistinctTable intermediateResult2) {
+    if (intermediateResult1.size() == 0) {
+      return intermediateResult2;
+    }
+    if (intermediateResult2.size() != 0) {
+      intermediateResult1.mergeMainDistinctTable(intermediateResult2);
+    }
+    return intermediateResult1;
+  }
+
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  @Override
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
+  }
+
+  @Override
+  public ColumnDataType getFinalResultColumnType() {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+
+  @Override
+  public DistinctTable extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+
+  @Override
+  public Comparable extractFinalResult(DistinctTable intermediateResult) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org