You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2023/05/12 19:31:45 UTC

[pinot] branch master updated: [feature] [null support # 8] Support null in all null related transform functions (#10594)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e1160272a [feature] [null support # 8] Support null in all null related transform functions (#10594)
9e1160272a is described below

commit 9e1160272aec88591d9e768d15104b72521b2ffb
Author: Yao Liu <ya...@startree.ai>
AuthorDate: Fri May 12 12:31:36 2023 -0700

    [feature] [null support # 8] Support null in all null related transform functions (#10594)
---
 .../transform/function/BaseTransformFunction.java  |   2 +
 .../transform/function/CaseTransformFunction.java  | 840 +++++++++++++++++----
 .../function/CoalesceTransformFunction.java        | 120 ++-
 .../function/IsNotNullTransformFunction.java       |  60 +-
 .../function/IsNullTransformFunction.java          |  51 +-
 .../function/BaseTransformFunctionTest.java        |  38 +
 .../function/CaseTransformFunctionTest.java        |  65 +-
 .../function/CoalesceTransformFunctionTest.java    | 451 ++---------
 .../NullHandlingTransformFunctionTest.java         |  38 +-
 9 files changed, 944 insertions(+), 721 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunction.java
index 24ede2b23b..84364a982f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunction.java
@@ -81,6 +81,8 @@ public abstract class BaseTransformFunction implements TransformFunction {
       new TransformResultMetadata(DataType.JSON, false, false);
   protected static final TransformResultMetadata BYTES_MV_NO_DICTIONARY_METADATA =
       new TransformResultMetadata(DataType.BYTES, false, false);
+  protected static final TransformResultMetadata UNKNOWN_METADATA =
+      new TransformResultMetadata(DataType.UNKNOWN, true, false);
 
   // These buffers are used to hold the result for different result types. When the subclass overrides a method, it can
   // reuse the buffer for that method. E.g. if transformToIntValuesSV is overridden, the result can be written into
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
index b9578f4b84..58d97d069a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunction.java
@@ -22,45 +22,46 @@ import com.google.common.base.Preconditions;
 import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
+import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.operator.ColumnContext;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
 
 
 /**
  * The <code>CaseTransformFunction</code> class implements the CASE-WHEN-THEN-ELSE transformation.
- *
- * The SQL Syntax is:
- *    CASE
- *        WHEN condition1 THEN result1
- *        WHEN condition2 THEN result2
- *        WHEN conditionN THEN resultN
- *        ELSE result
- *    END;
- *
- * Usage:
- *    case(${WHEN_STATEMENT_1}, ..., ${WHEN_STATEMENT_N},
- *         ${THEN_EXPRESSION_1}, ..., ${THEN_EXPRESSION_N},
- *         ${ELSE_EXPRESSION})
- *
+ * <p>
+ * The SQL Syntax is: CASE WHEN condition1 THEN result1 WHEN condition2 THEN result2 WHEN conditionN THEN resultN ELSE
+ * result END;
+ * <p>
+ * Usage: case(${WHEN_STATEMENT_1}, ..., ${WHEN_STATEMENT_N}, ${THEN_EXPRESSION_1}, ..., ${THEN_EXPRESSION_N},
+ * ${ELSE_EXPRESSION})
+ * <p>
  * There are 2 * N + 1 arguments:
- *    <code>WHEN_STATEMENT_$i</code> is a <code>BinaryOperatorTransformFunction</code> represents
- *    <code>condition$i</code>
- *    <code>THEN_EXPRESSION_$i</code> is a <code>TransformFunction</code> represents <code>result$i</code>
- *    <code>ELSE_EXPRESSION</code> is a <code>TransformFunction</code> represents <code>result</code>
- *
+ * <code>WHEN_STATEMENT_$i</code> is a <code>BinaryOperatorTransformFunction</code> represents
+ * <code>condition$i</code>
+ * <code>THEN_EXPRESSION_$i</code> is a <code>TransformFunction</code> represents <code>result$i</code>
+ * <code>ELSE_EXPRESSION</code> is a <code>TransformFunction</code> represents <code>result</code>
+ * <p>
+ * ELSE_EXPRESSION can be omitted. When none of the WHEN statements evaluates to true and there is no ELSE
+ * expression, the output is null. Note that a WHEN statement that evaluates to null is treated as false.
  */
 public class CaseTransformFunction extends BaseTransformFunction {
   public static final String FUNCTION_NAME = "case";
 
   private List<TransformFunction> _whenStatements = new ArrayList<>();
-  private List<TransformFunction> _elseThenStatements = new ArrayList<>();
-  private boolean[] _selections;
-  private int _numSelections;
+  private List<TransformFunction> _thenStatements = new ArrayList<>();
+  private TransformFunction _elseStatement;
+
+  private boolean[] _computeThenStatements;
   private TransformResultMetadata _resultMetadata;
   private int[] _selectedResults;
 
@@ -71,17 +72,16 @@ public class CaseTransformFunction extends BaseTransformFunction {
 
   @Override
   public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
-    // Check that there are more than 1 arguments
-    if (arguments.size() % 2 != 1 || arguments.size() < 3) {
-      throw new IllegalArgumentException("At least 3 odd number of arguments are required for CASE-WHEN-ELSE function");
+    // Check that there are at least 2 arguments (one WHEN-THEN pair).
+    // The ELSE statement can be omitted.
+    if (arguments.size() < 2) {
+      throw new IllegalArgumentException("At least two arguments are required for CASE-WHEN function");
     }
     int numWhenStatements = arguments.size() / 2;
     _whenStatements = new ArrayList<>(numWhenStatements);
-    _elseThenStatements = new ArrayList<>(numWhenStatements + 1);
+    _thenStatements = new ArrayList<>(numWhenStatements);
     constructStatementList(arguments);
-    _selections = new boolean[_elseThenStatements.size()];
-    Collections.reverse(_elseThenStatements);
-    Collections.reverse(_whenStatements);
+    _computeThenStatements = new boolean[_thenStatements.size()];
     _resultMetadata = calculateResultMetadata();
   }
 
@@ -109,9 +109,11 @@ public class CaseTransformFunction extends BaseTransformFunction {
     // alternating WHEN and THEN clause, last one ELSE
     for (int i = 0; i < numWhenStatements; i++) {
       _whenStatements.add(arguments.get(i * 2));
-      _elseThenStatements.add(arguments.get(i * 2 + 1));
+      _thenStatements.add(arguments.get(i * 2 + 1));
+    }
+    if (arguments.size() % 2 != 0) {
+      _elseStatement = arguments.get(arguments.size() - 1);
     }
-    _elseThenStatements.add(arguments.get(arguments.size() - 1));
   }
 
   // TODO: Legacy format, this is here for backward compatibility support, remove after release 0.12
@@ -121,20 +123,22 @@ public class CaseTransformFunction extends BaseTransformFunction {
     for (int i = 0; i < numWhenStatements; i++) {
       _whenStatements.add(arguments.get(i));
     }
-    for (int i = numWhenStatements; i < numWhenStatements * 2 + 1; i++) {
-      _elseThenStatements.add(arguments.get(i));
+    for (int i = numWhenStatements; i < numWhenStatements * 2; i++) {
+      _thenStatements.add(arguments.get(i));
+    }
+    if (arguments.size() % 2 != 0) {
+      _elseStatement = arguments.get(arguments.size() - 1);
     }
   }
 
   private TransformResultMetadata calculateResultMetadata() {
-    TransformFunction elseStatement = _elseThenStatements.get(0);
-    TransformResultMetadata elseStatementResultMetadata = elseStatement.getResultMetadata();
+    TransformResultMetadata elseStatementResultMetadata = _elseStatement.getResultMetadata();
     DataType dataType = elseStatementResultMetadata.getDataType();
     Preconditions.checkState(elseStatementResultMetadata.isSingleValue(),
         "Unsupported multi-value expression in the ELSE clause");
-    int numThenStatements = _elseThenStatements.size() - 1;
+    int numThenStatements = _thenStatements.size();
     for (int i = 0; i < numThenStatements; i++) {
-      TransformFunction thenStatement = _elseThenStatements.get(i + 1);
+      TransformFunction thenStatement = _thenStatements.get(i);
       TransformResultMetadata thenStatementResultMetadata = thenStatement.getResultMetadata();
       if (!thenStatementResultMetadata.isSingleValue()) {
         throw new IllegalStateException("Unsupported multi-value expression in the THEN clause of index: " + i);
@@ -149,6 +153,7 @@ public class CaseTransformFunction extends BaseTransformFunction {
       // - FLOAT & DOUBLE -> DOUBLE
       // - Any numeric data type with BIG_DECIMAL -> BIG_DECIMAL
       // Use STRING to handle non-numeric types
+      // UNKNOWN data types are ignored; the result is UNKNOWN only when all data types are UNKNOWN.
       if (thenStatementDataType == dataType) {
         continue;
       }
@@ -165,6 +170,8 @@ public class CaseTransformFunction extends BaseTransformFunction {
             case BIG_DECIMAL:
               dataType = DataType.BIG_DECIMAL;
               break;
+            case UNKNOWN:
+              break;
             default:
               dataType = DataType.STRING;
               break;
@@ -172,7 +179,8 @@ public class CaseTransformFunction extends BaseTransformFunction {
           break;
         case LONG:
           switch (thenStatementDataType) {
-            case INT:
+            case INT: // fall through
+            case UNKNOWN:
               break;
             case FLOAT:
             case DOUBLE:
@@ -196,6 +204,8 @@ public class CaseTransformFunction extends BaseTransformFunction {
             case BIG_DECIMAL:
               dataType = DataType.BIG_DECIMAL;
               break;
+            case UNKNOWN:
+              break;
             default:
               dataType = DataType.STRING;
               break;
@@ -206,6 +216,7 @@ public class CaseTransformFunction extends BaseTransformFunction {
             case INT:
             case FLOAT:
             case LONG:
+            case UNKNOWN:
               break;
             case BIG_DECIMAL:
               dataType = DataType.BIG_DECIMAL;
@@ -221,12 +232,16 @@ public class CaseTransformFunction extends BaseTransformFunction {
             case FLOAT:
             case LONG:
             case DOUBLE:
+            case UNKNOWN:
               break;
             default:
               dataType = DataType.STRING;
               break;
           }
           break;
+        case UNKNOWN:
+          dataType = thenStatementDataType;
+          break;
         default:
           dataType = DataType.STRING;
           break;
@@ -241,226 +256,739 @@ public class CaseTransformFunction extends BaseTransformFunction {
   }
 
   /**
-   * Evaluate the ValueBlock for the WHEN statements, returns an array with the
-   * index(1 to N) of matched WHEN clause ordered by match priority, 0 means nothing
-   * matched, so go to ELSE.
+   * Evaluates the ValueBlock for the WHEN statements and returns an array holding, for each doc, the 0-based index
+   * of the first matched WHEN clause; -1 means there is no match.
    */
-  private int[] getSelectedArray(ValueBlock valueBlock) {
+  private int[] getSelectedArray(ValueBlock valueBlock, boolean nullHandlingEnabled) {
     int numDocs = valueBlock.getNumDocs();
     if (_selectedResults == null || _selectedResults.length < numDocs) {
       _selectedResults = new int[numDocs];
-    } else {
-      Arrays.fill(_selectedResults, 0, numDocs, 0);
-      Arrays.fill(_selections, false);
     }
+    Arrays.fill(_selectedResults, -1);
+    Arrays.fill(_computeThenStatements, false);
+    BitSet unselectedDocs = new BitSet();
+    unselectedDocs.set(0, numDocs);
     int numWhenStatements = _whenStatements.size();
-    for (int i = numWhenStatements - 1; i >= 0; i--) {
+    for (int i = 0; i < numWhenStatements; i++) {
       TransformFunction whenStatement = _whenStatements.get(i);
-      int[] conditions = whenStatement.transformToIntValuesSV(valueBlock);
-      for (int j = 0; j < numDocs & j < conditions.length; j++) {
-        _selectedResults[j] = Math.max(conditions[j] * (i + 1), _selectedResults[j]);
+      int[] conditions = getWhenConditions(whenStatement, valueBlock, nullHandlingEnabled);
+      for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+        if (conditions[docId] == 1) {
+          unselectedDocs.clear(docId);
+          _selectedResults[docId] = i;
+        }
+      }
+      if (unselectedDocs.isEmpty()) {
+        break;
       }
     }
     // try to prune clauses now
     for (int i = 0; i < numDocs; i++) {
-      _selections[_selectedResults[i]] = true;
-    }
-    int numSelections = 0;
-    for (boolean selection : _selections) {
-      if (selection) {
-        numSelections++;
+      if (_selectedResults[i] != -1) {
+        _computeThenStatements[_selectedResults[i]] = true;
       }
     }
-    _numSelections = numSelections;
     return _selectedResults;
   }
 
+  // Returns an array with one entry per doc in the valueBlock; a value of 1 means the WHEN condition selects the row.
+  // When nullHandlingEnabled is true, rows whose condition evaluates to null are set to 0 (treated as not selected).
+  private static int[] getWhenConditions(TransformFunction whenStatement, ValueBlock valueBlock,
+      boolean nullHandlingEnabled) {
+    if (!nullHandlingEnabled) {
+      return whenStatement.transformToIntValuesSV(valueBlock);
+    }
+    Pair<int[], RoaringBitmap> result = whenStatement.transformToIntValuesSVWithNull(valueBlock);
+    RoaringBitmap bitmap = result.getRight();
+    int[] intResult = result.getLeft();
+    if (bitmap != null) {
+      for (int i : bitmap) {
+        intResult[i] = 0;
+      }
+    }
+    return intResult;
+  }
+
   @Override
   public int[] transformToIntValuesSV(ValueBlock valueBlock) {
     if (_resultMetadata.getDataType().getStoredType() != DataType.INT) {
       return super.transformToIntValuesSV(valueBlock);
     }
-    int[] selected = getSelectedArray(valueBlock);
+    int[] selected = getSelectedArray(valueBlock, false);
     int numDocs = valueBlock.getNumDocs();
     initIntValuesSV(numDocs);
-    int numElseThenStatements = _elseThenStatements.size();
-    for (int i = 0; i < numElseThenStatements; i++) {
-      if (_selections[i]) {
-        TransformFunction transformFunction = _elseThenStatements.get(i);
-        int[] intValues = transformFunction.transformToIntValuesSV(valueBlock);
-        if (_numSelections == 1) {
-          System.arraycopy(intValues, 0, _intValuesSV, 0, numDocs);
-        } else {
-          for (int j = 0; j < numDocs; j++) {
-            if (selected[j] == i) {
-              _intValuesSV[j] = intValues[j];
-            }
-          }
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet();
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, int[]> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) {
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToIntValuesSV(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        _intValuesSV[docId] = thenStatementsIndexToValues.get(selected[docId])[docId];
+        unselectedDocs.clear(docId);
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _intValuesSV[docId] = (int) DataSchema.ColumnDataType.INT.getNullPlaceholder();
+        }
+      } else {
+        int[] intValuesSV = _elseStatement.transformToIntValuesSV(valueBlock);
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _intValuesSV[docId] = intValuesSV[docId];
         }
       }
     }
     return _intValuesSV;
   }
 
+  @Override
+  public Pair<int[], RoaringBitmap> transformToIntValuesSVWithNull(ValueBlock valueBlock) {
+    if (_resultMetadata.getDataType().getStoredType() != DataType.INT) {
+      return super.transformToIntValuesSVWithNull(valueBlock);
+    }
+    final RoaringBitmap bitmap = new RoaringBitmap();
+    int[] selected = getSelectedArray(valueBlock, true);
+    int numDocs = valueBlock.getNumDocs();
+    initIntValuesSV(numDocs);
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet();
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, Pair<int[], RoaringBitmap>> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) {
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToIntValuesSVWithNull(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        Pair<int[], RoaringBitmap> nullValuePair = thenStatementsIndexToValues.get(selected[docId]);
+        _intValuesSV[docId] = nullValuePair.getLeft()[docId];
+        RoaringBitmap nullBitmap = nullValuePair.getRight();
+        if (nullBitmap != null && nullBitmap.contains(docId)) {
+          bitmap.add(docId);
+        }
+        unselectedDocs.clear(docId);
+        if (unselectedDocs.isEmpty()) {
+          break;
+        }
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _intValuesSV[docId] = (int) DataSchema.ColumnDataType.INT.getNullPlaceholder();
+          bitmap.add(docId);
+        }
+      } else {
+        Pair<int[], RoaringBitmap> intValuesNullPair = _elseStatement.transformToIntValuesSVWithNull(valueBlock);
+        int[] intValues = intValuesNullPair.getLeft();
+        RoaringBitmap nullBitmap = intValuesNullPair.getRight();
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _intValuesSV[docId] = intValues[docId];
+          if (nullBitmap != null && nullBitmap.contains(docId)) {
+            bitmap.add(docId);
+          }
+        }
+      }
+    }
+    return ImmutablePair.of(_intValuesSV, bitmap);
+  }
+
   @Override
   public long[] transformToLongValuesSV(ValueBlock valueBlock) {
     if (_resultMetadata.getDataType().getStoredType() != DataType.LONG) {
       return super.transformToLongValuesSV(valueBlock);
     }
-    int[] selected = getSelectedArray(valueBlock);
+    int[] selected = getSelectedArray(valueBlock, false);
     int numDocs = valueBlock.getNumDocs();
     initLongValuesSV(numDocs);
-    int numElseThenStatements = _elseThenStatements.size();
-    for (int i = 0; i < numElseThenStatements; i++) {
-      if (_selections[i]) {
-        TransformFunction transformFunction = _elseThenStatements.get(i);
-        long[] longValues = transformFunction.transformToLongValuesSV(valueBlock);
-        if (_numSelections == 1) {
-          System.arraycopy(longValues, 0, _longValuesSV, 0, numDocs);
-        } else {
-          for (int j = 0; j < numDocs; j++) {
-            if (selected[j] == i) {
-              _longValuesSV[j] = longValues[j];
-            }
-          }
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet();
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, long[]> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) {
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToLongValuesSV(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        _longValuesSV[docId] = thenStatementsIndexToValues.get(selected[docId])[docId];
+        unselectedDocs.clear(docId);
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _longValuesSV[docId] = (long) DataSchema.ColumnDataType.LONG.getNullPlaceholder();
+        }
+      } else {
+        long[] longValuesSV = _elseStatement.transformToLongValuesSV(valueBlock);
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _longValuesSV[docId] = longValuesSV[docId];
         }
       }
     }
     return _longValuesSV;
   }
 
+  @Override
+  public Pair<long[], RoaringBitmap> transformToLongValuesSVWithNull(ValueBlock valueBlock) {
+    if (_resultMetadata.getDataType().getStoredType() != DataType.LONG) {
+      return super.transformToLongValuesSVWithNull(valueBlock);
+    }
+    final RoaringBitmap bitmap = new RoaringBitmap();
+    int[] selected = getSelectedArray(valueBlock, true);
+    int numDocs = valueBlock.getNumDocs();
+    initLongValuesSV(numDocs);
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet();
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, Pair<long[], RoaringBitmap>> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) {
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToLongValuesSVWithNull(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        Pair<long[], RoaringBitmap> nullValuePair = thenStatementsIndexToValues.get(selected[docId]);
+        _longValuesSV[docId] = nullValuePair.getLeft()[docId];
+        RoaringBitmap nullBitmap = nullValuePair.getRight();
+        if (nullBitmap != null && nullBitmap.contains(docId)) {
+          bitmap.add(docId);
+        }
+        unselectedDocs.clear(docId);
+        if (unselectedDocs.isEmpty()) {
+          break;
+        }
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _longValuesSV[docId] = (long) DataSchema.ColumnDataType.LONG.getNullPlaceholder();
+          bitmap.add(docId);
+        }
+      } else {
+        Pair<long[], RoaringBitmap> longValuesNullPair = _elseStatement.transformToLongValuesSVWithNull(valueBlock);
+        long[] longValues = longValuesNullPair.getLeft();
+        RoaringBitmap nullBitmap = longValuesNullPair.getRight();
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _longValuesSV[docId] = longValues[docId];
+          if (nullBitmap != null && nullBitmap.contains(docId)) {
+            bitmap.add(docId);
+          }
+        }
+      }
+    }
+    return ImmutablePair.of(_longValuesSV, bitmap);
+  }
+
   @Override
   public float[] transformToFloatValuesSV(ValueBlock valueBlock) {
     if (_resultMetadata.getDataType().getStoredType() != DataType.FLOAT) {
       return super.transformToFloatValuesSV(valueBlock);
     }
-    int[] selected = getSelectedArray(valueBlock);
+    int[] selected = getSelectedArray(valueBlock, false);
     int numDocs = valueBlock.getNumDocs();
     initFloatValuesSV(numDocs);
-    int numElseThenStatements = _elseThenStatements.size();
-    for (int i = 0; i < numElseThenStatements; i++) {
-      if (_selections[i]) {
-        TransformFunction transformFunction = _elseThenStatements.get(i);
-        float[] floatValues = transformFunction.transformToFloatValuesSV(valueBlock);
-        if (_numSelections == 1) {
-          System.arraycopy(floatValues, 0, _floatValuesSV, 0, numDocs);
-        } else {
-          for (int j = 0; j < numDocs; j++) {
-            if (selected[j] == i) {
-              _floatValuesSV[j] = floatValues[j];
-            }
-          }
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet();
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, float[]> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) {
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToFloatValuesSV(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        _floatValuesSV[docId] = thenStatementsIndexToValues.get(selected[docId])[docId];
+        unselectedDocs.clear(docId);
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _floatValuesSV[docId] = (float) DataSchema.ColumnDataType.FLOAT.getNullPlaceholder();
+        }
+      } else {
+        float[] floatValuesSV = _elseStatement.transformToFloatValuesSV(valueBlock);
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _floatValuesSV[docId] = floatValuesSV[docId];
         }
       }
     }
     return _floatValuesSV;
   }
 
+  @Override
+  public Pair<float[], RoaringBitmap> transformToFloatValuesSVWithNull(ValueBlock valueBlock) {
+    if (_resultMetadata.getDataType().getStoredType() != DataType.FLOAT) {
+      return super.transformToFloatValuesSVWithNull(valueBlock);
+    }
+    final RoaringBitmap bitmap = new RoaringBitmap();
+    int[] selected = getSelectedArray(valueBlock, true);
+    int numDocs = valueBlock.getNumDocs();
+    initFloatValuesSV(numDocs);
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet();
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, Pair<float[], RoaringBitmap>> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) {
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToFloatValuesSVWithNull(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        Pair<float[], RoaringBitmap> nullValuePair = thenStatementsIndexToValues.get(selected[docId]);
+        _floatValuesSV[docId] = nullValuePair.getLeft()[docId];
+        RoaringBitmap nullBitmap = nullValuePair.getRight();
+        if (nullBitmap != null && nullBitmap.contains(docId)) {
+          bitmap.add(docId);
+        }
+        unselectedDocs.clear(docId);
+        if (unselectedDocs.isEmpty()) {
+          break;
+        }
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _floatValuesSV[docId] = (float) DataSchema.ColumnDataType.FLOAT.getNullPlaceholder();
+          bitmap.add(docId);
+        }
+      } else {
+        Pair<float[], RoaringBitmap> floatValuesNullPair = _elseStatement.transformToFloatValuesSVWithNull(valueBlock);
+        float[] floatValues = floatValuesNullPair.getLeft();
+        RoaringBitmap nullBitmap = floatValuesNullPair.getRight();
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _floatValuesSV[docId] = floatValues[docId];
+          if (nullBitmap != null && nullBitmap.contains(docId)) {
+            bitmap.add(docId);
+          }
+        }
+      }
+    }
+    return ImmutablePair.of(_floatValuesSV, bitmap);
+  }
+
   @Override
   public double[] transformToDoubleValuesSV(ValueBlock valueBlock) {
     if (_resultMetadata.getDataType().getStoredType() != DataType.DOUBLE) {
       return super.transformToDoubleValuesSV(valueBlock);
     }
-    int[] selected = getSelectedArray(valueBlock);
+    int[] selected = getSelectedArray(valueBlock, false);
     int numDocs = valueBlock.getNumDocs();
     initDoubleValuesSV(numDocs);
-    int numElseThenStatements = _elseThenStatements.size();
-    for (int i = 0; i < numElseThenStatements; i++) {
-      if (_selections[i]) {
-        TransformFunction transformFunction = _elseThenStatements.get(i);
-        double[] doubleValues = transformFunction.transformToDoubleValuesSV(valueBlock);
-        if (_numSelections == 1) {
-          System.arraycopy(doubleValues, 0, _doubleValuesSV, 0, numDocs);
-        } else {
-          for (int j = 0; j < numDocs; j++) {
-            if (selected[j] == i) {
-              _doubleValuesSV[j] = doubleValues[j];
-            }
-          }
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet();
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, double[]> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) {
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToDoubleValuesSV(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        _doubleValuesSV[docId] = thenStatementsIndexToValues.get(selected[docId])[docId];
+        unselectedDocs.clear(docId);
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _doubleValuesSV[docId] = (double) DataSchema.ColumnDataType.DOUBLE.getNullPlaceholder();
+        }
+      } else {
+        double[] doubleValuesSV = _elseStatement.transformToDoubleValuesSV(valueBlock); // keep full double precision
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _doubleValuesSV[docId] = doubleValuesSV[docId];
         }
       }
     }
     return _doubleValuesSV;
   }
 
+  @Override
+  public Pair<double[], RoaringBitmap> transformToDoubleValuesSVWithNull(ValueBlock valueBlock) { // CASE result as doubles, paired with a bitmap of the docIds whose result is null
+    if (_resultMetadata.getDataType().getStoredType() != DataType.DOUBLE) {
+      return super.transformToDoubleValuesSVWithNull(valueBlock); // other stored types are delegated to the base implementation
+    }
+    final RoaringBitmap bitmap = new RoaringBitmap(); // collects docIds whose CASE result is null
+    int[] selected = getSelectedArray(valueBlock, true); // selected[docId] >= 0 is the index of the THEN branch used for that doc; NOTE(review): the boolean presumably enables null-aware WHEN evaluation - confirm in getSelectedArray
+    int numDocs = valueBlock.getNumDocs();
+    initDoubleValuesSV(numDocs);
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet(); // docs not covered by any THEN branch; starts with every doc set
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, Pair<double[], RoaringBitmap>> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) { // evaluate only the THEN branches flagged for computation
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToDoubleValuesSVWithNull(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        Pair<double[], RoaringBitmap> nullValuePair = thenStatementsIndexToValues.get(selected[docId]);
+        _doubleValuesSV[docId] = nullValuePair.getLeft()[docId];
+        RoaringBitmap nullBitmap = nullValuePair.getRight();
+        if (nullBitmap != null && nullBitmap.contains(docId)) { // the chosen THEN value is itself null for this doc
+          bitmap.add(docId);
+        }
+        unselectedDocs.clear(docId);
+        if (unselectedDocs.isEmpty()) { // docs are visited in ascending order, so this can only become true on the last doc
+          break;
+        }
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _doubleValuesSV[docId] = (double) DataSchema.ColumnDataType.DOUBLE.getNullPlaceholder(); // no ELSE: store the placeholder and mark the doc null
+          bitmap.add(docId);
+        }
+      } else {
+        Pair<double[], RoaringBitmap> doubleValuesNullPair =
+            _elseStatement.transformToDoubleValuesSVWithNull(valueBlock);
+        double[] doubleValues = doubleValuesNullPair.getLeft();
+        RoaringBitmap nullBitmap = doubleValuesNullPair.getRight();
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _doubleValuesSV[docId] = doubleValues[docId];
+          if (nullBitmap != null && nullBitmap.contains(docId)) { // carry over nulls reported by the ELSE expression
+            bitmap.add(docId);
+          }
+        }
+      }
+    }
+    return ImmutablePair.of(_doubleValuesSV, bitmap); // the bitmap is always non-null here, possibly empty
+  }
+
   @Override
   public BigDecimal[] transformToBigDecimalValuesSV(ValueBlock valueBlock) {
-    if (_resultMetadata.getDataType() != DataType.BIG_DECIMAL) {
+    if (_resultMetadata.getDataType().getStoredType() != DataType.BIG_DECIMAL) { // CASE result as BigDecimal; other stored types fall through to the base implementation
       return super.transformToBigDecimalValuesSV(valueBlock);
     }
-    int[] selected = getSelectedArray(valueBlock);
+    int[] selected = getSelectedArray(valueBlock, false); // selected[docId] >= 0 is the index of the THEN branch used for that doc; NOTE(review): 'false' presumably disables null-aware WHEN evaluation - confirm in getSelectedArray
     int numDocs = valueBlock.getNumDocs();
     initBigDecimalValuesSV(numDocs);
-    int numElseThenStatements = _elseThenStatements.size();
-    for (int i = 0; i < numElseThenStatements; i++) {
-      if (_selections[i]) {
-        TransformFunction transformFunction = _elseThenStatements.get(i);
-        BigDecimal[] bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(valueBlock);
-        if (_numSelections == 1) {
-          System.arraycopy(bigDecimalValues, 0, _bigDecimalValuesSV, 0, numDocs);
-        } else {
-          for (int j = 0; j < numDocs; j++) {
-            if (selected[j] == i) {
-              _bigDecimalValuesSV[j] = bigDecimalValues[j];
-            }
-          }
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet(); // docs not covered by any THEN branch; starts with every doc set
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, BigDecimal[]> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) { // evaluate only the THEN branches flagged for computation
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToBigDecimalValuesSV(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        _bigDecimalValuesSV[docId] = thenStatementsIndexToValues.get(selected[docId])[docId];
+        unselectedDocs.clear(docId);
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _bigDecimalValuesSV[docId] = (BigDecimal) DataSchema.ColumnDataType.BIG_DECIMAL.getNullPlaceholder(); // no ELSE: store the BIG_DECIMAL null placeholder
+        }
+      } else {
+        BigDecimal[] bigDecimalValuesSV = _elseStatement.transformToBigDecimalValuesSV(valueBlock);
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _bigDecimalValuesSV[docId] = bigDecimalValuesSV[docId]; // remaining docs take the ELSE value
         }
       }
     }
     return _bigDecimalValuesSV;
   }
 
+  @Override
+  public Pair<BigDecimal[], RoaringBitmap> transformToBigDecimalValuesSVWithNull(ValueBlock valueBlock) { // CASE result as BigDecimal, paired with a bitmap of the docIds whose result is null
+    if (_resultMetadata.getDataType().getStoredType() != DataType.BIG_DECIMAL) {
+      return super.transformToBigDecimalValuesSVWithNull(valueBlock); // other stored types are delegated to the base implementation
+    }
+    final RoaringBitmap bitmap = new RoaringBitmap(); // collects docIds whose CASE result is null
+    int[] selected = getSelectedArray(valueBlock, true); // selected[docId] >= 0 is the index of the THEN branch used for that doc; NOTE(review): the boolean presumably enables null-aware WHEN evaluation - confirm in getSelectedArray
+    int numDocs = valueBlock.getNumDocs();
+    initBigDecimalValuesSV(numDocs);
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet(); // docs not covered by any THEN branch; starts with every doc set
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, Pair<BigDecimal[], RoaringBitmap>> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) { // evaluate only the THEN branches flagged for computation
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToBigDecimalValuesSVWithNull(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        Pair<BigDecimal[], RoaringBitmap> nullValuePair = thenStatementsIndexToValues.get(selected[docId]);
+        _bigDecimalValuesSV[docId] = nullValuePair.getLeft()[docId];
+        RoaringBitmap nullBitmap = nullValuePair.getRight();
+        if (nullBitmap != null && nullBitmap.contains(docId)) { // the chosen THEN value is itself null for this doc
+          bitmap.add(docId);
+        }
+        unselectedDocs.clear(docId);
+        if (unselectedDocs.isEmpty()) { // docs are visited in ascending order, so this can only become true on the last doc
+          break;
+        }
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _bigDecimalValuesSV[docId] = (BigDecimal) DataSchema.ColumnDataType.BIG_DECIMAL.getNullPlaceholder(); // no ELSE: store the placeholder and mark the doc null
+          bitmap.add(docId);
+        }
+      } else {
+        Pair<BigDecimal[], RoaringBitmap> bigDecimalValuesNullPair =
+            _elseStatement.transformToBigDecimalValuesSVWithNull(valueBlock);
+        BigDecimal[] bigDecimalValues = bigDecimalValuesNullPair.getLeft();
+        RoaringBitmap nullBitmap = bigDecimalValuesNullPair.getRight();
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _bigDecimalValuesSV[docId] = bigDecimalValues[docId];
+          if (nullBitmap != null && nullBitmap.contains(docId)) { // carry over nulls reported by the ELSE expression
+            bitmap.add(docId);
+          }
+        }
+      }
+    }
+    return ImmutablePair.of(_bigDecimalValuesSV, bitmap); // the bitmap is always non-null here, possibly empty
+  }
+
   @Override
   public String[] transformToStringValuesSV(ValueBlock valueBlock) {
     if (_resultMetadata.getDataType().getStoredType() != DataType.STRING) {
       return super.transformToStringValuesSV(valueBlock);
     }
-    int[] selected = getSelectedArray(valueBlock);
+    int[] selected = getSelectedArray(valueBlock, false); // CASE result as strings; selected[docId] >= 0 is the index of the THEN branch used for that doc; NOTE(review): 'false' presumably disables null-aware WHEN evaluation - confirm in getSelectedArray
     int numDocs = valueBlock.getNumDocs();
     initStringValuesSV(numDocs);
-    int numElseThenStatements = _elseThenStatements.size();
-    for (int i = 0; i < numElseThenStatements; i++) {
-      if (_selections[i]) {
-        TransformFunction transformFunction = _elseThenStatements.get(i);
-        String[] stringValues = transformFunction.transformToStringValuesSV(valueBlock);
-        if (_numSelections == 1) {
-          System.arraycopy(stringValues, 0, _stringValuesSV, 0, numDocs);
-        } else {
-          for (int j = 0; j < numDocs; j++) {
-            if (selected[j] == i) {
-              _stringValuesSV[j] = stringValues[j];
-            }
-          }
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet(); // docs not covered by any THEN branch; starts with every doc set
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, String[]> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) { // evaluate only the THEN branches flagged for computation
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToStringValuesSV(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        _stringValuesSV[docId] = thenStatementsIndexToValues.get(selected[docId])[docId];
+        unselectedDocs.clear(docId);
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _stringValuesSV[docId] = (String) DataSchema.ColumnDataType.STRING.getNullPlaceholder(); // no ELSE: store the STRING null placeholder
+        }
+      } else {
+        String[] stringValuesSV = _elseStatement.transformToStringValuesSV(valueBlock);
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _stringValuesSV[docId] = stringValuesSV[docId]; // remaining docs take the ELSE value
         }
       }
     }
     return _stringValuesSV;
   }
 
+  @Override
+  public Pair<String[], RoaringBitmap> transformToStringValuesSVWithNull(ValueBlock valueBlock) { // CASE result as strings, paired with a bitmap of the docIds whose result is null
+    if (_resultMetadata.getDataType().getStoredType() != DataType.STRING) {
+      return super.transformToStringValuesSVWithNull(valueBlock); // other stored types are delegated to the base implementation
+    }
+    final RoaringBitmap bitmap = new RoaringBitmap(); // collects docIds whose CASE result is null
+    int[] selected = getSelectedArray(valueBlock, true); // selected[docId] >= 0 is the index of the THEN branch used for that doc; NOTE(review): the boolean presumably enables null-aware WHEN evaluation - confirm in getSelectedArray
+    int numDocs = valueBlock.getNumDocs();
+    initStringValuesSV(numDocs);
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet(); // docs not covered by any THEN branch; starts with every doc set
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, Pair<String[], RoaringBitmap>> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) { // evaluate only the THEN branches flagged for computation
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToStringValuesSVWithNull(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        Pair<String[], RoaringBitmap> nullValuePair = thenStatementsIndexToValues.get(selected[docId]);
+        _stringValuesSV[docId] = nullValuePair.getLeft()[docId];
+        RoaringBitmap nullBitmap = nullValuePair.getRight();
+        if (nullBitmap != null && nullBitmap.contains(docId)) { // the chosen THEN value is itself null for this doc
+          bitmap.add(docId);
+        }
+        unselectedDocs.clear(docId);
+        if (unselectedDocs.isEmpty()) { // docs are visited in ascending order, so this can only become true on the last doc
+          break;
+        }
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _stringValuesSV[docId] = (String) DataSchema.ColumnDataType.STRING.getNullPlaceholder(); // no ELSE: store the placeholder and mark the doc null
+          bitmap.add(docId);
+        }
+      } else {
+        Pair<String[], RoaringBitmap> stringValuesNullPair =
+            _elseStatement.transformToStringValuesSVWithNull(valueBlock);
+        String[] stringValues = stringValuesNullPair.getLeft();
+        RoaringBitmap nullBitmap = stringValuesNullPair.getRight();
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _stringValuesSV[docId] = stringValues[docId];
+          if (nullBitmap != null && nullBitmap.contains(docId)) { // carry over nulls reported by the ELSE expression
+            bitmap.add(docId);
+          }
+        }
+      }
+    }
+    return ImmutablePair.of(_stringValuesSV, bitmap); // the bitmap is always non-null here, possibly empty
+  }
+
   @Override
   public byte[][] transformToBytesValuesSV(ValueBlock valueBlock) {
     if (_resultMetadata.getDataType().getStoredType() != DataType.BYTES) {
       return super.transformToBytesValuesSV(valueBlock);
     }
-    int[] selected = getSelectedArray(valueBlock);
+    int[] selected = getSelectedArray(valueBlock, false); // CASE result as byte arrays; selected[docId] >= 0 is the index of the THEN branch used for that doc; NOTE(review): 'false' presumably disables null-aware WHEN evaluation - confirm in getSelectedArray
     int numDocs = valueBlock.getNumDocs();
     initBytesValuesSV(numDocs);
-    int numElseThenStatements = _elseThenStatements.size();
-    for (int i = 0; i < numElseThenStatements; i++) {
-      if (_selections[i]) {
-        TransformFunction transformFunction = _elseThenStatements.get(i);
-        byte[][] bytesValues = transformFunction.transformToBytesValuesSV(valueBlock);
-        if (_numSelections == 1) {
-          System.arraycopy(bytesValues, 0, _bytesValuesSV, 0, numDocs);
-        } else {
-          for (int j = 0; j < numDocs; j++) {
-            if (selected[j] == i) {
-              _bytesValuesSV[j] = bytesValues[j];
-            }
-          }
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet(); // docs not covered by any THEN branch; starts with every doc set
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, byte[][]> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) { // evaluate only the THEN branches flagged for computation
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToBytesValuesSV(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        _bytesValuesSV[docId] = thenStatementsIndexToValues.get(selected[docId])[docId];
+        unselectedDocs.clear(docId);
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _bytesValuesSV[docId] = (byte[]) DataSchema.ColumnDataType.BYTES.getNullPlaceholder(); // no ELSE: store the BYTES null placeholder
+        }
+      } else {
+        byte[][] byteValuesSV = _elseStatement.transformToBytesValuesSV(valueBlock);
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _bytesValuesSV[docId] = byteValuesSV[docId]; // remaining docs take the ELSE value
         }
       }
     }
     return _bytesValuesSV;
   }
+
+  @Override
+  public Pair<byte[][], RoaringBitmap> transformToBytesValuesSVWithNull(ValueBlock valueBlock) { // CASE result as byte arrays, paired with a bitmap of the docIds whose result is null
+    if (_resultMetadata.getDataType().getStoredType() != DataType.BYTES) {
+      return super.transformToBytesValuesSVWithNull(valueBlock); // other stored types are delegated to the base implementation
+    }
+    final RoaringBitmap bitmap = new RoaringBitmap(); // collects docIds whose CASE result is null
+    int[] selected = getSelectedArray(valueBlock, true); // selected[docId] >= 0 is the index of the THEN branch used for that doc; NOTE(review): the boolean presumably enables null-aware WHEN evaluation - confirm in getSelectedArray
+    int numDocs = valueBlock.getNumDocs();
+    initBytesValuesSV(numDocs); // must prepare _bytesValuesSV, the buffer written below (the string buffer is unrelated to this method)
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet(); // docs not covered by any THEN branch; starts with every doc set
+    unselectedDocs.set(0, numDocs);
+    Map<Integer, Pair<byte[][], RoaringBitmap>> thenStatementsIndexToValues = new HashMap<>();
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) { // evaluate only the THEN branches flagged for computation
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).transformToBytesValuesSVWithNull(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        Pair<byte[][], RoaringBitmap> nullValuePair = thenStatementsIndexToValues.get(selected[docId]);
+        _bytesValuesSV[docId] = nullValuePair.getLeft()[docId];
+        RoaringBitmap nullBitmap = nullValuePair.getRight();
+        if (nullBitmap != null && nullBitmap.contains(docId)) { // the chosen THEN value is itself null for this doc
+          bitmap.add(docId);
+        }
+        unselectedDocs.clear(docId);
+        if (unselectedDocs.isEmpty()) { // docs are visited in ascending order, so this can only become true on the last doc
+          break;
+        }
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _bytesValuesSV[docId] = (byte[]) DataSchema.ColumnDataType.BYTES.getNullPlaceholder(); // no ELSE: store the placeholder and mark the doc null
+          bitmap.add(docId);
+        }
+      } else {
+        Pair<byte[][], RoaringBitmap> bytesValuesNullPair = _elseStatement.transformToBytesValuesSVWithNull(valueBlock);
+        byte[][] byteValues = bytesValuesNullPair.getLeft();
+        RoaringBitmap nullBitmap = bytesValuesNullPair.getRight();
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          _bytesValuesSV[docId] = byteValues[docId];
+          if (nullBitmap != null && nullBitmap.contains(docId)) { // carry over nulls reported by the ELSE expression
+            bitmap.add(docId);
+          }
+        }
+      }
+    }
+    return ImmutablePair.of(_bytesValuesSV, bitmap); // the bitmap is always non-null here, possibly empty
+  }
+
+  @Override
+  public RoaringBitmap getNullBitmap(ValueBlock valueBlock) { // docIds whose CASE result is null, or null (not an empty bitmap) when no doc is null
+    int[] selected = getSelectedArray(valueBlock, true); // selected[docId] >= 0 is the index of the THEN branch used for that doc; NOTE(review): the boolean presumably enables null-aware WHEN evaluation - confirm in getSelectedArray
+    int numDocs = valueBlock.getNumDocs();
+    int numThenStatements = _thenStatements.size();
+    BitSet unselectedDocs = new BitSet(); // docs not covered by any THEN branch; starts with every doc set
+    unselectedDocs.set(0, numDocs);
+    final RoaringBitmap bitmap = new RoaringBitmap();
+    Map<Integer, RoaringBitmap> thenStatementsIndexToValues = new HashMap<>(); // THEN index -> that branch's null bitmap (the stored value may be null)
+    for (int i = 0; i < numThenStatements; i++) {
+      if (_computeThenStatements[i]) { // query only the THEN branches flagged for computation
+        thenStatementsIndexToValues.put(i, _thenStatements.get(i).getNullBitmap(valueBlock));
+      }
+    }
+    for (int docId = 0; docId < numDocs; docId++) {
+      if (selected[docId] >= 0) {
+        RoaringBitmap nullBitmap = thenStatementsIndexToValues.get(selected[docId]);
+        if (nullBitmap != null && nullBitmap.contains(docId)) { // the chosen THEN value is null for this doc
+          bitmap.add(docId);
+        }
+        unselectedDocs.clear(docId);
+      }
+    }
+    if (!unselectedDocs.isEmpty()) {
+      if (_elseStatement == null) {
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          bitmap.add(docId); // no THEN branch applies and there is no ELSE: the result is null
+        }
+      } else {
+        RoaringBitmap nullBitmap = _elseStatement.getNullBitmap(valueBlock);
+        for (int docId = unselectedDocs.nextSetBit(0); docId >= 0; docId = unselectedDocs.nextSetBit(docId + 1)) {
+          if (nullBitmap != null && nullBitmap.contains(docId)) { // carry over nulls reported by the ELSE expression
+            bitmap.add(docId);
+          }
+        }
+      }
+    }
+    if (bitmap.isEmpty()) { // report "no nulls" as null rather than as an empty bitmap
+      return null;
+    }
+    return bitmap;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunction.java
index 322119f824..ba677c2579 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunction.java
@@ -23,6 +23,7 @@ import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.function.TransformFunctionType;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.operator.ColumnContext;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
@@ -33,58 +34,41 @@ import org.roaringbitmap.RoaringBitmap;
 /**
  * The <code>CoalesceTransformFunction</code> implements the Coalesce operator.
  *
- * The results are in String format for first non-null value in the argument list.
- * If all arguments are null, return a 'null' string.
- * Note: arguments have to be column names and single type. The type can be either numeric or string.
+ * The result is the first non-null value in the argument list.
+ * If all arguments are null, return null.
+ *
+ * Note: arguments have to be of compatible types.
  * Number of arguments has to be greater than 0.
  *
  * Expected result:
  * Coalesce(nullColumn, columnA): columnA
- * Coalesce(columnA, nullColumn): nullColumn
- * Coalesce(nullColumnA, nullColumnB): "null"
+ * Coalesce(columnA, nullColumn): columnA
+ * Coalesce(nullColumnA, nullColumnB): null
  *
- * Note this operator only takes column names for now.
- * SQL Syntax:
- *    Coalesce(columnA, columnB)
  */
 public class CoalesceTransformFunction extends BaseTransformFunction {
-  public static final int NULL_INT = Integer.MIN_VALUE;
-  public static final long NULL_LONG = Long.MIN_VALUE;
-  public static final float NULL_FLOAT = Float.NEGATIVE_INFINITY;
-  public static final double NULL_DOUBLE = Double.NEGATIVE_INFINITY;
-  public static final BigDecimal NULL_BIG_DECIMAL = BigDecimal.valueOf(Long.MIN_VALUE);
-  public static final String NULL_STRING = "null";
-
   private TransformFunction[] _transformFunctions;
   private DataType _dataType;
   private TransformResultMetadata _resultMetadata;
 
   /**
-   * Returns a bit map of corresponding column.
-   * Returns an empty bitmap by default if null option is disabled.
+   * Returns, in argument order, the null bitmap each argument reports via getNullBitmap(valueBlock). Entries may be null; callers in this class null-check them.
    */
   private static RoaringBitmap[] getNullBitMaps(ValueBlock valueBlock, TransformFunction[] transformFunctions) {
     RoaringBitmap[] roaringBitmaps = new RoaringBitmap[transformFunctions.length];
     for (int i = 0; i < roaringBitmaps.length; i++) {
       TransformFunction func = transformFunctions[i];
-      if (func instanceof IdentifierTransformFunction) {
-        String columnName = ((IdentifierTransformFunction) func).getColumnName();
-        RoaringBitmap nullBitmap = valueBlock.getBlockValueSet(columnName).getNullBitmap();
-        roaringBitmaps[i] = nullBitmap;
-      } else {
-        // Consider literal as not null.
-        roaringBitmaps[i] = new RoaringBitmap();
-      }
+      roaringBitmaps[i] = func.getNullBitmap(valueBlock); // any transform function is accepted; the returned bitmap is stored as-is, not copied
     }
     return roaringBitmaps;
   }
 
   /**
    * Get compatible data type of left and right.
-   *
    * When left or right is numerical, we check both data types are numerical and widen the type.
-   * Otherwise, left and right have to be the same type.
-   * @param left data type
+   * Otherwise, return string type.
+   *
+   * @param left  data type
    * @param right data type
    * @return compatible data type.
    */
@@ -104,6 +88,12 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
       }
       return DataType.INT;
     }
