You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/09 22:01:27 UTC

[pinot] branch master updated: Refactor single column distinct executors (#10859)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 880a074c7a Refactor single column distinct executors (#10859)
880a074c7a is described below

commit 880a074c7ae71ff3937171e504f6287d48c0fa7f
Author: Shen Yu <sh...@startree.ai>
AuthorDate: Fri Jun 9 15:01:19 2023 -0700

    Refactor single column distinct executors (#10859)
---
 ...eRawBigDecimalSingleColumnDistinctExecutor.java | 30 +++++++++++++++
 .../BaseRawBytesSingleColumnDistinctExecutor.java  | 30 +++++++++++++++
 .../BaseRawDoubleSingleColumnDistinctExecutor.java | 40 +++++++++++++++++++
 .../BaseRawFloatSingleColumnDistinctExecutor.java  | 40 +++++++++++++++++++
 .../BaseRawIntSingleColumnDistinctExecutor.java    | 40 +++++++++++++++++++
 .../BaseRawLongSingleColumnDistinctExecutor.java   | 40 +++++++++++++++++++
 .../BaseRawStringSingleColumnDistinctExecutor.java | 41 ++++++++++++++++++++
 ...BigDecimalSingleColumnDistinctOnlyExecutor.java | 30 ++-------------
 ...DecimalSingleColumnDistinctOrderByExecutor.java | 24 +-----------
 .../RawBytesSingleColumnDistinctOnlyExecutor.java  | 30 ++-------------
 ...awBytesSingleColumnDistinctOrderByExecutor.java | 37 +++++-------------
 .../RawDoubleSingleColumnDistinctOnlyExecutor.java | 45 ++--------------------
 ...wDoubleSingleColumnDistinctOrderByExecutor.java | 36 +----------------
 .../RawFloatSingleColumnDistinctOnlyExecutor.java  | 43 ++-------------------
 ...awFloatSingleColumnDistinctOrderByExecutor.java | 36 +----------------
 .../RawIntSingleColumnDistinctOnlyExecutor.java    | 43 ++-------------------
 .../RawIntSingleColumnDistinctOrderByExecutor.java | 36 +----------------
 .../RawLongSingleColumnDistinctOnlyExecutor.java   | 43 ++-------------------
 ...RawLongSingleColumnDistinctOrderByExecutor.java | 36 +----------------
 .../RawStringSingleColumnDistinctOnlyExecutor.java | 42 ++------------------
 ...wStringSingleColumnDistinctOrderByExecutor.java | 32 +--------------
 21 files changed, 303 insertions(+), 471 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBigDecimalSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBigDecimalSingleColumnDistinctExecutor.java
index 3dcf78e674..88229cd389 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBigDecimalSingleColumnDistinctExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBigDecimalSingleColumnDistinctExecutor.java
@@ -26,10 +26,13 @@ import java.util.List;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -63,4 +66,31 @@ public abstract class BaseRawBigDecimalSingleColumnDistinctExecutor implements D
     }
     return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
   }
+
+  @Override
+  public boolean process(ValueBlock valueBlock) { // returns true as soon as add() returns true
+    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
+    BigDecimal[] values = blockValueSet.getBigDecimalValuesSV();
+    int numDocs = valueBlock.getNumDocs();
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
+      for (int i = 0; i < numDocs; i++) {
+        if (nullBitmap != null && nullBitmap.contains(i)) {
+          values[i] = null; // overwrites the fetched array slot so add() receives null for this doc
+        }
+        if (add(values[i])) {
+          return true;
+        }
+      }
+    } else {
+      for (int i = 0; i < numDocs; i++) {
+        if (add(values[i])) {
+          return true;
+        }
+      }
+    }
+    return false; // whole block scanned without add() returning true
+  }
+
+  protected abstract boolean add(BigDecimal value);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBytesSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBytesSingleColumnDistinctExecutor.java
index 870155bc3d..69c893fec3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBytesSingleColumnDistinctExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBytesSingleColumnDistinctExecutor.java
@@ -25,11 +25,14 @@ import java.util.List;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -63,4 +66,31 @@ abstract class BaseRawBytesSingleColumnDistinctExecutor implements DistinctExecu
     }
     return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
   }