+    if (left == DataType.UNKNOWN) {
+      return right;
+    }
+    if (right == DataType.UNKNOWN) {
+      return left;
+    }
     return DataType.STRING;
   }
 
@@ -115,8 +105,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
     initIntValuesSV(length);
     int width = _transformFunctions.length;
     RoaringBitmap[] nullBitMaps = getNullBitMaps(valueBlock, _transformFunctions);
-    int[][] data = new int[width][length];
-    RoaringBitmap filledData = new RoaringBitmap();
+    int[][] data = new int[width][];
     for (int i = 0; i < length; i++) {
       boolean hasNonNullValue = false;
       for (int j = 0; j < width; j++) {
@@ -124,8 +113,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) {
           continue;
         }
-        if (!filledData.contains(j)) {
-          filledData.add(j);
+        if (data[j] == null) {
           data[j] = _transformFunctions[j].transformToIntValuesSV(valueBlock);
         }
         hasNonNullValue = true;
@@ -133,7 +121,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         break;
       }
       if (!hasNonNullValue) {
-        _intValuesSV[i] = NULL_INT;
+        _intValuesSV[i] = (int) DataSchema.ColumnDataType.INT.getNullPlaceholder();
       }
     }
     return _intValuesSV;
@@ -147,8 +135,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
     initLongValuesSV(length);
     int width = _transformFunctions.length;
     RoaringBitmap[] nullBitMaps = getNullBitMaps(valueBlock, _transformFunctions);
-    long[][] data = new long[width][length];
-    RoaringBitmap filledData = new RoaringBitmap(); // indicates whether certain column has be filled in data.
+    long[][] data = new long[width][];
     for (int i = 0; i < length; i++) {
       boolean hasNonNullValue = false;
       for (int j = 0; j < width; j++) {
@@ -156,8 +143,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) {
           continue;
         }
-        if (!filledData.contains(j)) {
-          filledData.add(j);
+        if (data[j] == null) {
           data[j] = _transformFunctions[j].transformToLongValuesSV(valueBlock);
         }
         hasNonNullValue = true;
@@ -165,7 +151,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         break;
       }
       if (!hasNonNullValue) {
-        _longValuesSV[i] = NULL_LONG;
+        _longValuesSV[i] = (long) DataSchema.ColumnDataType.LONG.getNullPlaceholder();
       }
     }
     return _longValuesSV;
@@ -179,8 +165,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
     initFloatValuesSV(length);
     int width = _transformFunctions.length;
     RoaringBitmap[] nullBitMaps = getNullBitMaps(valueBlock, _transformFunctions);
-    float[][] data = new float[width][length];
-    RoaringBitmap filledData = new RoaringBitmap(); // indicates whether certain column has be filled in data.
+    float[][] data = new float[width][];
     for (int i = 0; i < length; i++) {
       boolean hasNonNullValue = false;
       for (int j = 0; j < width; j++) {
@@ -188,8 +173,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) {
           continue;
         }
-        if (!filledData.contains(j)) {
-          filledData.add(j);
+        if (data[j] == null) {
           data[j] = _transformFunctions[j].transformToFloatValuesSV(valueBlock);
         }
         hasNonNullValue = true;
@@ -197,7 +181,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         break;
       }
       if (!hasNonNullValue) {
-        _floatValuesSV[i] = NULL_FLOAT;
+        _floatValuesSV[i] = (float) DataSchema.ColumnDataType.FLOAT.getNullPlaceholder();
       }
     }
     return _floatValuesSV;