+
+  @Override
+  public boolean process(ValueBlock valueBlock) { // returns true as soon as add() returns true
+    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
+    byte[][] values = blockValueSet.getBytesValuesSV();
+    int numDocs = valueBlock.getNumDocs();
+    if (_nullHandlingEnabled) {
+      RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
+      for (int i = 0; i < numDocs; i++) {
+        if (nullBitmap != null && nullBitmap.contains(i)) {
+          values[i] = null; // overwrites the fetched array slot so add() receives null for this doc
+        }
+        if (add(values[i])) {
+          return true;
+        }
+      }
+    } else {
+      for (int i = 0; i < numDocs; i++) {
+        if (add(values[i])) {
+          return true;
+        }
+      }
+    }
+    return false; // whole block scanned without add() returning true
+  }
+
+  protected abstract boolean add(byte[] value);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawDoubleSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawDoubleSingleColumnDistinctExecutor.java
index 907b5c3b93..dbe102df31 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawDoubleSingleColumnDistinctExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawDoubleSingleColumnDistinctExecutor.java
@@ -26,10 +26,13 @@ import java.util.List;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -68,4 +71,41 @@ abstract class BaseRawDoubleSingleColumnDistinctExecutor implements DistinctExec
     }
     return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
   }
+
+  @Override
+  public boolean process(ValueBlock valueBlock) { // returns true as soon as add() returns true
+    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
+    int numDocs = valueBlock.getNumDocs();
+    if (blockValueSet.isSingleValue()) {
+      double[] values = blockValueSet.getDoubleValuesSV();
+      if (_nullHandlingEnabled) {
+        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
+        for (int i = 0; i < numDocs; i++) {
+          if (nullBitmap != null && nullBitmap.contains(i)) {
+            _hasNull = true; // null doc: only recorded in the flag, its value is not passed to add()
+          } else if (add(values[i])) {
+            return true;
+          }
+        }
+      } else {
+        for (int i = 0; i < numDocs; i++) {
+          if (add(values[i])) {
+            return true;
+          }
+        }
+      }
+    } else {
+      double[][] values = blockValueSet.getDoubleValuesMV(); // MV path: null bitmap is not consulted here
+      for (int i = 0; i < numDocs; i++) {
+        for (double value : values[i]) {
+          if (add(value)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false; // whole block scanned without add() returning true
+  }
+
+  protected abstract boolean add(double val);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawFloatSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawFloatSingleColumnDistinctExecutor.java
index 9648b26a87..b518a54a6f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawFloatSingleColumnDistinctExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawFloatSingleColumnDistinctExecutor.java
@@ -26,10 +26,13 @@ import java.util.List;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -69,4 +72,41 @@ abstract class BaseRawFloatSingleColumnDistinctExecutor implements DistinctExecu
     assert records.size() <= _limit;
     return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
   }
+
+  @Override
+  public boolean process(ValueBlock valueBlock) { // returns true as soon as add() returns true
+    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
+    int numDocs = valueBlock.getNumDocs();
+    if (blockValueSet.isSingleValue()) {
+      float[] values = blockValueSet.getFloatValuesSV();
+      if (_nullHandlingEnabled) {
+        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
+        for (int i = 0; i < numDocs; i++) {
+          if (nullBitmap != null && nullBitmap.contains(i)) {
+            _hasNull = true; // null doc: only recorded in the flag, its value is not passed to add()
+          } else if (add(values[i])) {
+            return true;
+          }
+        }
+      } else {
+        for (int i = 0; i < numDocs; i++) {
+          if (add(values[i])) {
+            return true;
+          }
+        }
+      }
+    } else {
+      float[][] values = blockValueSet.getFloatValuesMV(); // MV path: null bitmap is not consulted here
+      for (int i = 0; i < numDocs; i++) {
+        for (float value : values[i]) {
+          if (add(value)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false; // whole block scanned without add() returning true
+  }
+
+  protected abstract boolean add(float val);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawIntSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawIntSingleColumnDistinctExecutor.java
index c3b8413d33..d47b6b4368 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawIntSingleColumnDistinctExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawIntSingleColumnDistinctExecutor.java
@@ -26,10 +26,13 @@ import java.util.List;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -70,4 +73,41 @@ abstract class BaseRawIntSingleColumnDistinctExecutor implements DistinctExecuto
     assert records.size() <= _limit;
     return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
   }
+
+  @Override
+  public boolean process(ValueBlock valueBlock) { // returns true as soon as add() returns true
+    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
+    int numDocs = valueBlock.getNumDocs();
+    if (blockValueSet.isSingleValue()) {
+      int[] values = blockValueSet.getIntValuesSV();
+      if (_nullHandlingEnabled) {
+        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
+        for (int i = 0; i < numDocs; i++) {
+          if (nullBitmap != null && nullBitmap.contains(i)) {
+            _hasNull = true; // null doc: only recorded in the flag, its value is not passed to add()
+          } else if (add(values[i])) {
+            return true;
+          }
+        }
+      } else {
+        for (int i = 0; i < numDocs; i++) {
+          if (add(values[i])) {
+            return true;
+          }
+        }
+      }
+    } else {
+      int[][] values = blockValueSet.getIntValuesMV(); // MV path: null bitmap is not consulted here
+      for (int i = 0; i < numDocs; i++) {
+        for (int value : values[i]) {
+          if (add(value)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false; // whole block scanned without add() returning true
+  }
+
+  protected abstract boolean add(int val);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawLongSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawLongSingleColumnDistinctExecutor.java
index a75d733c38..6bc3a8b3c6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawLongSingleColumnDistinctExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawLongSingleColumnDistinctExecutor.java
@@ -26,10 +26,13 @@ import java.util.List;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -69,4 +72,41 @@ abstract class BaseRawLongSingleColumnDistinctExecutor implements DistinctExecut
     assert records.size() <= _limit;
     return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
   }
+
+  @Override
+  public boolean process(ValueBlock valueBlock) { // returns true as soon as add() returns true
+    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
+    int numDocs = valueBlock.getNumDocs();
+    if (blockValueSet.isSingleValue()) {
+      long[] values = blockValueSet.getLongValuesSV();
+      if (_nullHandlingEnabled) {
+        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
+        for (int i = 0; i < numDocs; i++) {
+          if (nullBitmap != null && nullBitmap.contains(i)) {
+            _hasNull = true; // null doc: only recorded in the flag, its value is not passed to add()
+          } else if (add(values[i])) {
+            return true;
+          }
+        }
+      } else {
+        for (int i = 0; i < numDocs; i++) {
+          if (add(values[i])) {
+            return true;
+          }
+        }
+      }
+    } else {
+      long[][] values = blockValueSet.getLongValuesMV(); // MV path: null bitmap is not consulted here
+      for (int i = 0; i < numDocs; i++) {
+        for (long value : values[i]) {
+          if (add(value)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false; // whole block scanned without add() returning true
+  }
+
+  protected abstract boolean add(long val);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawStringSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawStringSingleColumnDistinctExecutor.java
index 564a9c3e80..bb5cd0bb82 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawStringSingleColumnDistinctExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawStringSingleColumnDistinctExecutor.java
@@ -25,10 +25,13 @@ import java.util.List;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -62,4 +65,42 @@ abstract class BaseRawStringSingleColumnDistinctExecutor implements DistinctExec
     }
     return new DistinctTable(dataSchema, records, _nullHandlingEnabled);
   }
+
+  @Override
+  public boolean process(ValueBlock valueBlock) { // returns true as soon as add() returns true
+    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
+    int numDocs = valueBlock.getNumDocs();
+    if (blockValueSet.isSingleValue()) {
+      String[] values = blockValueSet.getStringValuesSV();
+      if (_nullHandlingEnabled) {
+        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
+        for (int i = 0; i < numDocs; i++) {
+          if (nullBitmap != null && nullBitmap.contains(i)) {
+            values[i] = null; // overwrites the fetched array slot so add() receives null for this doc
+          }
+          if (add(values[i])) {
+            return true;
+          }
+        }
+      } else {
+        for (int i = 0; i < numDocs; i++) {
+          if (add(values[i])) {
+            return true;
+          }
+        }
+      }
+    } else {
+      String[][] values = blockValueSet.getStringValuesMV(); // MV path: null bitmap is not consulted here
+      for (int i = 0; i < numDocs; i++) {
+        for (String value : values[i]) {
+          if (add(value)) {
+            return true;
+          }
+        }
+      }
+    }
+    return false; // whole block scanned without add() returning true
+  }
+
+  protected abstract boolean add(String val);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOnlyExecutor.java
index d33b4f4f2a..6f5bd46c83 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOnlyExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOnlyExecutor.java
@@ -20,11 +20,8 @@ package org.apache.pinot.core.query.distinct.raw;
 
 import java.math.BigDecimal;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -38,29 +35,8 @@ public class RawBigDecimalSingleColumnDistinctOnlyExecutor extends BaseRawBigDec
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    BigDecimal[] values = blockValueSet.getBigDecimalValuesSV();
-    int numDocs = valueBlock.getNumDocs();
-    if (_nullHandlingEnabled) {
-      RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-      for (int i = 0; i < numDocs; i++) {
-        if (nullBitmap != null && nullBitmap.contains(i)) {
-          values[i] = null;
-        }
-        _valueSet.add(values[i]);
-        if (_valueSet.size() >= _limit) {
-          return true;
-        }
-      }
-    } else {
-      for (int i = 0; i < numDocs; i++) {
-        _valueSet.add(values[i]);
-        if (_valueSet.size() >= _limit) {
-          return true;
-        }
-      }
-    }
-    return false;
+  protected boolean add(BigDecimal value) { // inserts the value, then reports whether the set has reached _limit
+    _valueSet.add(value);
+    return _valueSet.size() >= _limit; // true once the set holds at least _limit entries
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOrderByExecutor.java
index 9f53584922..99fa4d12a7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOrderByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBigDecimalSingleColumnDistinctOrderByExecutor.java
@@ -23,11 +23,8 @@ import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
 import java.math.BigDecimal;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -52,25 +49,7 @@ public class RawBigDecimalSingleColumnDistinctOrderByExecutor extends BaseRawBig
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    BigDecimal[] values = blockValueSet.getBigDecimalValuesSV();
-    int numDocs = valueBlock.getNumDocs();
-    if (_nullHandlingEnabled) {
-      RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-      for (int i = 0; i < numDocs; i++) {
-        BigDecimal value = nullBitmap != null && nullBitmap.contains(i) ? null : values[i];
-        processInternal(value);
-      }
-    } else {
-      for (int i = 0; i < numDocs; i++) {
-        processInternal(values[i]);
-      }
-    }
-    return false;
-  }
-
-  private void processInternal(BigDecimal value) {
+  protected boolean add(BigDecimal value) {
     if (!_valueSet.contains(value)) {
       if (_valueSet.size() < _limit) {
         _valueSet.add(value);
@@ -85,5 +64,6 @@ public class RawBigDecimalSingleColumnDistinctOrderByExecutor extends BaseRawBig
         }
       }
     }
+    return false;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOnlyExecutor.java
index 8ea7e72d6f..7490f8f1df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOnlyExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOnlyExecutor.java
@@ -19,12 +19,9 @@
 package org.apache.pinot.core.query.distinct.raw;
 
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -38,29 +35,8 @@ public class RawBytesSingleColumnDistinctOnlyExecutor extends BaseRawBytesSingle
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    byte[][] values = blockValueSet.getBytesValuesSV();
-    int numDocs = valueBlock.getNumDocs();
-    if (_nullHandlingEnabled) {
-      RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-      for (int i = 0; i < numDocs; i++) {
-        if (nullBitmap != null && nullBitmap.contains(i)) {
-          values[i] = null;
-        }
-        _valueSet.add(new ByteArray(values[i]));
-        if (_valueSet.size() >= _limit) {
-          return true;
-        }
-      }
-    } else {
-      for (int i = 0; i < numDocs; i++) {
-        _valueSet.add(new ByteArray(values[i]));
-        if (_valueSet.size() >= _limit) {
-          return true;
-        }
-      }
-    }
-    return false;
+  protected boolean add(byte[] value) { // inserts the value, then reports whether the set has reached _limit
+    _valueSet.add(new ByteArray(value)); // wrapped before insertion; presumably for content-based equality - confirm
+    return _valueSet.size() >= _limit; // true once the set holds at least _limit entries
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOrderByExecutor.java
index e41dc96e0d..e064420252 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOrderByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOrderByExecutor.java
@@ -22,12 +22,9 @@ import it.unimi.dsi.fastutil.PriorityQueue;
 import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.utils.ByteArray;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -52,38 +49,22 @@ public class RawBytesSingleColumnDistinctOrderByExecutor extends BaseRawBytesSin
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    byte[][] values = blockValueSet.getBytesValuesSV();
-    int numDocs = valueBlock.getNumDocs();
-    if (_nullHandlingEnabled) {
-      RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-      for (int i = 0; i < numDocs; i++) {
-        ByteArray value = nullBitmap != null && nullBitmap.contains(i) ? null : new ByteArray(values[i]);
-        processInternal(value);
-      }
-    } else {
-      for (int i = 0; i < numDocs; i++) {
-        processInternal(new ByteArray(values[i]));
-      }
-    }
-    return false;
-  }
-
-  private void processInternal(ByteArray value) {
-    if (!_valueSet.contains(value)) {
+  protected boolean add(byte[] value) {
+    ByteArray byteArray = new ByteArray(value);
+    if (!_valueSet.contains(byteArray)) {
       if (_valueSet.size() < _limit) {
-        _valueSet.add(value);
-        _priorityQueue.enqueue(value);
+        _valueSet.add(byteArray);
+        _priorityQueue.enqueue(byteArray);
       } else {
         ByteArray firstValue = _priorityQueue.first();
-        if (_priorityQueue.comparator().compare(value, firstValue) > 0) {
+        if (_priorityQueue.comparator().compare(byteArray, firstValue) > 0) {
           _valueSet.remove(firstValue);
-          _valueSet.add(value);
+          _valueSet.add(byteArray);
           _priorityQueue.dequeue();
-          _priorityQueue.enqueue(value);
+          _priorityQueue.enqueue(byteArray);
         }
       }
     }
+    return false;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOnlyExecutor.java
index 83e6852190..835af4f1c3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOnlyExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOnlyExecutor.java
@@ -19,11 +19,8 @@
 package org.apache.pinot.core.query.distinct.raw;
 
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -37,44 +34,8 @@ public class RawDoubleSingleColumnDistinctOnlyExecutor extends BaseRawDoubleSing
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (blockValueSet.isSingleValue()) {
-      double[] values = blockValueSet.getDoubleValuesSV();
-      if (_nullHandlingEnabled) {
-        // TODO(nhejazi): consider having a separate set of classes to handle the case with null handling enabled.
-        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-        for (int i = 0; i < numDocs; i++) {
-          if (nullBitmap != null && nullBitmap.contains(i)) {
-            _hasNull = true;
-          } else {
-            _valueSet.add(values[i]);
-            if (_valueSet.size() >= _limit - (_hasNull ? 1 : 0)) {
-              return true;
-            }
-          }
-        }
-      } else {
-        for (int i = 0; i < numDocs; i++) {
-          _valueSet.add(values[i]);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      }
-    } else {
-      // TODO(nhejazi): support proper null handling in multi-valued columns.
-      double[][] values = blockValueSet.getDoubleValuesMV();
-      for (int i = 0; i < numDocs; i++) {
-        for (double value : values[i]) {
-          _valueSet.add(value);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
+  protected boolean add(double value) { // inserts the value, then reports whether the threshold is reached
+    _valueSet.add(value);
+    return _valueSet.size() >= _limit - (_hasNull ? 1 : 0); // threshold drops by one once _hasNull is set
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOrderByExecutor.java
index d335628fe5..9fbf439f9b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOrderByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOrderByExecutor.java
@@ -22,11 +22,8 @@ import it.unimi.dsi.fastutil.doubles.DoubleHeapPriorityQueue;
 import it.unimi.dsi.fastutil.doubles.DoublePriorityQueue;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -46,37 +43,7 @@ public class RawDoubleSingleColumnDistinctOrderByExecutor extends BaseRawDoubleS
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (blockValueSet.isSingleValue()) {
-      double[] values = blockValueSet.getDoubleValuesSV();
-      if (_nullHandlingEnabled) {
-        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-        for (int i = 0; i < numDocs; i++) {
-          if (nullBitmap != null && nullBitmap.contains(i)) {
-            _hasNull = true;
-          } else {
-            add(values[i]);
-          }
-        }
-      } else {
-        for (int i = 0; i < numDocs; i++) {
-          add(values[i]);
-        }
-      }
-    } else {
-      double[][] values = blockValueSet.getDoubleValuesMV();
-      for (int i = 0; i < numDocs; i++) {
-        for (double value : values[i]) {
-          add(value);
-        }
-      }
-    }
-    return false;
-  }
-
-  private void add(double value) {
+  protected boolean add(double value) {
     if (!_valueSet.contains(value)) {
       if (_valueSet.size() < _limit - (_hasNull ? 1 : 0)) {
         _valueSet.add(value);
@@ -91,5 +58,6 @@ public class RawDoubleSingleColumnDistinctOrderByExecutor extends BaseRawDoubleS
         }
       }
     }
+    return false;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOnlyExecutor.java
index 7df9d3d44a..412beb9785 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOnlyExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOnlyExecutor.java
@@ -19,11 +19,8 @@
 package org.apache.pinot.core.query.distinct.raw;
 
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -37,42 +34,8 @@ public class RawFloatSingleColumnDistinctOnlyExecutor extends BaseRawFloatSingle
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (blockValueSet.isSingleValue()) {
-      float[] values = blockValueSet.getFloatValuesSV();
-      if (_nullHandlingEnabled) {
-        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-        for (int i = 0; i < numDocs; i++) {
-          if (nullBitmap != null && nullBitmap.contains(i)) {
-            _hasNull = true;
-          } else {
-            _valueSet.add(values[i]);
-            if (_valueSet.size() >= _limit - (_hasNull ? 1 : 0)) {
-              return true;
-            }
-          }
-        }
-      } else {
-        for (int i = 0; i < numDocs; i++) {
-          _valueSet.add(values[i]);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      }
-    } else {
-      float[][] values = blockValueSet.getFloatValuesMV();
-      for (int i = 0; i < numDocs; i++) {
-        for (float value : values[i]) {
-          _valueSet.add(value);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
+  protected boolean add(float value) {
+    _valueSet.add(value);
+    return _valueSet.size() >= _limit - (_hasNull ? 1 : 0);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOrderByExecutor.java
index 751072d178..a91e806637 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOrderByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOrderByExecutor.java
@@ -22,11 +22,8 @@ import it.unimi.dsi.fastutil.floats.FloatHeapPriorityQueue;
 import it.unimi.dsi.fastutil.floats.FloatPriorityQueue;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -46,37 +43,7 @@ public class RawFloatSingleColumnDistinctOrderByExecutor extends BaseRawFloatSin
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (blockValueSet.isSingleValue()) {
-      float[] values = blockValueSet.getFloatValuesSV();
-      if (_nullHandlingEnabled) {
-        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-        for (int i = 0; i < numDocs; i++) {
-          if (nullBitmap != null && nullBitmap.contains(i)) {
-            _hasNull = true;
-          } else {
-            add(values[i]);
-          }
-        }
-      } else {
-        for (int i = 0; i < numDocs; i++) {
-          add(values[i]);
-        }
-      }
-    } else {
-      float[][] values = blockValueSet.getFloatValuesMV();
-      for (int i = 0; i < numDocs; i++) {
-        for (float value : values[i]) {
-          add(value);
-        }
-      }
-    }
-    return false;
-  }
-
-  private void add(float value) {
+  protected boolean add(float value) {
     if (!_valueSet.contains(value)) {
       if (_valueSet.size() < _limit - (_hasNull ? 1 : 0)) {
         _valueSet.add(value);
@@ -91,5 +58,6 @@ public class RawFloatSingleColumnDistinctOrderByExecutor extends BaseRawFloatSin
         }
       }
     }
+    return false;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOnlyExecutor.java
index 2433f08660..4cc130dcf3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOnlyExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOnlyExecutor.java
@@ -19,11 +19,8 @@
 package org.apache.pinot.core.query.distinct.raw;
 
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -37,42 +34,8 @@ public class RawIntSingleColumnDistinctOnlyExecutor extends BaseRawIntSingleColu
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (blockValueSet.isSingleValue()) {
-      int[] values = blockValueSet.getIntValuesSV();
-      if (_nullHandlingEnabled) {
-        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-        for (int i = 0; i < numDocs; i++) {
-          if (nullBitmap != null && nullBitmap.contains(i)) {
-            _hasNull = true;
-          } else {
-            _valueSet.add(values[i]);
-            if (_valueSet.size() >= _limit - (_hasNull ? 1 : 0)) {
-              return true;
-            }
-          }
-        }
-      } else {
-        for (int i = 0; i < numDocs; i++) {
-          _valueSet.add(values[i]);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      }
-    } else {
-      int[][] values = blockValueSet.getIntValuesMV();
-      for (int i = 0; i < numDocs; i++) {
-        for (int value : values[i]) {
-          _valueSet.add(value);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
+  protected boolean add(int val) {
+    _valueSet.add(val);
+    return (_valueSet.size() >= _limit - (_hasNull ? 1 : 0));
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOrderByExecutor.java
index 00e02a66e7..3dc7869d57 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOrderByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOrderByExecutor.java
@@ -22,11 +22,8 @@ import it.unimi.dsi.fastutil.ints.IntHeapPriorityQueue;
 import it.unimi.dsi.fastutil.ints.IntPriorityQueue;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -46,37 +43,7 @@ public class RawIntSingleColumnDistinctOrderByExecutor extends BaseRawIntSingleC
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (blockValueSet.isSingleValue()) {
-      int[] values = blockValueSet.getIntValuesSV();
-      if (_nullHandlingEnabled) {
-        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-        for (int i = 0; i < numDocs; i++) {
-          if (nullBitmap != null && nullBitmap.contains(i)) {
-            _hasNull = true;
-          } else {
-            add(values[i]);
-          }
-        }
-      } else {
-        for (int i = 0; i < numDocs; i++) {
-          add(values[i]);
-        }
-      }
-    } else {
-      int[][] values = blockValueSet.getIntValuesMV();
-      for (int i = 0; i < numDocs; i++) {
-        for (int value : values[i]) {
-          add(value);
-        }
-      }
-    }
-    return false;
-  }
-
-  private void add(int value) {
+  protected boolean add(int value) {
     if (!_valueSet.contains(value)) {
       if (_valueSet.size() < _limit - (_hasNull ? 1 : 0)) {
         _valueSet.add(value);
@@ -91,5 +58,6 @@ public class RawIntSingleColumnDistinctOrderByExecutor extends BaseRawIntSingleC
         }
       }
     }
+    return false;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOnlyExecutor.java
index b063374bdf..39fae13f73 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOnlyExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOnlyExecutor.java
@@ -19,11 +19,8 @@
 package org.apache.pinot.core.query.distinct.raw;
 
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -37,42 +34,8 @@ public class RawLongSingleColumnDistinctOnlyExecutor extends BaseRawLongSingleCo
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (blockValueSet.isSingleValue()) {
-      long[] values = blockValueSet.getLongValuesSV();
-      if (_nullHandlingEnabled) {
-        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-        for (int i = 0; i < numDocs; i++) {
-          if (nullBitmap != null && nullBitmap.contains(i)) {
-            _hasNull = true;
-          } else {
-            _valueSet.add(values[i]);
-            if (_valueSet.size() >= _limit - (_hasNull ? 1 : 0)) {
-              return true;
-            }
-          }
-        }
-      } else {
-        for (int i = 0; i < numDocs; i++) {
-          _valueSet.add(values[i]);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      }
-    } else {
-      long[][] values = blockValueSet.getLongValuesMV();
-      for (int i = 0; i < numDocs; i++) {
-        for (long value : values[i]) {
-          _valueSet.add(value);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
+  protected boolean add(long val) {
+    _valueSet.add(val);
+    return _valueSet.size() >= _limit - (_hasNull ? 1 : 0);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOrderByExecutor.java
index 52cb3afd38..576f862453 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOrderByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOrderByExecutor.java
@@ -22,11 +22,8 @@ import it.unimi.dsi.fastutil.longs.LongHeapPriorityQueue;
 import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -46,37 +43,7 @@ public class RawLongSingleColumnDistinctOrderByExecutor extends BaseRawLongSingl
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (blockValueSet.isSingleValue()) {
-      long[] values = blockValueSet.getLongValuesSV();
-      if (_nullHandlingEnabled) {
-        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-        for (int i = 0; i < numDocs; i++) {
-          if (nullBitmap != null && nullBitmap.contains(i)) {
-            _hasNull = true;
-          } else {
-            add(values[i]);
-          }
-        }
-      } else {
-        for (int i = 0; i < numDocs; i++) {
-          add(values[i]);
-        }
-      }
-    } else {
-      long[][] values = blockValueSet.getLongValuesMV();
-      for (int i = 0; i < numDocs; i++) {
-        for (long value : values[i]) {
-          add(value);
-        }
-      }
-    }
-    return false;
-  }
-
-  private void add(long value) {
+  protected boolean add(long value) {
     if (!_valueSet.contains(value)) {
       if (_valueSet.size() < _limit - (_hasNull ? 1 : 0)) {
         _valueSet.add(value);
@@ -91,5 +58,6 @@ public class RawLongSingleColumnDistinctOrderByExecutor extends BaseRawLongSingl
         }
       }
     }
+    return false;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOnlyExecutor.java
index cae7b3782c..97d57f0845 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOnlyExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOnlyExecutor.java
@@ -19,11 +19,8 @@
 package org.apache.pinot.core.query.distinct.raw;
 
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -37,41 +34,8 @@ public class RawStringSingleColumnDistinctOnlyExecutor extends BaseRawStringSing
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (blockValueSet.isSingleValue()) {
-      String[] values = blockValueSet.getStringValuesSV();
-      if (_nullHandlingEnabled) {
-        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-        for (int i = 0; i < numDocs; i++) {
-          if (nullBitmap != null && nullBitmap.contains(i)) {
-            values[i] = null;
-          }
-          _valueSet.add(values[i]);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      } else {
-        for (int i = 0; i < numDocs; i++) {
-          _valueSet.add(values[i]);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      }
-    } else {
-      String[][] values = blockValueSet.getStringValuesMV();
-      for (int i = 0; i < numDocs; i++) {
-        for (String value : values[i]) {
-          _valueSet.add(value);
-          if (_valueSet.size() >= _limit) {
-            return true;
-          }
-        }
-      }
-    }
-    return false;
+  protected boolean add(String value) {
+    _valueSet.add(value);
+    return _valueSet.size() >= _limit;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOrderByExecutor.java
index 295fb6e20c..8471c60bb3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOrderByExecutor.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOrderByExecutor.java
@@ -22,11 +22,8 @@ import it.unimi.dsi.fastutil.PriorityQueue;
 import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
-import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.query.distinct.DistinctExecutor;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.roaringbitmap.RoaringBitmap;
 
 
 /**
@@ -51,33 +48,7 @@ public class RawStringSingleColumnDistinctOrderByExecutor extends BaseRawStringS
   }
 
   @Override
-  public boolean process(ValueBlock valueBlock) {
-    BlockValSet blockValueSet = valueBlock.getBlockValueSet(_expression);
-    int numDocs = valueBlock.getNumDocs();
-    if (blockValueSet.isSingleValue()) {
-      String[] values = blockValueSet.getStringValuesSV();
-      if (_nullHandlingEnabled) {
-        RoaringBitmap nullBitmap = blockValueSet.getNullBitmap();
-        for (int i = 0; i < numDocs; i++) {
-          add(nullBitmap != null && nullBitmap.contains(i) ? null : values[i]);
-        }
-      } else {
-        for (int i = 0; i < numDocs; i++) {
-          add(values[i]);
-        }
-      }
-    } else {
-      String[][] values = blockValueSet.getStringValuesMV();
-      for (int i = 0; i < numDocs; i++) {
-        for (String value : values[i]) {
-          add(value);
-        }
-      }
-    }
-    return false;
-  }
-
-  private void add(String value) {
+  protected boolean add(String value) {
     if (!_valueSet.contains(value)) {
       if (_valueSet.size() < _limit) {
         _valueSet.add(value);
@@ -92,5 +63,6 @@ public class RawStringSingleColumnDistinctOrderByExecutor extends BaseRawStringS
         }
       }
     }
+    return false;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org