@@ -211,8 +195,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
     initDoubleValuesSV(length);
     int width = _transformFunctions.length;
     RoaringBitmap[] nullBitMaps = getNullBitMaps(valueBlock, _transformFunctions);
-    double[][] data = new double[width][length];
-    RoaringBitmap filledData = new RoaringBitmap(); // indicates whether certain column has be filled in data.
+    double[][] data = new double[width][];
     for (int i = 0; i < length; i++) {
       boolean hasNonNullValue = false;
       for (int j = 0; j < width; j++) {
@@ -220,8 +203,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) {
           continue;
         }
-        if (!filledData.contains(j)) {
-          filledData.add(j);
+        if (data[j] == null) {
           data[j] = _transformFunctions[j].transformToDoubleValuesSV(valueBlock);
         }
         hasNonNullValue = true;
@@ -229,7 +211,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         break;
       }
       if (!hasNonNullValue) {
-        _doubleValuesSV[i] = NULL_DOUBLE;
+        _doubleValuesSV[i] = (double) DataSchema.ColumnDataType.DOUBLE.getNullPlaceholder();
       }
     }
     return _doubleValuesSV;
@@ -243,8 +225,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
     initBigDecimalValuesSV(length);
     int width = _transformFunctions.length;
     RoaringBitmap[] nullBitMaps = getNullBitMaps(valueBlock, _transformFunctions);
-    BigDecimal[][] data = new BigDecimal[width][length];
-    RoaringBitmap filledData = new RoaringBitmap(); // indicates whether certain column has be filled in data.
+    BigDecimal[][] data = new BigDecimal[width][];
     for (int i = 0; i < length; i++) {
       boolean hasNonNullValue = false;
       for (int j = 0; j < width; j++) {
@@ -252,8 +233,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) {
           continue;
         }
-        if (!filledData.contains(j)) {
-          filledData.add(j);
+        if (data[j] == null) {
           data[j] = _transformFunctions[j].transformToBigDecimalValuesSV(valueBlock);
         }
         hasNonNullValue = true;
@@ -261,7 +241,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         break;
       }
       if (!hasNonNullValue) {
-        _bigDecimalValuesSV[i] = NULL_BIG_DECIMAL;
+        _bigDecimalValuesSV[i] = (BigDecimal) DataSchema.ColumnDataType.BIG_DECIMAL.getNullPlaceholder();
       }
     }
     return _bigDecimalValuesSV;
@@ -275,8 +255,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
     initStringValuesSV(length);
     int width = _transformFunctions.length;
     RoaringBitmap[] nullBitMaps = getNullBitMaps(valueBlock, _transformFunctions);
-    String[][] data = new String[width][length];
-    RoaringBitmap filledData = new RoaringBitmap(); // indicates whether certain column has be filled in data.
+    String[][] data = new String[width][];
     for (int i = 0; i < length; i++) {
       boolean hasNonNullValue = false;
       for (int j = 0; j < width; j++) {
@@ -284,8 +263,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         if (nullBitMaps[j] != null && nullBitMaps[j].contains(i)) {
           continue;
         }
-        if (!filledData.contains(j)) {
-          filledData.add(j);
+        if (data[j] == null) {
           data[j] = _transformFunctions[j].transformToStringValuesSV(valueBlock);
         }
         hasNonNullValue = true;
@@ -293,7 +271,7 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
         break;
       }
       if (!hasNonNullValue) {
-        _stringValuesSV[i] = NULL_STRING;
+        _stringValuesSV[i] = (String) DataSchema.ColumnDataType.STRING.getNullPlaceholder();
       }
     }
     return _stringValuesSV;
@@ -311,9 +289,6 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
     _transformFunctions = new TransformFunction[argSize];
     for (int i = 0; i < argSize; i++) {
       TransformFunction func = arguments.get(i);
-      Preconditions.checkArgument(
-          func instanceof IdentifierTransformFunction || func instanceof LiteralTransformFunction,
-          "Only column names and literals are supported in COALESCE.");
       DataType dataType = func.getResultMetadata().getDataType();
       if (_dataType != null) {
         _dataType = getCompatibleType(_dataType, dataType);
@@ -342,6 +317,9 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
       case STRING:
         _resultMetadata = STRING_SV_NO_DICTIONARY_METADATA;
         break;
+      case UNKNOWN:
+        _resultMetadata = UNKNOWN_METADATA;
+        break;
       default:
         throw new UnsupportedOperationException("Coalesce only supports numerical and string data type");
     }
@@ -399,4 +377,24 @@ public class CoalesceTransformFunction extends BaseTransformFunction {
     }
     return getStringTransformResults(valueBlock);
   }
+
+  @Override
+  public RoaringBitmap getNullBitmap(ValueBlock valueBlock) { // COALESCE is null only where every argument is null; returns null when no such doc exists
+    RoaringBitmap[] nullBitMaps = getNullBitMaps(valueBlock, _transformFunctions);
+    RoaringBitmap bitmap = nullBitMaps[0];
+    if (bitmap == null || bitmap.isEmpty()) { // first argument is never null, so the result is never null
+      return null;
+    }
+    for (int i = 1; i < nullBitMaps.length; i++) {
+      RoaringBitmap curBitmap = nullBitMaps[i];
+      if (curBitmap == null || curBitmap.isEmpty()) { // an argument without nulls makes the whole result non-null
+        return null;
+      }
+      bitmap = RoaringBitmap.and(bitmap, curBitmap); // static and() builds a new bitmap; the in-place and() would modify the first argument's own null bitmap, which this method does not own
+    }
+    if (bitmap.isEmpty()) { // bitmap cannot be null at this point
+      return null;
+    }
+    return bitmap;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IsNotNullTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IsNotNullTransformFunction.java
index 538576dd51..5b73a62149 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IsNotNullTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IsNotNullTransformFunction.java
@@ -18,71 +18,17 @@
  */
 package org.apache.pinot.core.operator.transform.function;
 
-import com.google.common.base.Preconditions;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
 import org.apache.pinot.common.function.TransformFunctionType;
-import org.apache.pinot.core.operator.ColumnContext;
-import org.apache.pinot.core.operator.blocks.ValueBlock;
-import org.apache.pinot.core.operator.transform.TransformResultMetadata;
-import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
-import org.roaringbitmap.PeekableIntIterator;
 
 
-public class IsNotNullTransformFunction extends BaseTransformFunction {
-  private PeekableIntIterator _nullValueVectorIterator;
-
+public class IsNotNullTransformFunction extends IsNullTransformFunction {
   @Override
   public String getName() {
     return TransformFunctionType.IS_NOT_NULL.getName();
   }
 
   @Override
-  public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
-    Preconditions.checkArgument(arguments.size() == 1, "Exact 1 argument is required for IS_NOT_NULL");
-    TransformFunction transformFunction = arguments.get(0);
-    Preconditions.checkArgument(transformFunction instanceof IdentifierTransformFunction,
-        "Only column names are supported in IS_NOT_NULL. Support for functions is planned for future release");
-    String columnName = ((IdentifierTransformFunction) transformFunction).getColumnName();
-    ColumnContext columnContext = columnContextMap.get(columnName);
-    Preconditions.checkArgument(columnContext.getDataSource() != null,
-        "Column must be projected from the original table in IS_NOT_NULL");
-    NullValueVectorReader nullValueVectorReader = columnContext.getDataSource().getNullValueVector();
-    if (nullValueVectorReader != null) {
-      _nullValueVectorIterator = nullValueVectorReader.getNullBitmap().getIntIterator();
-    } else {
-      _nullValueVectorIterator = null;
-    }
-  }
-
-  @Override
-  public TransformResultMetadata getResultMetadata() {
-    return BOOLEAN_SV_NO_DICTIONARY_METADATA;
-  }
-
-  @Override
-  public int[] transformToIntValuesSV(ValueBlock valueBlock) {
-    int length = valueBlock.getNumDocs();
-    initIntValuesSV(length);
-    Arrays.fill(_intValuesSV, 1);
-    int[] docIds = valueBlock.getDocIds();
-    assert docIds != null;
-    if (_nullValueVectorIterator != null) {
-      int currentDocIdIndex = 0;
-      while (_nullValueVectorIterator.hasNext() & currentDocIdIndex < length) {
-        _nullValueVectorIterator.advanceIfNeeded(docIds[currentDocIdIndex]);
-        if (_nullValueVectorIterator.hasNext()) {
-          currentDocIdIndex = Arrays.binarySearch(docIds, currentDocIdIndex, length, _nullValueVectorIterator.next());
-          if (currentDocIdIndex >= 0) {
-            _intValuesSV[currentDocIdIndex] = 0;
-            currentDocIdIndex++;
-          } else {
-            currentDocIdIndex = -currentDocIdIndex - 1;
-          }
-        }
-      }
-    }
-    return _intValuesSV;
+  protected int getIsNullValue() {
+    return 0;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IsNullTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IsNullTransformFunction.java
index 13c28cb981..084fd635d0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IsNullTransformFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/IsNullTransformFunction.java
@@ -26,12 +26,12 @@ import org.apache.pinot.common.function.TransformFunctionType;
 import org.apache.pinot.core.operator.ColumnContext;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.operator.transform.TransformResultMetadata;
-import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader;
-import org.roaringbitmap.PeekableIntIterator;
+import org.roaringbitmap.IntConsumer;
+import org.roaringbitmap.RoaringBitmap;
 
 
 public class IsNullTransformFunction extends BaseTransformFunction {
-  private PeekableIntIterator _nullValueVectorIterator;
+  private TransformFunction _transformFunction;
 
   @Override
   public String getName() {
@@ -41,19 +41,7 @@ public class IsNullTransformFunction extends BaseTransformFunction {
   @Override
   public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
     Preconditions.checkArgument(arguments.size() == 1, "Exact 1 argument is required for IS_NULL");
-    TransformFunction transformFunction = arguments.get(0);
-    Preconditions.checkArgument(transformFunction instanceof IdentifierTransformFunction,
-        "Only column names are supported in IS_NULL. Support for functions is planned for future release");
-    String columnName = ((IdentifierTransformFunction) transformFunction).getColumnName();
-    ColumnContext columnContext = columnContextMap.get(columnName);
-    Preconditions.checkArgument(columnContext.getDataSource() != null,
-        "Column must be projected from the original table in IS_NULL");
-    NullValueVectorReader nullValueVectorReader = columnContext.getDataSource().getNullValueVector();
-    if (nullValueVectorReader != null) {
-      _nullValueVectorIterator = nullValueVectorReader.getNullBitmap().getIntIterator();
-    } else {
-      _nullValueVectorIterator = null;
-    }
+    _transformFunction = arguments.get(0);
   }
 
   @Override
@@ -63,25 +51,22 @@ public class IsNullTransformFunction extends BaseTransformFunction {
 
   @Override
   public int[] transformToIntValuesSV(ValueBlock valueBlock) {
+    RoaringBitmap bitmap = _transformFunction.getNullBitmap(valueBlock);
     int length = valueBlock.getNumDocs();
-    initZeroFillingIntValuesSV(length);
-    int[] docIds = valueBlock.getDocIds();
-    assert docIds != null;
-    if (_nullValueVectorIterator != null) {
-      int currentDocIdIndex = 0;
-      while (_nullValueVectorIterator.hasNext() & currentDocIdIndex < length) {
-        _nullValueVectorIterator.advanceIfNeeded(docIds[currentDocIdIndex]);
-        if (_nullValueVectorIterator.hasNext()) {
-          currentDocIdIndex = Arrays.binarySearch(docIds, currentDocIdIndex, length, _nullValueVectorIterator.next());
-          if (currentDocIdIndex >= 0) {
-            _intValuesSV[currentDocIdIndex] = 1;
-            currentDocIdIndex++;
-          } else {
-            currentDocIdIndex = -currentDocIdIndex - 1;
-          }
-        }
-      }
+    initIntValuesSV(length);
+    Arrays.fill(_intValuesSV, getIsNullValue() ^ 1);
+    if (bitmap != null) {
+      bitmap.forEach((IntConsumer) i -> _intValuesSV[i] = getIsNullValue());
     }
     return _intValuesSV;
   }
+
+  @Override
+  public RoaringBitmap getNullBitmap(ValueBlock valueBlock) {
+    return null;
+  }
+
+  protected int getIsNullValue() {
+    return 1;
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
index 0dfd26d417..641d6a6422 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
@@ -80,6 +80,8 @@ public abstract class BaseTransformFunctionTest {
   protected static final String DOUBLE_SV_COLUMN = "doubleSV";
   protected static final String BIG_DECIMAL_SV_COLUMN = "bigDecimalSV";
   protected static final String STRING_SV_COLUMN = "stringSV";
+  protected static final String STRING_SV_NULL_COLUMN = "stringSVNull";
+
   protected static final String BYTES_SV_COLUMN = "bytesSV";
   protected static final String STRING_ALPHANUM_SV_COLUMN = "stringAlphaNumSV";
 
@@ -173,6 +175,11 @@ public abstract class BaseTransformFunctionTest {
       map.put(DOUBLE_SV_COLUMN, _doubleSVValues[i]);
       map.put(BIG_DECIMAL_SV_COLUMN, _bigDecimalSVValues[i]);
       map.put(STRING_SV_COLUMN, _stringSVValues[i]);
+      if (i % 2 == 0) {
+        map.put(STRING_SV_NULL_COLUMN, _stringSVValues[i]);
+      } else {
+        map.put(STRING_SV_NULL_COLUMN, null);
+      }
       map.put(STRING_ALPHANUM_SV_COLUMN, _stringAlphaNumericSVValues[i]);
       if (i % 2 == 0) {
         map.put(STRING_ALPHANUM_NULL_SV_COLUMN, _stringAlphaNumericSVValues[i]);
@@ -208,6 +215,7 @@ public abstract class BaseTransformFunctionTest {
         .addSingleValueDimension(DOUBLE_SV_COLUMN, FieldSpec.DataType.DOUBLE)
         .addMetric(BIG_DECIMAL_SV_COLUMN, FieldSpec.DataType.BIG_DECIMAL)
         .addSingleValueDimension(STRING_SV_COLUMN, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_SV_NULL_COLUMN, FieldSpec.DataType.STRING)
         .addSingleValueDimension(STRING_ALPHANUM_SV_COLUMN, FieldSpec.DataType.STRING)
         .addSingleValueDimension(STRING_ALPHANUM_NULL_SV_COLUMN, FieldSpec.DataType.STRING)
         .addSingleValueDimension(BYTES_SV_COLUMN, FieldSpec.DataType.BYTES)
@@ -320,6 +328,36 @@ public abstract class BaseTransformFunctionTest {
     testNullBitmap(transformFunction, null);
   }
 
+  protected void testTransformFunctionWithNull(TransformFunction transformFunction, long[] expectedValues,
+      RoaringBitmap expectedNull) {
+    Pair<int[], RoaringBitmap> intValues = transformFunction.transformToIntValuesSVWithNull(_projectionBlock);
+    Pair<long[], RoaringBitmap> longValues = transformFunction.transformToLongValuesSVWithNull(_projectionBlock);
+    Pair<float[], RoaringBitmap> floatValues = transformFunction.transformToFloatValuesSVWithNull(_projectionBlock);
+    Pair<double[], RoaringBitmap> doubleValues = transformFunction.transformToDoubleValuesSVWithNull(_projectionBlock);
+    Pair<BigDecimal[], RoaringBitmap> bigDecimalValues =
+        transformFunction.transformToBigDecimalValuesSVWithNull(_projectionBlock);
+    Pair<String[], RoaringBitmap> stringValues = transformFunction.transformToStringValuesSVWithNull(_projectionBlock);
+    assertEquals(intValues.getRight(), expectedNull);
+    assertEquals(longValues.getRight(), expectedNull);
+    assertEquals(floatValues.getRight(), expectedNull);
+    assertEquals(doubleValues.getRight(), expectedNull);
+    assertEquals(bigDecimalValues.getRight(), expectedNull);
+    assertEquals(stringValues.getRight(), expectedNull);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      if (expectedNull.contains(i)) {
+        continue;
+      }
+      // only compare the rows that are not null.
+      assertEquals(intValues.getLeft()[i], (int) expectedValues[i]);
+      assertEquals(longValues.getLeft()[i], expectedValues[i]);
+      assertEquals(floatValues.getLeft()[i], (float) expectedValues[i]);
+      assertEquals(doubleValues.getLeft()[i], (double) expectedValues[i]);
+      assertEquals(bigDecimalValues.getLeft()[i].longValue(), expectedValues[i]);
+      assertEquals(stringValues.getLeft()[i], Long.toString(expectedValues[i]));
+    }
+    testNullBitmap(transformFunction, expectedNull);
+  }
+
   protected void testTransformFunction(TransformFunction transformFunction, float[] expectedValues) {
     int[] intValues = transformFunction.transformToIntValuesSV(_projectionBlock);
     long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunctionTest.java
index cacf6d834b..27529d86e1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CaseTransformFunctionTest.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.function.TransformFunctionType;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.RequestContextUtils;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.roaringbitmap.RoaringBitmap;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -55,7 +56,8 @@ public class CaseTransformFunctionTest extends BaseTransformFunctionTest {
   }
 
   @Test(dataProvider = "params")
-  public void testCasePriorityObserved(String column, int threshold1, int threshold2, int threshold3) {
+  public void testCasePriorityObserved(String column, int threshold1, int threshold2, int threshold3)
+      throws Exception {
     String statement =
         String.format("CASE WHEN %s > %d THEN 3 WHEN %s > %d THEN 2 WHEN %s > %d THEN 1 ELSE -1 END", column,
             threshold1, column, threshold2, column, threshold3);
@@ -81,6 +83,7 @@ public class CaseTransformFunctionTest extends BaseTransformFunctionTest {
               : _doubleSVValues[i] > threshold2 ? 2 : _doubleSVValues[i] > threshold3 ? 1 : -1;
           break;
         default:
+          throw new Exception("Unsupported column type:" + column);
       }
     }
     int[] intValues = transformFunction.transformToIntValuesSV(_projectionBlock);
@@ -94,7 +97,6 @@ public class CaseTransformFunctionTest extends BaseTransformFunctionTest {
     testCaseQueryWithLongResults("true", expectedLongResults);
     Arrays.fill(expectedLongResults, 10);
     testCaseQueryWithLongResults("false", expectedLongResults);
-
     for (TransformFunctionType functionType : BINARY_OPERATOR_TRANSFORM_FUNCTIONS) {
       testCaseQueryWithLongResults(String.format("%s(%s, %s)", functionType.getName(), INT_SV_COLUMN,
           String.format("%d", _intSVValues[INDEX_TO_COMPARE])), getExpectedLongResults(INT_SV_COLUMN, functionType));
@@ -110,6 +112,25 @@ public class CaseTransformFunctionTest extends BaseTransformFunctionTest {
               String.format("'%s'", _stringSVValues[INDEX_TO_COMPARE])),
           getExpectedLongResults(STRING_SV_COLUMN, functionType));
     }
+    RoaringBitmap bitmap = new RoaringBitmap();
+    for (int i = 0; i < NUM_ROWS; i++) {
+      if (i % 2 == 0 && _intSVValues[i] > 0) {
+        expectedLongResults[i] = 100;
+      } else {
+        bitmap.add(i);
+      }
+    }
+    testCaseQueryWithLongResultsNull(String.format("greater_than(%s, 0)", INT_SV_NULL_COLUMN), expectedLongResults,
+        bitmap);
+  }
+
+  @Test
+  public void testCaseTransformFunctionWithNullLiterals() {
+    long[] expectedValues = new long[NUM_ROWS];
+    RoaringBitmap bitmap = new RoaringBitmap();
+    bitmap.add(0L, NUM_ROWS);
+    testCaseQueryWithLongResultsAllNull(String.format("greater_than(%s, 0)", INT_SV_NULL_COLUMN), expectedValues,
+        bitmap);
   }
 
   @Test
@@ -136,6 +157,16 @@ public class CaseTransformFunctionTest extends BaseTransformFunctionTest {
               String.format("'%s'", _stringSVValues[INDEX_TO_COMPARE])),
           getExpectedDoubleResults(STRING_SV_COLUMN, functionType));
     }
+    RoaringBitmap bitmap = new RoaringBitmap();
+    for (int i = 0; i < NUM_ROWS; i++) {
+      if (i % 2 == 0 && _intSVValues[i] > 0) {
+        expectedFloatResults[i] = 100;
+      } else {
+        bitmap.add(i);
+      }
+    }
+    testCaseQueryWithDoubleResultsNull(String.format("greater_than(%s, 0)", INT_SV_NULL_COLUMN), expectedFloatResults,
+        bitmap);
   }
 
   @Test
@@ -206,6 +237,26 @@ public class CaseTransformFunctionTest extends BaseTransformFunctionTest {
     testTransformFunction(transformFunction, expectedValues);
   }
 
+  private void testCaseQueryWithLongResultsNull(String predicate, long[] expectedValues, RoaringBitmap bitmap) {
+    ExpressionContext expression =
+        RequestContextUtils.getExpression(String.format("CASE WHEN %s THEN 100 END", predicate));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    Assert.assertTrue(transformFunction instanceof CaseTransformFunction);
+    assertEquals(transformFunction.getName(), CaseTransformFunction.FUNCTION_NAME);
+    assertEquals(transformFunction.getResultMetadata().getDataType(), DataType.LONG);
+    testTransformFunctionWithNull(transformFunction, expectedValues, bitmap);
+  }
+
+  private void testCaseQueryWithLongResultsAllNull(String predicate, long[] expectedValues, RoaringBitmap bitmap) {
+    ExpressionContext expression =
+        RequestContextUtils.getExpression(String.format("CASE WHEN %s THEN NULL END", predicate));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    Assert.assertTrue(transformFunction instanceof CaseTransformFunction);
+    assertEquals(transformFunction.getName(), CaseTransformFunction.FUNCTION_NAME);
+    assertEquals(transformFunction.getResultMetadata().getDataType(), DataType.UNKNOWN);
+    testTransformFunctionWithNull(transformFunction, expectedValues, bitmap);
+  }
+
   private void testCaseQueryWithDoubleResults(String predicate, double[] expectedValues) {
     ExpressionContext expression =
         RequestContextUtils.getExpression(String.format("CASE WHEN %s THEN 100.0 ELSE 10.0 END", predicate));
@@ -216,6 +267,16 @@ public class CaseTransformFunctionTest extends BaseTransformFunctionTest {
     testTransformFunction(transformFunction, expectedValues);
   }
 
+  private void testCaseQueryWithDoubleResultsNull(String predicate, double[] expectedValues, RoaringBitmap bitmap) {
+    ExpressionContext expression =
+        RequestContextUtils.getExpression(String.format("CASE WHEN %s THEN 100.0 END", predicate));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    Assert.assertTrue(transformFunction instanceof CaseTransformFunction);
+    assertEquals(transformFunction.getName(), CaseTransformFunction.FUNCTION_NAME);
+    assertEquals(transformFunction.getResultMetadata().getDataType(), DataType.DOUBLE);
+    testTransformFunctionWithNull(transformFunction, expectedValues, bitmap);
+  }
+
   private void testCaseQueryWithBigDecimalResults(String predicate, BigDecimal[] expectedValues) {
     // Note: defining decimal literals within quotes preserves precision.
     ExpressionContext expression = RequestContextUtils.getExpression(
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunctionTest.java
index 7326acb350..0c9f802229 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/CoalesceTransformFunctionTest.java
@@ -18,456 +18,89 @@
  */
 package org.apache.pinot.core.operator.transform.function;
 
-import java.io.File;
-import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.RequestContextUtils;
-import org.apache.pinot.core.operator.DocIdSetOperator;
-import org.apache.pinot.core.operator.ProjectionOperator;
-import org.apache.pinot.core.operator.blocks.ProjectionBlock;
-import org.apache.pinot.core.operator.filter.MatchAllFilterOperator;
-import org.apache.pinot.core.plan.DocIdSetPlanNode;
-import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
-import org.apache.pinot.segment.spi.IndexSegment;
-import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.segment.spi.datasource.DataSource;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.ReadMode;
-import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.roaringbitmap.RoaringBitmap;
 import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 
 public class CoalesceTransformFunctionTest extends BaseTransformFunctionTest {
-  private static final String ENABLE_NULL_SEGMENT_NAME = "testSegment1";
-  private static final String DISABLE_NULL_SEGMENT_NAME = "testSegment2";
-  private static final Random RANDOM = new Random();
-
-  private static final int NUM_ROWS = 1000;
-  private static final String INT_SV_COLUMN1 = "intSV1";
-  private static final String INT_SV_COLUMN2 = "intSV2";
-  private static final String STRING_SV_COLUMN1 = "StringSV1";
-  private static final String STRING_SV_COLUMN2 = "StringSV2";
-  private static final String BIG_DECIMAL_SV_COLUMN1 = "BigDecimalSV1";
-  private static final String BIG_DECIMAL_SV_COLUMN2 = "BigDecimalSV2";
-  private static final String LONG_SV_COLUMN1 = "LongSV1";
-  private static final String LONG_SV_COLUMN2 = "LongSV2";
-  private static final String DOUBLE_SV_COLUMN1 = "DoubleSV1";
-  private static final String DOUBLE_SV_COLUMN2 = "DoubleSV2";
-
-  private static final String FLOAT_SV_COLUMN1 = "FloatSV1";
-  private static final String FLOAT_SV_COLUMN2 = "FLoatSV2";
-  private final int[] _intSVValues = new int[NUM_ROWS];
-  private final double[] _doubleValues = new double[NUM_ROWS];
-  private final float[] _floatValues = new float[NUM_ROWS];
-  private final String[] _stringSVValues = new String[NUM_ROWS];
-  private Map<String, DataSource> _enableNullDataSourceMap;
-  private Map<String, DataSource> _disableNullDataSourceMap;
-  private ProjectionBlock _enableNullProjectionBlock;
-  private ProjectionBlock _disableNullProjectionBlock;
-  // Mod decides whether the first column of the same type should be null.
-  private static final int NULL_MOD1 = 3;
-  // Mod decides whether the second column of the same type should be null.
-  private static final int NULL_MOD2 = 5;
-  // Difference between two same type numeric columns.
-  private static final int INT_VALUE_SHIFT = 2;
-  private static final double DOUBLE_VALUE_SHIFT = 0.1;
-  private static final float FLOAT_VALUE_SHIFT = 0.1f;
-
-  // Suffix for second string column.
-  private static final String SUFFIX = "column2";
-
-  private static String getIndexDirPath(String segmentName) {
-    return FileUtils.getTempDirectoryPath() + File.separator + segmentName;
-  }
-
-  private static Map<String, DataSource> getDataSourceMap(Schema schema, List<GenericRow> rows, String segmentName)
-      throws Exception {
-    TableConfig tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(segmentName).setNullHandlingEnabled(true).build();
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
-    config.setOutDir(getIndexDirPath(segmentName));
-    config.setSegmentName(segmentName);
-    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
-    driver.init(config, new GenericRowRecordReader(rows));
-    driver.build();
-    IndexSegment indexSegment =
-        ImmutableSegmentLoader.load(new File(getIndexDirPath(segmentName), segmentName), ReadMode.heap);
-    Set<String> columnNames = indexSegment.getPhysicalColumnNames();
-    Map<String, DataSource> enableNullDataSourceMap = new HashMap<>(columnNames.size());
-    for (String columnName : columnNames) {
-      enableNullDataSourceMap.put(columnName, indexSegment.getDataSource(columnName));
-    }
-    return enableNullDataSourceMap;
-  }
-
-  private static ProjectionBlock getProjectionBlock(Map<String, DataSource> dataSourceMap) {
-    return new ProjectionOperator(dataSourceMap,
-        new DocIdSetOperator(new MatchAllFilterOperator(NUM_ROWS), DocIdSetPlanNode.MAX_DOC_PER_CALL)).nextBlock();
-  }
-
-  private static boolean isColumn1Null(int i) {
-    return i % NULL_MOD1 == 0;
-  }
-
-  private static boolean isColumn2Null(int i) {
-    return i % NULL_MOD2 == 0;
-  }
-
-  @BeforeClass
-  public void setup()
-      throws Exception {
-    // Set up two tables: one with null option enable, the other with null option disable.
-    // Each table one string column, and one int column with some rows set to null.
-    FileUtils.deleteQuietly(new File(getIndexDirPath(DISABLE_NULL_SEGMENT_NAME)));
-    FileUtils.deleteQuietly(new File(getIndexDirPath(ENABLE_NULL_SEGMENT_NAME)));
-    for (int i = 0; i < NUM_ROWS; i++) {
-      _intSVValues[i] = RANDOM.nextInt();
-      _doubleValues[i] = RANDOM.nextDouble();
-      _floatValues[i] = RANDOM.nextFloat();
-      _stringSVValues[i] = "a" + RANDOM.nextInt();
-    }
-    List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Map<String, Object> map = new HashMap<>();
-      map.put(INT_SV_COLUMN1, _intSVValues[i]);
-      map.put(INT_SV_COLUMN2, _intSVValues[i] + INT_VALUE_SHIFT);
-      map.put(DOUBLE_SV_COLUMN1, _doubleValues[i]);
-      map.put(DOUBLE_SV_COLUMN2, _doubleValues[i] + DOUBLE_VALUE_SHIFT);
-      map.put(FLOAT_SV_COLUMN1, _floatValues[i]);
-      map.put(FLOAT_SV_COLUMN2, _floatValues[i] + FLOAT_VALUE_SHIFT);
-      map.put(STRING_SV_COLUMN1, _stringSVValues[i]);
-      map.put(STRING_SV_COLUMN2, _stringSVValues[i] + SUFFIX);
-      map.put(BIG_DECIMAL_SV_COLUMN1, BigDecimal.valueOf(_intSVValues[i]));
-      map.put(BIG_DECIMAL_SV_COLUMN2, BigDecimal.valueOf(_intSVValues[i] + INT_VALUE_SHIFT));
-      map.put(LONG_SV_COLUMN1, _intSVValues[i]);
-      map.put(LONG_SV_COLUMN2, _intSVValues[i] + INT_VALUE_SHIFT);
-
-      if (isColumn1Null(i)) {
-        map.put(INT_SV_COLUMN1, null);
-        map.put(STRING_SV_COLUMN1, null);
-        map.put(BIG_DECIMAL_SV_COLUMN1, null);
-        map.put(LONG_SV_COLUMN1, null);
-        map.put(DOUBLE_SV_COLUMN1, null);
-        map.put(FLOAT_SV_COLUMN1, null);
-      }
-      if (isColumn2Null(i)) {
-        map.put(INT_SV_COLUMN2, null);
-        map.put(STRING_SV_COLUMN2, null);
-        map.put(LONG_SV_COLUMN2, null);
-        map.put(BIG_DECIMAL_SV_COLUMN2, null);
-        map.put(DOUBLE_SV_COLUMN2, null);
-        map.put(FLOAT_SV_COLUMN2, null);
-      }
-      GenericRow row = new GenericRow();
-      row.init(map);
-      rows.add(row);
-    }
-    Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(INT_SV_COLUMN1, FieldSpec.DataType.INT)
-        .addSingleValueDimension(INT_SV_COLUMN2, FieldSpec.DataType.INT)
-        .addSingleValueDimension(STRING_SV_COLUMN1, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(STRING_SV_COLUMN2, FieldSpec.DataType.STRING)
-        .addSingleValueDimension(LONG_SV_COLUMN1, FieldSpec.DataType.LONG)
-        .addSingleValueDimension(LONG_SV_COLUMN2, FieldSpec.DataType.LONG)
-        .addSingleValueDimension(DOUBLE_SV_COLUMN1, FieldSpec.DataType.DOUBLE)
-        .addSingleValueDimension(DOUBLE_SV_COLUMN2, FieldSpec.DataType.DOUBLE)
-        .addSingleValueDimension(FLOAT_SV_COLUMN1, FieldSpec.DataType.FLOAT)
-        .addSingleValueDimension(FLOAT_SV_COLUMN2, FieldSpec.DataType.FLOAT)
-        .addMetric(BIG_DECIMAL_SV_COLUMN1, FieldSpec.DataType.BIG_DECIMAL)
-        .addMetric(BIG_DECIMAL_SV_COLUMN2, FieldSpec.DataType.BIG_DECIMAL).build();
-    _enableNullDataSourceMap = getDataSourceMap(schema, rows, ENABLE_NULL_SEGMENT_NAME);
-    _enableNullProjectionBlock = getProjectionBlock(_enableNullDataSourceMap);
-    _disableNullDataSourceMap = getDataSourceMap(schema, rows, DISABLE_NULL_SEGMENT_NAME);
-    _disableNullProjectionBlock = getProjectionBlock(_disableNullDataSourceMap);
-  }
-
-  private static void testIntTransformFunction(ExpressionContext expression, int[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    int[] actualValues =
-        TransformFunctionFactory.get(expression, dataSourceMap).transformToIntValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  private static void testStringTransformFunction(ExpressionContext expression, String[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    String[] actualValues =
-        TransformFunctionFactory.get(expression, dataSourceMap).transformToStringValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  private static void testLongTransformFunction(ExpressionContext expression, long[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    long[] actualValues =
-        TransformFunctionFactory.get(expression, dataSourceMap).transformToLongValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  private static void testDoubleTransformFunction(ExpressionContext expression, double[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    double[] actualValues =
-        TransformFunctionFactory.get(expression, dataSourceMap).transformToDoubleValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  private static void testFloatTransformFunction(ExpressionContext expression, float[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    float[] actualValues =
-        TransformFunctionFactory.get(expression, dataSourceMap).transformToFloatValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  private static void testBigDecimalTransformFunction(ExpressionContext expression, BigDecimal[] expectedValues,
-      ProjectionBlock projectionBlock, Map<String, DataSource> dataSourceMap)
-      throws Exception {
-    BigDecimal[] actualValues =
-        TransformFunctionFactory.get(expression, dataSourceMap).transformToBigDecimalValuesSV(projectionBlock);
-    for (int i = 0; i < NUM_ROWS; i++) {
-      Assert.assertEquals(actualValues[i], expectedValues[i]);
-    }
-  }
-
-  // Test the Coalesce on two Int columns where one or the other or both can be null.
   @Test
-  public void testCoalesceIntColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", INT_SV_COLUMN1, INT_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
-    int[] expectedResults = new int[NUM_ROWS];
-    for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = CoalesceTransformFunction.NULL_INT;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = _intSVValues[i] + INT_VALUE_SHIFT;
-      } else if (isColumn2Null(i)) {
-        expectedResults[i] = _intSVValues[i];
-      } else {
-        expectedResults[i] = _intSVValues[i];
-      }
-    }
-    testIntTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap);
-    testIntTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap);
-  }
+  public void testCoalesceIntColumns() {
+    TransformFunction coalesceFunc = TransformFunctionFactory.get(
+        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", INT_SV_NULL_COLUMN, LONG_SV_COLUMN)),
+        _dataSourceMap);
 
-  // Test the Coalesce on two long columns where one or the other or both can be null.
-  @Test
-  public void testCoalesceLongColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", LONG_SV_COLUMN1, LONG_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
     long[] expectedResults = new long[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = CoalesceTransformFunction.NULL_LONG;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = _intSVValues[i] + INT_VALUE_SHIFT;
-      } else if (isColumn2Null(i)) {
-        expectedResults[i] = _intSVValues[i];
-      } else {
+      if (i % 2 == 0) {
         expectedResults[i] = _intSVValues[i];
-      }
-    }
-    testLongTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap);
-    testLongTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap);
-  }
-
-  // Test the Coalesce on two float columns where one or the other or both can be null.
-  @Test
-  public void testCoalesceFloatColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", FLOAT_SV_COLUMN1, FLOAT_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
-    float[] expectedResults = new float[NUM_ROWS];
-    for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = CoalesceTransformFunction.NULL_FLOAT;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = _floatValues[i] + FLOAT_VALUE_SHIFT;
-      } else if (isColumn2Null(i)) {
-        expectedResults[i] = _floatValues[i];
-      } else {
-        expectedResults[i] = _floatValues[i];
-      }
-    }
-    testFloatTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap);
-    testFloatTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap);
-  }
-
-  // Test the Coalesce on two double columns where one or the other or both can be null.
-  @Test
-  public void testCoalesceDoubleColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", DOUBLE_SV_COLUMN1, DOUBLE_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
-    double[] expectedResults = new double[NUM_ROWS];
-    for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = CoalesceTransformFunction.NULL_DOUBLE;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = _doubleValues[i] + DOUBLE_VALUE_SHIFT;
-      } else if (isColumn2Null(i)) {
-        expectedResults[i] = _doubleValues[i];
       } else {
-        expectedResults[i] = _doubleValues[i];
+        expectedResults[i] = _longSVValues[i];
       }
     }
-    testDoubleTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap);
-    testDoubleTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap);
+    testTransformFunction(coalesceFunc, expectedResults);
   }
 
-  // Test the Coalesce on two big decimal columns where one or the other or both can be null.
   @Test
-  public void testCoalesceBigDecimalColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr = RequestContextUtils.getExpression(
-        String.format("COALESCE(%s,%s)", BIG_DECIMAL_SV_COLUMN1, BIG_DECIMAL_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
-    BigDecimal[] expectedResults = new BigDecimal[NUM_ROWS];
+  public void testCoalesceIntColumnsAndLiterals() {
+    final int intLiteral = 313;
+    TransformFunction coalesceFunc = TransformFunctionFactory.get(
+        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", INT_SV_NULL_COLUMN, intLiteral)),
+        _dataSourceMap);
+    Assert.assertEquals(coalesceFunc.getName(), "coalesce");
+    int[] expectedResults = new int[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = CoalesceTransformFunction.NULL_BIG_DECIMAL;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = BigDecimal.valueOf(_intSVValues[i] + INT_VALUE_SHIFT);
-      } else if (isColumn2Null(i)) {
-        expectedResults[i] = BigDecimal.valueOf(_intSVValues[i]);
+      if (i % 2 == 0) {
+        expectedResults[i] = _intSVValues[i];
       } else {
-        expectedResults[i] = BigDecimal.valueOf(_intSVValues[i]);
+        expectedResults[i] = intLiteral;
       }
     }
-    testBigDecimalTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock,
-        _enableNullDataSourceMap);
-    testBigDecimalTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock,
-        _disableNullDataSourceMap);
+    testTransformFunction(coalesceFunc, expectedResults);
   }
 
-  // Test the Coalesce on two String columns where one or the other or both can be null.
   @Test
-  public void testCoalesceStringColumns()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", STRING_SV_COLUMN1, STRING_SV_COLUMN2));
-    TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
+  public void testDifferentLiteralArgs() {
+    TransformFunction coalesceFunc = TransformFunctionFactory.get(
+        RequestContextUtils.getExpression(String.format("COALESCE(%s, '%s')", STRING_SV_NULL_COLUMN, 234)),
+        _dataSourceMap);
     String[] expectedResults = new String[NUM_ROWS];
     for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = CoalesceTransformFunction.NULL_STRING;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = _stringSVValues[i] + SUFFIX;
-      } else if (isColumn2Null(i)) {
+      if (i % 2 == 0) {
         expectedResults[i] = _stringSVValues[i];
       } else {
-        expectedResults[i] = _stringSVValues[i];
+        expectedResults[i] = "234";
       }
     }
-    testStringTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap);
-    testStringTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap);
-  }
-
-  // Test that non-column-names appear in one of the argument.
-  @Test
-  public void testIllegalColumnName()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", _stringSVValues[0], STRING_SV_COLUMN1));
-    Assert.assertThrows(RuntimeException.class, () -> {
-      TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    });
-    Assert.assertThrows(RuntimeException.class, () -> {
-      TransformFunctionFactory.get(coalesceExpr, _disableNullDataSourceMap);
-    });
+    testTransformFunction(coalesceFunc, expectedResults);
   }
 
-  // Test that wrong data type is illegal argument.
-  @Test
-  public void testIllegalArgType()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s)", TIMESTAMP_COLUMN, STRING_SV_COLUMN));
-    Assert.assertThrows(RuntimeException.class, () -> {
-      TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    });
-    Assert.assertThrows(RuntimeException.class, () -> {
-      TransformFunctionFactory.get(coalesceExpr, _disableNullDataSourceMap);
-    });
-  }
 
-  // Test the Coalesce on two Int columns (where one or the other or both can be null) and litrals.
   @Test
-  public void testCoalesceIntColumnsAndLiterals()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,%s,%s)", INT_SV_COLUMN1, INT_SV_COLUMN2, 314));
-    TransformFunction coalesceTransformFunction = TransformFunctionFactory.get(coalesceExpr, _enableNullDataSourceMap);
-    Assert.assertEquals(coalesceTransformFunction.getName(), "coalesce");
-    int[] expectedResults = new int[NUM_ROWS];
+  public void testCoalesceNullLiteral() {
+    TransformFunction coalesceFunc = TransformFunctionFactory.get(
+        RequestContextUtils.getExpression(String.format("COALESCE((1 + null), %s)", INT_SV_NULL_COLUMN)),
+        _dataSourceMap);
+    double[] expectedResults = new double[NUM_ROWS];
+    RoaringBitmap expectedNull = new RoaringBitmap();
     for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i) && isColumn2Null(i)) {
-        expectedResults[i] = 314;
-      } else if (isColumn1Null(i)) {
-        expectedResults[i] = _intSVValues[i] + INT_VALUE_SHIFT;
-      } else if (isColumn2Null(i)) {
+      if (i % 2 == 0) {
         expectedResults[i] = _intSVValues[i];
       } else {
-        expectedResults[i] = _intSVValues[i];
+        expectedNull.add(i);
       }
     }
-    testIntTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap);
-    testIntTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap);
+    testTransformFunctionWithNull(coalesceFunc, expectedResults, expectedNull);
   }
 
-  // Test that string literal works for coalesce
   @Test
-  public void testDifferentLiteralArgs()
-      throws Exception {
-    ExpressionContext coalesceExpr =
-        RequestContextUtils.getExpression(String.format("COALESCE(%s,'%s')", STRING_SV_COLUMN1, 234));
-    String[] expectedResults = new String[NUM_ROWS];
-    for (int i = 0; i < NUM_ROWS; i++) {
-      if (isColumn1Null(i)) {
-        expectedResults[i] = "234";
-      } else {
-        expectedResults[i] = _stringSVValues[i];
-      }
-    }
-    testStringTransformFunction(coalesceExpr, expectedResults, _enableNullProjectionBlock, _enableNullDataSourceMap);
-    testStringTransformFunction(coalesceExpr, expectedResults, _disableNullProjectionBlock, _disableNullDataSourceMap);
+  public void testCoalesceNullNullLiteral() {
+    TransformFunction coalesceFunc = TransformFunctionFactory.get(
+        RequestContextUtils.getExpression("COALESCE(null, null)"), _dataSourceMap);
+    double[] expectedResults = new double[NUM_ROWS];
+    RoaringBitmap expectedNull = new RoaringBitmap();
+    expectedNull.add(0L, NUM_ROWS);
+    testTransformFunctionWithNull(coalesceFunc, expectedResults, expectedNull);
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/NullHandlingTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/NullHandlingTransformFunctionTest.java
index 4cb00e657f..b412a6489f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/NullHandlingTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/NullHandlingTransformFunctionTest.java
@@ -193,6 +193,23 @@ public class NullHandlingTransformFunctionTest {
     testTransformFunction(expression, expectedValues);
   }
 
+  // Verifies that IS NULL over a null literal yields true for every row.
+  @Test
+  public void testIsNullTransformFunctionNullLiteral()
+      throws Exception {
+    ExpressionContext expression = RequestContextUtils.getExpression("null IS NULL");
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    assertTrue(transformFunction instanceof IsNullTransformFunction);
+    assertEquals(transformFunction.getName(), TransformFunctionType.IS_NULL.getName());
+    TransformResultMetadata resultMetadata = transformFunction.getResultMetadata();
+    assertEquals(resultMetadata.getDataType(), DataType.BOOLEAN);
+    assertTrue(resultMetadata.isSingleValue());
+    boolean[] expectedValues = new boolean[NUM_ROWS];
+    // Every row is expected to be null; fully qualified to avoid touching the import block.
+    java.util.Arrays.fill(expectedValues, true);
+    testTransformFunction(expression, expectedValues);
+  }
+
   @Test
   public void testIsNotNullTransformFunction()
       throws Exception {
@@ -221,6 +238,23 @@ public class NullHandlingTransformFunctionTest {
     testTransformFunction(expression, expectedValues);
   }
 
+  // Verifies that IS NOT NULL over a null literal yields false for every row.
+  @Test
+  public void testIsNotNullTransformFunctionNullLiteral()
+      throws Exception {
+    ExpressionContext expression = RequestContextUtils.getExpression("null IS NOT NULL");
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    assertTrue(transformFunction instanceof IsNotNullTransformFunction);
+    assertEquals(transformFunction.getName(), TransformFunctionType.IS_NOT_NULL.getName());
+    TransformResultMetadata resultMetadata = transformFunction.getResultMetadata();
+    assertEquals(resultMetadata.getDataType(), DataType.BOOLEAN);
+    assertTrue(resultMetadata.isSingleValue());
+    // Java zero-initializes boolean arrays, so every expected value is already false; no fill
+    // loop is required.
+    boolean[] expectedValues = new boolean[NUM_ROWS];
+    testTransformFunction(expression, expectedValues);
+  }
+
   protected void testTransformFunction(ExpressionContext expression, boolean[] expectedValues)
       throws Exception {
     int[] intValues = getTransformFunctionInstance(expression).transformToIntValuesSV(_projectionBlock);
@@ -229,16 +263,14 @@ public class NullHandlingTransformFunctionTest {
     double[] doubleValues = getTransformFunctionInstance(expression).transformToDoubleValuesSV(_projectionBlock);
     BigDecimal[] bigDecimalValues =
         getTransformFunctionInstance(expression).transformToBigDecimalValuesSV(_projectionBlock);
-    // TODO: Support implicit cast from BOOLEAN to STRING
-//    String[] stringValues = getTransformFunctionInstance(expression).transformToStringValuesSV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
       assertEquals(intValues[i] == 1, expectedValues[i]);
       assertEquals(longValues[i] == 1, expectedValues[i]);
       assertEquals(floatValues[i] == 1, expectedValues[i]);
       assertEquals(doubleValues[i] == 1, expectedValues[i]);
       assertEquals(bigDecimalValues[i].intValue() == 1, expectedValues[i]);
-//      assertEquals(stringValues[i], Boolean.toString(expectedValues[i]));
     }
+    assertEquals(getTransformFunctionInstance(expression).getNullBitmap(_projectionBlock), null);
   }
 
   private TransformFunction getTransformFunctionInstance(ExpressionContext expression) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org